SPARK基础4(DataFrame操作)

【SPARK基础4(DataFrame操作)】在上文《SPARK基础2(读入文件、转临时表、RDD与DataFrame)》中,我们简单介绍了spark中的DataFrame,我们知道了spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。在本文中我们主要介绍,DataFrame基本API常用操作。
查看数据

// 默认只显示20条 commodityDF.show() // 是否最多只显示20个字符,默认为true commodityDF.show(false) // 完整查看10条数据 commodityDF.show(10, false) // 取前n行数据, 和take与head不同的是,limit方法不是Action操作。 commodityDF.limit(5).show(false)

加载数据到数组
// 将数据加载到集合里面 commodityDF.collect().foreach(println) // 和collect类似,只不过转换成list commodityDF.collectAsList()

获取指定字段的统计信息
// 获取指定字段的统计信息count, mean, stddev, min, max 返回的还是Dataframe commodityDF.describe("price", "yprice").show(false) // 遍历每个统计信息 commodityDF.describe("price", "yprice").collect().foreach(println) // 取0行的 "count" 数据 println(commodityDF.describe("price", "yprice").collect()(0))

去重
// 去除一行数据完全相同的 println(df.distinct().count()) // 删除指定字段存在相同的数据 println(df.dropDuplicates(Seq("price", "yprice")).count())

获取n行数据
// 获取第一行数据 println(commodityDF.first()) println(commodityDF.head()) // 获取前5条数据 commodityDF.head(5).foreach(println) commodityDF.take(5).foreach(println) // 以行数据list返回 commodityDF.takeAsList(5)

条件查询
// where和filter方法和SQL的where后面的语句一样 commodityDF.where("price>100 or yprice<200").show() commodityDF.filter("price>100 or yprice<200").show()

选取字段
// 选取 name ,price字段 commodityDF.select("name", "price").show(5, false) // 对price字段的数据都+100 commodityDF.select(commodityDF("name"), commodityDF("price") + 100).show(5) // 对指定字段进行特殊处理; price字段重名名为 p, 对price取四舍五入 commodityDF.selectExpr("name", "price as p", "round(price)").show(10) // 只获取单个字段 val name = commodityDF.col("name") val parice = commodityDF.apply("price")

删除指定字段
// 删除指定字段 price val c1 = commodityDF.drop("price") val c2 = c1.drop(c1("yprice")) c2.show(5)

排序
// 降序排序 commodityDF.orderBy(-commodityDF("price")).show(5) commodityDF.orderBy(commodityDF("price").desc).show(5) // 升序排序 commodityDF.orderBy("price").show(5) commodityDF.orderBy(commodityDF("price")).show(5) //多列排序 df.sort(df.age.desc(), df.name.asc()).show()

分组
// 对字段数据分组, 再对分组后的数据处理 count max mean sum agg commodityDF.groupBy("degree").count().show(5) commodityDF.groupBy(commodityDF("degree")).count().show(5) commodityDF.groupBy(commodityDF("degree")).max("price", "yprice").show(5)

agg聚合
// 聚合agg一般和group by一起使用 commodityDF.agg("price" -> "max", "yprice" -> "sum").show() // 对degree分组然后取 price字段的最大值和downNum的平均值 commodityDF.groupBy("degree").agg("price" -> "max", "downNum" -> "mean").show()

join
// 组合数据DF val df1 = commodityDF.limit(5) val df2 = commodityDF.limit(10) val df3 = commodityDF.filter("id>5 and id<11") // 只有id字段相同的才会横向组合 inner df1.join(df2, "id").show() df1.join(df2, df1("id") === df2("id")).show() df1.join(df2, df1("id") === df2("id"), "inner").show() // 根据 id和name两个字段join df2.join(df3, Seq("id", "name")).show()Leaddetails.join( Utm_Master, Leaddetails("LeadSource") <=> Utm_Master("LeadSource") && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), "left" )

重命名
// 重命名字段名,如果指定的字段名不存在,不进行任何操作 commodityDF.limit(5).show() commodityDF.withColumnRenamed("name", "reName").limit(5).show() df.select(df.name.alias("username"),df.age).show()

添加字段
// 往当前DataFrame中新增一列 commodityDF.withColumn("newCol", commodityDF("name")).show()

    推荐阅读