2.pyspark.sql.DataFrame

Spark SQL和DataFrames重要的类有:

  • pyspark.sql.SQLContext: DataFrame和SQL方法的主入口
  • pyspark.sql.DataFrame: 将分布式数据集分组到指定列名的数据框中
  • pyspark.sql.Column :DataFrame中的列
  • pyspark.sql.Row: DataFrame数据的行
  • pyspark.sql.HiveContext: 访问Hive数据的主入口
  • pyspark.sql.GroupedData: 由DataFrame.groupBy()创建的聚合方法集
  • pyspark.sql.DataFrameNaFunctions: 处理丢失数据(空数据)的方法
  • pyspark.sql.DataFrameStatFunctions: 统计功能的方法
    -pyspark.sql.functions DataFrame:可用的内置函数
  • pyspark.sql.types: 可用的数据类型列表
  • pyspark.sql.Window: 用于处理窗口函数
2.pyspark.sql.DataFrame
class pyspark.sql.DataFrame(jdf,sql_ctx) """ 分布式的收集数据分组到命名列中。一个DataFrame相当于sparksql中一个相关的表,可在sqlcontext使用各种方法创建 eg1: people = sqlContext.read.parquet("...") 一旦创建,可以使用在DataFrame,Column中定义的不同的dsl方法操作 eg2: ageCol = people.age eg3: people = sqlContext.read.parquet("...") department = sqlContext.read.parquet("...") people.filter(people.age>30).join(department,people.deptId == department.id).groupBy(department.name, 'gender').agg({'salary':'avg','age':'max'}) """

2.1.agg:没有组的情况下聚集整个DataFrame(df.groupBy.aggg()简写)
a =[('jack',5),('john',4),('tom',2)] df = sqlContext.createDataFrame(a,['name','age']) df.agg({'age':'max'}).collect() --------------[Row(max(age)=5)]---------------------------- from pyspark.sql import functions as F df.agg(F.min(df.age)).collect() --------------[Row(min(age)=2)]---------------------------------

2.2.alias(alias):返回一个设置别名的新DataFrame
a = [('Alice',2),('Bob',5)] df = sqlContext.createDataFrame(a,['name','age']) from pyspark.sql.functions import * df_as1 = df.alias('df_as1') df_as2 = df.alias('sf_as2') joined_df = df_as1.join(df_as2,col('df_as1.name') == col('df_as2.name'),'inner') joined_df.select(col('df_as1.name'),col(df_as2.name),col('df_as2.age)).collect() --------------[Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]-------------

2.3.cache():用默认的存储级别缓存数据(mermory_only_ser) 2.4.coalesce(numPartitions):返回一个有确切的分区数的分区的新的DataFrame,与在一个RDD上定义的合并类似,这个操作产生一个窄依赖,如果从1000个分区到100个分区,不会有shuffle过程,而是每100个新分区会需要当前分区10个
df.coalesce(1).rdd.getNumPartitions() ---------1--------------

2.5.collect():返回所有的记录数为行的列表
df.collect() ---------[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]--------------

2.6.columns:返回所有列名的列表
df.columns ---------['age', 'name']--------------

2.7.corr(col1,col2,menthod=None):计算一个DataFrame相关的两列为double值。通常只支持皮尔逊相关系数。DataFrame.corr()和DataFrameStatFunctions.corr()类似。
1.col1:第一列的名称 2.col2:第二列的名称 3.method:相关方法,当前只支持皮尔逊相关系数

2.8.count():返回DataFrame的行数
df.count() ---------2--------------

2.9.cov(col1,col2):计算由列名指定列的昂本协方差为double值。DataFrame.cov()和DataFrameStatFunctions.cov()类似
1.col1:第一列的名称 2.col2:第二列的名称

2.10.crasstab(col1,col2):计算给定列的份数频数表,也称为相关表。每一列的去重值的个数应该小于1e4.最多返回1e6个非零对.每一行的第一列都是col1的去重值。。。。。
1.col1 – 第一列的名称. 去重项作为每行的第一项。 2. col2 – 第二列的名称. 去重项作为DataFrame的列名称。

2.11.cube(*cols):创建使用指定列的当前DataFrame的多维立方体,这样可以聚合这些数据
l=[('Alice',2),('Bob',5)] df = sqlContext.createDataFrame(l,['name','age']) df.cube('name', df.age).count().show() +-----+----+-----+ | name| age|count| +-----+----+-----+ | null|2|1| |Alice|null|1| |Bob|5|1| |Bob|null|1| | null|5|1| | null|null|2| |Alice|2|1| +-----+----+-----+

2.12.describe(*cols):计算数值列的统计信息。包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列。
df.describe().show() +-------+------------------+ |summary|age| +-------+------------------+ |count|2| |mean|3.5| | stddev|2.1213203435596424| |min|2| |max|5| +-------+------------------+ df.describe().show() +-------+------------------+ |summary|age| +-------+------------------+ |count|2| |mean|3.5| | stddev|2.1213203435596424| |min|2| |max|5| +-------+------------------+

2.13.describe(*cols):返回行去重的新的DataFrame。
l=[('Alice',2),('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.distinct().count() 2

2.14.drop(col):返回删除指定列的新的DataFrame。
df.drop('age').collect() [Row(name=u'Alice'), Row(name=u'Bob')] >>> l1=[('Bob',5)] >>> df = sqlContext.createDataFrame(l1,['name','age']) >>> l2=[('Bob',85)] >>> df2 = sqlContext.createDataFrame(l2,['name','height']) >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() [Row(age=5, height=85, name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() [Row(age=5, name=u'Bob', height=85)]

2.15.dropduplicates(subset=None):返回去掉重复行的一个新的DataFrame ,通常只考虑某几行。drop_duplicates()和dropDuplicates()类似。
from pyspark.sql import Row df sc.parallelize([Row(name='Alice',age=5.height=80),Row(name='Alice',age=5,height=80), Row(name='Alice',age=10,height=80)]).toDF() df.dorpDuplicates().show() +---+------+-----+ |age|height| name| +---+------+-----+ |5|80|Alice| | 10|80|Alice| +---+------+-----+df.dropDuplicates(['name', 'height']).show() +---+------+-----+ |age|height| name| +---+------+-----+ |5|80|Alice| +---+------+-----+

2.16.drop_duplicates(subset=None):同上 2.17.dropna(how='any',thresh=None.subset=None):返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。
1.how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。 2.thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。 3.subset – 选择的列名称列表。>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> dfnew = df.cube('name', df.age).count() >>> dfnew.show() +-----+----+-----+ | name| age|count| +-----+----+-----+ | null|2|1| |Alice|null|1| |Bob|5|1| |Bob|null|1| | null|5|1| | null|null|2| |Alice|2|1| +-----+----+-----+ >>> dfnew.na.drop().show() +-----+---+-----+ | name|age|count| +-----+---+-----+ |Bob|5|1| |Alice|2|1| +-----+---+-----+

2.18.dtypes:返回所有列名及类型的列表。
>>> df.dtypes [('age', 'int'), ('name', 'string')]

2.19.explain(extended=False):将逻辑和物理计划打印到控制塔以进行调试
1. extended – boolean类型,默认为False。如果为False,只打印物理计划。 >>> df.explain() == Physical Plan == Scan ExistingRDD[age#0,name#1]>>> df.explain(True) == Parsed Logical Plan == ... == Analyzed Logical Plan == ... == Optimized Logical Plan == ... == Physical Plan == ...

2.20.fillna(value,subset=None):替换空值和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。
1.value:要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射, 替换值必须是int,long,float或者string. 2.subset:要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列, 这个非字符串的值会被忽略。 >>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> dfnew = df.cube('name', df.age).count() >>> dfnew.show() +-----+----+-----+ | name| age|count| +-----+----+-----+ | null|2|1| |Alice|null|1| |Bob|5|1| |Bob|null|1| | null|5|1| | null|null|2| |Alice|2|1| +-----+----+-----+ >>> dfnew.na.fill(50).show() +-----+---+-----+ | name|age|count| +-----+---+-----+ | null|2|1| |Alice| 50|1| |Bob|5|1| |Bob| 50|1| | null|5|1| | null| 50|2| |Alice|2|1| +-----+---+-----+ >>> dfnew.na.fill({'age': 50, 'name': 'unknown'}).show() +-------+---+-----+ |name|age|count| +-------+---+-----+ |unknown|2|1| |Alice| 50|1| |Bob|5|1| |Bob| 50|1| |unknown|5|1| |unknown| 50|2| |Alice|2|1| +-------+---+-----+

2.21.folter(condition):用给定的条件过滤行。where()和filter()类型。
condition:一个列的bool类型或字符串的SQL表达式。 >>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.filter(df.age > 3).collect() [Row(age=5, name=u'Bob')] >>> df.where(df.age == 2).collect() [Row(age=2, name=u'Alice')] >>> df.filter("age > 3").collect() [Row(age=5, name=u'Bob')] >>> df.where("age = 2").collect() [Row(age=2, name=u'Alice')]

2.22.first():返回第一行。
>>> df.first() Row(age=2, name=u'Alice')

2.23.flatMap(f):返回在每行应用F函数后的新的RDD,然后将结果压扁,是df.rdd.flatMap()的简写。
df.flatMap(lambda p:p.name).collect() [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']

2.24.foreach(f):应用f函数到DataFrame的所有行。是df.rdd.foreach()的简写。
>>> def f(person): ...print(person.name) >>> df.foreach(f) Alice Bob

2.25.foreachPartition(f):应用f函数到DataFrame的每一个分区。是 df.rdd.foreachPartition()的缩写。
>>> def f(people): ...for person in people: ...print(person.name) >>> df.foreachPartition(f) Alice Bob

2.26.freqItems(cols,support=None):
1.cols:要计算重复项的列名,为字符串类型的列表或者元祖。 2.support:要计算频率项的频率值。默认是1%。参数必须大于1e-4.

2.27.groupBy(*cols):使用指定的列分组DataFrame,这样可以聚合计算。可以从GroupedData查看所有可用的聚合方法。groupby()和groupBy()类似。
1.cols: 分组依据的列。每一项应该是一个字符串的列名或者列的表达式。 >>> df.groupBy().avg().collect() [Row(avg(age)=3.5)] >>> df.groupBy('name').agg({'age': 'mean'}).collect() [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] >>> df.groupBy(df.name).avg().collect() [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] >>> df.groupBy(['name', df.age]).count().collect() [Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]

2.28.head(n=None):返回前n行
1.n:int类型,默认为1,要返回的行数。 2.返回值:如果n大于1,返回行列表,如果n为1,返回单独的一行。 >>> df.head() Row(age=2, name=u'Alice') >>> df.head(1) [Row(age=2, name=u'Alice')]

2.29.insertInto(tableName, overwrite=False):插入DataFrame内容到指定表。注:在1.4中已过时,使用DataFrameWriter.insertInto()代替。 2.30.intersect(other):返回新的DataFrame,包含仅同时在当前框和另一个框的行。相当于SQL中的交集。 2.31.join(other,on=None,how=None):使用给定的关联表达式,关联另一个DataFrame。以下执行df1和df2之间完整的外连接。
1.other:连接的右侧 2.on:一个连接的列名称字符串, 列名称列表,一个连接表达式(列)或者列的列表。如果on参数是一个字符串或者字符串列表,表示连接列的名称,这些名称必须同时存在join的两个表中, 这样执行的是一个等价连接。 3.how:字符串,默认'inner'。inner,outer,left_outer,right_outer,leftsemi之一。 >>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> l2=[('Tom',80),('Bob',85)] >>> df2 = sqlContext.createDataFrame(l2,['name','height']) >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] >>> df.join(df2, 'name', 'outer').select('name', 'height').collect() [Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] >>> l3=[('Alice',2,60),('Bob',5,80)] >>> df3 = sqlContext.createDataFrame(l3,['name','age','height']) >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] >>> df.join(df2, 'name').select(df.name, df2.height).collect() [Row(name=u'Bob', height=85)] >>> l4=[('Alice',1),('Bob',5)] >>> df4 = sqlContext.createDataFrame(l4,['name','age']) >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect() [Row(name=u'Bob', age=5)]

2.32.limit(num):将结果计数限制为指定的数字。
>>> df.limit(1).collect() [Row(age=2, name=u'Alice')] >>> df.limit(0).collect() []

2.33.map(f):通过每行应用f函数返回新的RDD,是 df.rdd.map()的缩写。
df.map(lambda p:p.name).collect() [u'Alice', u'Bob']

2.34.mapPartitions(f,preservesPartitioning=False):通过每分区应用f函数返回新的RDD,是 df.rdd.mapPartitions()的缩写。
rdd = sc.parallelize([1,2,3,4],4) def f(iterator):yield 1 df.mapPartitions(f).sum() 4

2.35.na():返回DataFrameNaFunctions用于处理缺失值。 2.36.orderBy(*cols, **kwargs):返回按照指定列排序的新的DataFrame。
1.cols:用来排序的列或列名称的列表。 2.ascending:布尔值或布尔值列表(默认 True). 升序排序与降序排序。指定多个排序顺序的列表。如果指定列表, 列表的长度必须等于列的长度。 >>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.sort(df.age.desc()).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] >>> df.sort("age", ascending=False).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] >>> df.orderBy(df.age.desc()).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] >>> from pyspark.sql.functions import * >>> df.sort(asc("age")).collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.orderBy(desc("age"), "name").collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]

2.37.persist(storageLevel=StorageLevel(false,True,False,False,1)):设置存储级别以在第一次操作运行完成后保存其值。这只能用来分配新的存储级别,如果RDD没有设置存储级别的话。如果没有指定存储级别,默认为(memory_only_ser)。 2.38.printSchema():打印schema以树的格式
>>> df.printSchema() root |-- name: string (nullable = true) |-- age: long (nullable = true)

2.39.randomSplit(weights,seed=None):随机提供的权重随机的划分DataFrame
1.weights:doubles类型的列表作为权重来划分DataFrame。权重会被恢复如果总值不到1.0 2.seed:random的随机数 >>> l4=[('Alice',1),('Bob',5),('Jack',8),('Tom',10)] >>> df4 = sqlContext.createDataFrame(l4,['name','age']) >>> splits = df4.randomSplit([1.0, 2.0],24) >>> splits[0].count() 1 >>> splits[1].count() 3

2.40.rdd:返回内容为行的RDD。 2.41.registerAsTable(name):注:在1.4中已过时,使用registerTempTable()代替。 2.42.registerTempTable(name):使用给定的名字注册该RDD为临时表,这个临时表的有效期与用来创建这个DataFrame的SQLContext相关
>>> df.registerTempTable("people") >>> df2 = sqlContext.sql("select * from people") >>> sorted(df.collect()) == sorted(df2.collect()) True

2.43.repartition(numPartitions,*cols):按照给定的分区表达式分区,返回新的DataFrame。产生的DataFrame是哈希分区。numPartitions参数可以是一个整数来指定分区数,或者是一个列。如果是一个列,这个列会作为第一个分区列。如果没有指定,将使用默认的分区数。1.6版本修改: 添加可选参数可以指定分区列。如果分区列指定的话,numPartitions也是可选的。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.repartition(10).rdd.getNumPartitions() 10 >>> data = https://www.it610.com/article/df.unionAll(df).repartition("age") >>> data.show() +-----+---+ | name|age| +-----+---+ |Alice|2| |Alice|2| |Bob|5| |Bob|5| +-----+---+ >>> data = https://www.it610.com/article/data.repartition(7,"age") >>> data.show() +-----+---+ | name|age| +-----+---+ |Bob|5| |Bob|5| |Alice|2| |Alice|2| +-----+---+ >>> data.rdd.getNumPartitions() 7 >>> data = https://www.it610.com/article/data.repartition("name", "age") >>> data.show() +-----+---+ | name|age| +-----+---+ |Bob|5| |Bob|5| |Alice|2| |Alice|2| +-----+---+

2.44.replace(to_replace,value,subset=None):返回用另外一个值替换了一个值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace() 类似。
1.to_replace:整形,长整形,浮点型,字符串,或者列表。要替换的值。如果值是字典,那么值会被忽略,to_replace必须是一个从列名(字符串)到要替换的 值的映射。要替换的值必须是一个整形,长整形,浮点型,或者字符串。 2.value:整形,长整形,浮点型,字符串或者列表。要替换为的值。要替换为的值必须是一个整形,长整形,浮点型,或者字符串。如果值是列表或者元组,值应该 和to_replace有相同的长度。 3.subset:要考虑替换的列名的可选列表。在subset指定的列如果没有匹配的数据类型那么将被忽略。例如,如果值是字符串,并且subset参数包含一个非字 符串的列,那么非字符串的列被忽略。 >>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)] >>> df4 = sqlContext.createDataFrame(l4,['name','age','height']) >>> df4.na.replace(10, 20).show() +-----+----+------+ | name| age|height| +-----+----+------+ |Alice|20|80| |Bob|5|null| |Tom|null|null| | null|null|null| +-----+----+------+ >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() +----+----+------+ |name| age|height| +----+----+------+ |A|10|80| |B|5|null| | Tom|null|null| |null|null|null| +----+----+------+

2.45.rollup(*cols):使用指定的列为当前的DataFrame创建一个多维汇总,这样可以聚合这些数据。
>>> l=[('Alice',2,80),('Bob',5,None)] >>> df = sqlContext.createDataFrame(l,['name','age','height']) >>> df.rollup('name', df.age).count().show() +-----+----+-----+ | name| age|count| +-----+----+-----+ |Alice|null|1| |Bob|5|1| |Bob|null|1| | null|null|2| |Alice|2|1| +-----+----+-----+

2.46.sample(withReplacement,fractions,seed=None):返回DataFrame的子集采样。
>>> df.sample(False, 0.5, 42).count() 2

2.47.sampleBy(col,fractions,seed=None):根据每个层次上给出的分数,返回没有替换的分层样本。返回没有替换的分层抽样 基于每层给定的一小部分 在给定的每层的片段
1.col:定义层的列 2.fractions:每层的抽样数。如果没有指定层, 将其数目视为0. 3.seed:随机数 4.返回值:返回代表分层样本的新的DataFrame >>> from pyspark.sql.functions import col >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key")) >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) >>> sampled.groupBy("key").count().orderBy("key").show() +---+-----+ |key|count| +---+-----+ |0|5| |1|9| +---+-----+

2.48.save(path=None, source=None, mode='error', **options):保存DataFrame的数据到数据源。注:在1.4中已过时,使用DataFrameWriter.save()代替。 2.49.saveAsParquetFile(path):保存内容为一个Parquet文件,代表这个schema。注:在1.4中已过时,使用DataFrameWriter.parquet() 代替。 2.50.saveAsTable(tableName, source=None, mode='error', **options):将此DataFrame的内容作为表保存到数据源注:在1.4中已过时,使用DataFrameWriter.saveAsTable() 代替。 2.51.schema:返回DataFrame的schema为types.StructType。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.schema StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))

2.52.select(*col):提供一组表达式并返回一个新的DataFrame。
1.cols:列名(字符串)或表达式(列)列表。 如果其中一列的名称为“*”,那么该列将被扩展为包括当前DataFrame中的所有列。 >>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.select('*').collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.select('name', 'age').collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.select(df.name, (df.age + 10).alias('age')).collect() [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

2.53.selectExpr(*expr):投射一组SQL表达式并返回一个新的DataFrame。这是接 受SQL表达式的select()的变体。
>>> df.selectExpr("age * 2", "abs(age)").collect() [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]

2.54.show(n=20, truncate=True):将前n行打印到控制台。 受SQL表达式的select()的变体。
1.n:要显示的行数。 2.truncate:是否截断长字符串并对齐单元格。 >>> df DataFrame[name: string, age: bigint] >>> df.show() +-----+---+ | name|age| +-----+---+ |Alice|2| |Bob|5| +-----+---+

2.55.sort(*cols, **kwargs):返回按指定列排序的新DataFrame。 受SQL表达式的select()的变体。
1. cols:要排序的列或列名称列表。 2.ascending :布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

2.56.sortWithinPartitions(*cols, **kwargs):返回一个新的DataFrame,每个分区按照指定的列排序。 【2.pyspark.sql.DataFrame】受SQL表达式的select()的变体。
1. cols:要排序的列或列名称列表。 2.ascending :布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。 >>> df.sortWithinPartitions("age", ascending=False).show() +-----+---+ | name|age| +-----+---+ |Alice|2| |Bob|5| +-----+---+

2.57.stat:返回统计功能的DataFrameStatFunctions。 2.58.subtract(other):返回一个新的DataFrame,这个DataFrame中包含的行不在另一个DataFrame中。这相当于SQL中的EXCEPT。 2.59.take(num):返回前num行的行列表
>>> df.take(2) [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

2.60.toDF(*cols):返回一个新类:具有新的指定列名称的DataFrame。
1.cols:新列名列表(字符串)。 >>> df.toDF('f1', 'f2').collect() [Row(f1=u'Alice', f2=2), Row(f1=u'Bob', f2=5)]

2.61.toJSON(use_unicode=True):将DataFrame转换为字符串的RDD。每行都将转换为JSON格式作为返回的RDD中的一个元素。
>>> df.toJSON().first() u'{"name":"Alice","age":2}'

2.62.toJSONPandas():将此DataFrame的内容返回为Pandas pandas.DataFrame。这只有在pandas安装和可用的情况下才可用。
>>> df.toPandas() agename 02Alice 15Bob

2.63.unionAll(other):返回包含在这个frame和另一个frame的行的联合的新DataFrame。这相当于SQL中的UNION ALL。 2.64.unpersist(blocking=True):将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。 2.65.where(condition):使用给定表达式过滤行。where()是filter()的别名。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.filter(df.age > 3).collect() [Row(name=u'Bob', age=5)] >>> df.where(df.age == 2).collect() [Row(name=u'Alice', age=2)]>>> df.filter("age > 3").collect() [Row(name=u'Bob', age=5)] >>> df.where("age = 2").collect() [Row(name=u'Alice', age=2)]

2.66.withColumn(colName, col):通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
1.colName:字符串,新列的名称 2.col:新列的列表达式 df.withColumn('age2', df.age + 2).collect() [Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]

2.67.withColumnRenamed(existing, new):通过重命名现有列来返回新的DataFrame。
1.existing:字符串,要重命名的现有列的名称 2.new:字符串,列的新名称 >>> df.withColumnRenamed('age', 'age2').collect() [Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]

2.68.write:用于将DataFrame的内容保存到外部存储的接口。返回:DataFrameWriter [引用原文](https://www.cnblogs.com/wonglu/p/7784825.html)

    推荐阅读