Spark DataFrame创建教程介绍学习如何创建Spark DataFrame是 Spark 环境中的第一个实际步骤之一。Spark DataFrames 有助于提供数据结构和其他数据操作功能的视图。根据数据源和文件的数据存储格式,存在不同的方法。
本文介绍了如何使用 PySpark 在 Python 中手动创建 Spark DataFrame,包括相关的Spark DataFrame创建示例。
先决条件
- 安装并配置了Python 3。
- 安装并配置了PySpark。
- 一个Python开发环境,准备测试代码示例(我们使用的是Jupyter笔记本电脑)。
1.创建一个列表,并使用解析它为DataFrame
toDataFrame()
从方法SparkSession
。2. 使用
toDF()
方法将 RDD 转换为 DataFrame
。3.
SparkSession
直接将文件作为DataFrame导入。这些示例使用示例数据和 RDD 进行演示,尽管一般原则适用于类似的数据结构。
注意: Spark 还提供了一个 Streaming API,用于近乎实时地流式传输数据。按照我们的动手指南试用 API:Spark Streaming Guide for Beginners。
Spark DataFrame创建教程:从数据列表创建 DataFrame要从数据列表创建 Spark DataFrame:
1. 用数据生成样本字典列表:
data = https://www.lsbin.com/[
{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
{"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
{"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
{"Category": 'E', "ID": 4, "Value": 33.87, "Truth": True}
]
2. 导入并创建一个
SparkSession
:from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
3. 使用该
createDataFrame
方法创建一个DataFrame
。检查数据类型以确认变量是 DataFrame:df = spark.createDataFrame(data)
type(df)
文章图片
从 RDD 创建 DataFrame如何创建Spark DataFrame?在 Spark 中工作时的一个典型事件是从现有 RDD 创建一个 DataFrame。创建一个示例 RDD,然后将其转换为 DataFrame。
1. Spark DataFrame创建示例 - 制作一个包含玩具数据的字典列表:
data = https://www.lsbin.com/[
{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
{"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
{"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
{"Category": 'E', "ID": 4, "Value": 33.87, "Truth": True}
]
2. 导入并创建一个
SparkContext
:from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("projectName").setMaster("local[
*]")
sc = SparkContext.getOrCreate(conf)
3. 从创建的数据生成 RDD。检查类型以确认对象是 RDD:
rdd = sc.parallelize(data)
type(rdd)
4.调用
toDF()
RDD上的方法创建DataFrame。测试对象类型以确认:df = rdd.toDF()
type(df)
文章图片
从数据源创建 DataFrame如何创建Spark DataFrame?Spark 可以处理大量外部数据源来构建 DataFrame。从文件读取的一般语法是:
spark.read.format('<
data source>').load('<
file path/file name>')
数据源名称和路径都是 String 类型。特定数据源还具有将文件作为 DataFrame 导入的替代语法。
从 CSV 文件创建
通过直接读取 CSV 文件来创建 Spark DataFrame:
df = spark.read.csv('<
file name>.csv')
通过提供路径列表将多个 CSV 文件读入一个 DataFrame:
df = spark.read.csv([
'<
file name 1>.csv', '<
file name 2>.csv', '<
file name 3>.csv'])
默认情况下,Spark 为每列添加一个标题。如果 CSV 文件具有要包含的标题,请
option
在导入时添加该方法:df = spark.read.csv('<
file name>.csv').option('header', 'true')
单个选项通过一个接一个地调用来堆叠。或者,
options
在导入过程中需要更多选项时使用该方法:df = spark.read.csv('<
file name>.csv').options(header = True)
文章图片
请注意,使用
option
vs.时的语法是不同的options
。从 TXT 文件创建
Spark DataFrame创建示例:从文本文件创建一个 DataFrame :
df = spark.read.text('<
file name>.txt')
文章图片
该
csv
方法是另一种从txt文件类型读取到 DataFrame 的方法。例如:df = spark.read.option('header', 'true').csv('<
file name>.txt')
文章图片
CSV 是一种文本格式,其中分隔符是逗号 (,),因此该函数能够从文本文件中读取数据。
Spark DataFrame创建教程:从 JSON 文件创建
通过运行以下命令从 JSON 文件创建 Spark DataFrame:
df = spark.read.json('<
file name>.json')
文章图片
从 XML 文件创建
默认情况下,XML 文件兼容性不可用。安装依赖项以从 XML 源创建 DataFrame。
1. 下载Spark XML 依赖项。将.jar文件保存在 Spark jar 文件夹中。
2. 通过运行将 XML 文件读入 DataFrame:
df = spark.read\
.format('com.databricks.spark.xml')\
.option('rowTag', 'row')\
.load('test.xml')
文章图片
rowTag
如果XML文件中的每一行都有不同的标签,请更改该选项。从 RDBMS 数据库创建 DataFrame如何创建Spark DataFrame?从RDBMS读取需要驱动程序连接器。该示例介绍了如何从 MySQL 数据库连接和提取数据。类似的步骤适用于其他数据库类型。
1. 下载MySQL Java Driver 连接器。将.jar文件保存在 Spark jar 文件夹中。
2. 运行SQL 服务器并建立连接。
3. 建立连接并将整个 MySQL数据库表提取到一个 DataFrame 中:
df = spark.read\
.format('jdbc')\
.option('url', 'jdbc:mysql://localhost:3306/db')\
.option('driver', 'com.mysql.jdbc.Driver')\
.option('dbtable','new_table')\
.option('user','root')\
.load()
文章图片
注意:需要创建数据库吗?按照我们的教程:如何在 Workbench 中创建 MySQL 数据库。
Spark DataFrame创建示例添加的选项如下:
- 该URL是
localhost:3306
如果服务器在本地运行。否则,获取你的数据库服务器的 URL 。 - 数据库名称扩展 URL 以访问服务器上的特定数据库。例如,如果一个数据库被命名
db
并且服务器在本地运行,则用于建立连接的完整 URL 是jdbc:mysql://localhost:3306/db
. - 表名确保整个数据库表被拉入DataFrame。使用
.option('query', '< query> ')
而不是.option('dbtable', '< table name> ')
运行特定查询而不是选择整个表。 - 使用数据库的用户名和密码建立连接。在没有密码的情况下运行时,省略指定的选项。
【如何创建Spark DataFrame(分步操作详细指南)】接下来,按照我们的教程之一学习如何处理 Python 中的缺失数据:在 Python 中处理缺失数据:原因和解决方案。
推荐阅读
- 如何使用Kubernetes部署执行滚动更新(分步指南)
- Kubernetes集群管理( 如何使用Rancher BMC集成())
- Terraform与Kubernetes有什么区别(哪个更好?)
- 如何更新Kali Linux(详细操作分步指南)
- Android_程序未处理异常的捕获与处理
- POJ 2486 Apple Tree
- Android6.0机型上调用系统相机拍照返回的resultCode值始终等于0的问题
- Android开发分享功能实现步骤
- Log4j Append属性指定是否追加内容