简介
【spark-sql-perf】spark-sql-perf 是一个 spark sql 性能测试框架,可以用来进行一些基准测试。
测试环境:
- spark 2.4.0
- spark-sql-perf_2.11-0.5.0-SNAPSHOT
sudo yum install gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
spark-sql-perf 编译打包,从
$spark-sql-perf/target/scala-2.11
下获得需要的jar包(spark-sql-perf_2.11-0.5.0-SNAPSHOT.jar)git clone https://github.com/databricks/spark-sql-perf.git
sbt package
启动 spark-shell
spark-shell \
--conf spark.executor.instances=40 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=8g \
--conf spark.executor.memoryOverhead=2g \
--jars scala-logging-slf4j_2.11-2.1.2.jar,scala-logging-api_2.11-2.1.2.jar,spark-sql-perf_2.11-0.5.0-SNAPSHOT.jar
生成数据
需要提前将
tpcds-kit
分发到所有 spark executor 节点import com.databricks.spark.sql.perf.tpcds.TPCDSTablesval rootDir = "hdfs://ns/user/admin/tpcds/data"
val dsdgenDir = "/path/to/tpcds-kit/tools"
val scaleFactor = "20"
val format = "parquet"
val databaseName = "tpcds"val sqlContext = spark.sqlContext
val tables = new TPCDSTables(sqlContext,
dsdgenDir = dsdgenDir,
scaleFactor = scaleFactor,
useDoubleForDecimal = true,
useStringForDate = true)tables.genData(
location = rootDir,
format = format,
overwrite = true,
partitionTables = true,
clusterByPartitionColumns = true,
filterOutNullPartitionValues = false,
tableFilter = "",
numPartitions = 120)//创建临时表
tables.createTemporaryTables(rootDir, format)
//将表信息注册到 hive metastore
//sql(s"create database $databaseName")
//tables.createExternalTables(rootDir, format, databaseName, overwrite = true, discoverPartitions = true)
执行查询
默认情况下,使用
runExperiment
会在后台线程中进行,最终将结果以JSON格式保存到 resultLocation
下时间戳命名的子目录中,例如 $resultLocation/timestamp=1429213883272
import com.databricks.spark.sql.perf.tpcds.TPCDSval tpcds = new TPCDS (sqlContext)
val databaseName = "tpcds"
sql(s"use $databaseName")val resultLocation = "hdfs://ns/user/admin/result"
val iterations = 1
val queries = tpcds.tpcds2_4Queries
//单个查询设置超时时间
val timeout = 300val experiment = tpcds.runExperiment(
queries,
iterations = iterations,
resultLocation = resultLocation,
forkThread = true)
experiment.waitForFinish(timeout)
文章图片
运行时截图 查询结果
有两种获取方式,如果 experiment 还没有关闭,可以使用
experiment.getCurrentResults
方法获取//从 experiment 获取结果
experiment.getCurrentResults.
withColumn("Name", substring(col("name"), 2, 100)).
withColumn("Runtime", (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0).
selectExpr('Name, 'Runtime)
如果已经关闭,则可以从 resultLocation 中获取结果JSON文件并解析
//从文件中读取
val result = spark.read.json(resultLocation)
result.select("results.name","results.executionTime").flatMap(r=>{
val name = r.getAs[Seq[String]]("name")
val executionTime = r.getAs[Seq[Double]]("executionTime")
name.zip(executionTime)
}).toDF("name","executionTime").show()
TPC-DS TPC-DS采用星型、雪花型等多维数据模式。它包含7张事实表,17张纬度表平均每张表含有18列。其工作负载包含99个SQL查询,覆盖SQL99和2003的核心部分以及OLAP。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值是有倾斜的,与真实数据一致。可以说TPC-DS是与真实场景非常接近的一个测试集,也是难度较大的一个测试集。
TPC-DS的这个特点跟大数据的分析挖掘应用非常类似。Hadoop等大数据分析技术也是对海量数据进行大规模的数据分析和深度挖掘,也包含交互式联机查询和统计报表类应用,同时大数据的数据质量也较低,数据分布是真实而不均匀的。因此TPC-DS成为客观衡量多个不同Hadoop版本以及SQL on Hadoop技术的最佳测试集。这个基准测试有以下几个主要特点:
- 一共99个测试案例,遵循SQL'99和SQL 2003的语法标准,SQL案例比较复杂
- 分析的数据量大,并且测试案例是在回答真实的商业问题
- 测试案例中包含各种业务模型(如分析报告型,迭代式的联机分析型,数据挖掘型等)
- 几乎所有的测试案例都有很高的IO负载和CPU计算需求
java.lang.ClassNotFoundException: com.typesafe.scalalogging.slf4j.LazyLogging
java.lang.ClassNotFoundException: com.typesafe.scalalogging.Logging
导入相应的包即可
--jars /path/to/scala-logging-slf4j_2.11-2.1.2.jar,/path/to/scala-logging-api_2.11-2.1.2.jar