#|大数据进阶之路——Spark SQL基本配置


文章目录

      • Spark安装
      • 编译失败
      • 环境搭建
      • Standalone
      • 本地IDE
      • HiveContextAPP
      • SparkSessinon
      • Spark Shell
      • Spark Sql
      • thriftserver/beeline的使用
      • jdbc

MapReduce的局限性:
1)代码繁琐;
2)只能够支持map和reduce方法;
3)执行效率低下;
4)不适合迭代多次、交互式、流式的处理;
框架多样化:
1)批处理(离线):MapReduce、Hive、Pig
2)流式处理(实时): Storm、JStorm
3)交互式计算:Impala
学习、运维成本无形中都提高了很多
===> Spark
#|大数据进阶之路——Spark SQL基本配置
文章图片

Spark安装
前置要求:
1)Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+ 2)export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

mvn编译命令:
./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
[hadoop@hadoop001 spark-2.1.0]$ cat pom.xml [hadoop@hadoop001 spark-2.1.0]$ pwd /home/hadoop/source/spark-2.1.02.2.0 2.5.0 ${hadoop.version} ...... ...............hadoop-2.62.6.4 0.9.3 3.4.6 2.6.0

路径下执行
[hadoop@hadoop001 spark-2.1.0]$ pwd /home/hadoop/source/spark-2.1.0

==> ./build/mvn -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package
编译可以运行的包
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0
make-distribution.sh
#|大数据进阶之路——Spark SQL基本配置
文章图片

spark-$VERSION-bin-$NAME.tgz
—>spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz
编译失败
Failed to execute goal on project ...: Could not resolve dependencies for project ...

pom.xml中添加
cloudera https://repository.cloudera.com/artifactory/cloudera-repos/

如果scala2.10
需要添加./dev/change-scala-version.sh 2.10
环境搭建
local
  • tar -zxvf park-2.1.0-bin-2.6.0-cdh5.7.0.tgz -C ~/app/
  • 配置环境SPARK_HOME
  • source ~./bash_profile
运行
spark-shell --master local[2]
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37) at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37) a) Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BONECP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver. at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:259) java:104).............................................at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005) 571) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:624)at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631) at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325) at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:282) at org.datanucleus.store.AbstractStoreManager.(AbstractStoreManager.java:240)Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver. at org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:58) at org.datanucleus.store.rdbms.connectionpool.BoneCPConnectionPoolFactory.createConnectionPool(BoneCPConnectionPoolFactory.java:54) at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238) ... 145 more

原因没有引入mysql驱动
【#|大数据进阶之路——Spark SQL基本配置】spark-shell --master local[2] --jar /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar
[hadoop@hadoop001 software]$ spark-shell --master local[2] --jars /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 20/10/16 20:42:32 WARN SparkContext: Support for Java 7 is deprecated as of Spark 2.0.0 20/10/16 20:42:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 20/10/16 20:42:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 20/10/16 20:42:50 ERROR ObjectStore: Version information found in metastore differs 1.1.0 from expected schema version 1.2.0. Schema verififcation is disabled hive.metastore.schema.verification so setting version. 20/10/16 20:42:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Spark context Web UI available at http://192.168.43.214:4041 Spark context available as 'sc' (master = local[2], app id = local-1602906155852). Spark session available as 'spark'. Welcome to ______ / __/_____ _____/ /__ _\ \/ _ \/ _ `/ __/'_/ /___/ .__/\_,_/_/ /_/\_\version 2.1.0 /_/Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51) Type in expressions to have them evaluated. Type :help for more information.

Standalone
Spark Standalone模式的架构和Hadoop HDFS/YARN很类似的
1 master + n worker
spark-env.sh
SPARK_MASTER_HOST=hadoop001 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=2g SPARK_WORKER_INSTANCES=1

master:
hadoop1

slaves:
hadoop2 hadoop3 hadoop4 .... hadoop10

==> start-all.sh 会在 hadoop1机器上启动master进程,在slaves文件配置的所有hostname的机器上启动worker进程
Spark WordCount统计
val file = spark.sparkContext.textFile(“file:///home/hadoop/data/wc.txt”)
val wordCounts = file.flatMap(line => line.split(",")).map((word => (word, 1))).reduceByKey(_ + _)
wordCounts.collect
#|大数据进阶之路——Spark SQL基本配置
文章图片

本地IDE
A master URL must be set in your configuration
点击edit configuration,在左侧点击该项目。在右侧VM options中输入“-Dspark.master=local”,指示本程序本地单线程运行,再次运行即可。
package org.exampleimport org.apache.spark.sql.SQLContext import org.apache.spark.{ SparkConf, SparkContext}object SQLContextAPP {def main(args: Array[String]): Unit = {//1创建相应的Spark val sparkConf = new SparkConf() sparkConf.setAppName("SQLContextAPP") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc)//2数据处理 val people = sqlContext.read.format("json").load("people.json") people.printSchema() people.show()//3关闭资源 sc.stop()}}

root |-- age: long (nullable = true) |-- name: string (nullable = true).........................| age|name| +----+-------+ |null|Michael| |30|Andy| |19| Justin| +----+-------+

配置maven环境变量cmd控制台提示:mvn不是内部或外部命令,也不是可运行的程序或批处理文件
首先maven环境变量:
变量名:MAVEN_HOME变量值:E:\apache-maven-3.2.3变量名:Path变量值:; %MAVEN_HOME%\bin

然后到项目的目录下直接执行
C:\Users\jacksun\IdeaProjects\SqarkSQL\ mvn clean package -DskipTests
#|大数据进阶之路——Spark SQL基本配置
文章图片

在集群上测试
spark-submit \ --name SQLContextApp \ --class org.example.SQLContextApp \ --master local[2] \ /home/hadoop/lib/sql-1.0.jar \ /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json

HiveContextAPP
注意:
1)To use a HiveContext, you do not need to have an existing Hive setup
2)hive-site.xml
package org.exampleimport org.apache.spark.{ SparkConf, SparkContext} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveContextobject HiveContextAPP {def main(args: Array[String]): Unit = {//1创建相应的Spark val path =args(0) val sparkConf = new SparkConf()//测试和生产中AppName和Master是通过脚本执行的 //sparkConf.setAppName("HiveContextAPP").setMaster("local[2]")val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc)//2数据处理 hiveContext.table("emp").show//3关闭资源 sc.stop()} }

spark-submit \ --name HiveContextApp \ --class org.example.HiveContextApp \ --master local[2] \ /home/hadoop/lib/sql-1.0.jar \ --jars /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar

SparkSessinon
package org.exampleimport org.apache.spark.sql.SparkSessionobject SparkSessionApp {def main(args: Array[String]) {val spark = SparkSession.builder().appName("SparkSessionApp") .master("local[2]").getOrCreate()val people = spark.read.json("people.json") people.show()spark.stop() } }

Spark Shell
  • 启动hive
[hadoop@hadoop001 bin]$ pwd /home/hadoop/app/hive-1.1.0-cdh5.7.0/bin [hadoop@hadoop001 bin]$ hive ls: cannot access /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/lib/spark-assembly-*.jar: No such file or directory which: no hbase in (/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/bin:/home/hadoop/app/scala-2.11.8/bin:/home/hadoop/app/hive-1.1.0-cdh5.7.0/bin:/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/bin:/home/hadoop/app/apache-maven-3.3.9/bin:/home/hadoop/app/jdk1.7.0_51/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin)Logging initialized using configuration in jar:file:/home/hadoop/app/hive-1.1.0-cdh5.7.0/lib/hive-common-1.1.0-cdh5.7.0.jar!/hive-log4j.properties WARNING: Hive CLI is deprecated and migration to Beeline is recommended. hive>

  • 拷贝
    [hadoop@hadoop001 conf]$ cp hive-site.xml ~/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/conf/
  • 启动Spark
    spark-shell --master local[2] --jars /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar
scala> spark.sql("show tables").show +--------+------------+-----------+ |database|tableName|isTemporary| +--------+------------+-----------+ | default|dept|false| | default|emp|false| | default|hive_table_1|false| | default|hive_table_2|false| | default|t|false| +--------+------------+-----------+hive> show tables; OK dept emp hive_wordcount

scala> spark.sql("select * from emp e join dept d on e.deptno=d.deptno").show +-----+------+---------+----+----------+------+------+------+------+----------+--------+ |empno| ename|job| mgr|hiredate|sal|comm|deptno|deptno|dname|loc| +-----+------+---------+----+----------+------+------+------+------+----------+--------+ | 7369| SMITH|CLERK|7902|1980-12-17| 800.0|null|20|20|RESEARCH|DALLAS| | 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|30|30|SALES| CHICAGO| | 7521|WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|30|30|SALES| CHICAGO| | 7566| JONES|MANAGER|7839|1981-4-2|2975.0|null|20|20|RESEARCH|DALLAS| | 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|30|30|SALES| CHICAGO| | 7698| BLAKE|MANAGER|7839|1981-5-1|2850.0|null|30|30|SALES| CHICAGO| | 7782| CLARK|MANAGER|7839|1981-6-9|2450.0|null|10|10|ACCOUNTING|NEW YORK| | 7788| SCOTT|ANALYST|7566| 1987-4-19|3000.0|null|20|20|RESEARCH|DALLAS| | 7839|KING|PRESIDENT|null|1981-11-17|5000.0|null|10|10|ACCOUNTING|NEW YORK| | 7844|TURNER| SALESMAN|7698|1981-9-8|1500.0|0.0|30|30|SALES| CHICAGO| | 7876| ADAMS|CLERK|7788| 1987-5-23|1100.0|null|20|20|RESEARCH|DALLAS| | 7900| JAMES|CLERK|7698| 1981-12-3| 950.0|null|30|30|SALES| CHICAGO| | 7902|FORD|ANALYST|7566| 1981-12-3|3000.0|null|20|20|RESEARCH|DALLAS| | 7934|MILLER|CLERK|7782| 1982-1-23|1300.0|null|10|10|ACCOUNTING|NEW YORK| +-----+------+---------+----+----------+------+------+------+------+----------+--------+hive> select * from emp e join dept d on e.deptno=d.deptno > ; Query ID = hadoop_20201020054545_f7fbda3e-439e-409e-b2ce-3c553d969ed4 Total jobs = 1 20/10/20 05:48:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Execution log at: /tmp/hadoop/hadoop_20201020054545_f7fbda3e-439e-409e-b2ce-3c553d969ed4.log 2020-10-20 05:48:49 Starting to launch local task to process map join; maximum memory = 477102080 2020-10-20 05:48:51 Dump the side-table for tag: 1 with group count: 4 into file: file:/tmp/hadoop/5c8577b3-c00d-4ece-9899-c0e3de66f2f2/hive_2020-10-20_05-48-27_437_556791932773953494-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile01--.hashtable 2020-10-20 05:48:51 Uploaded 1 File to: file:/tmp/hadoop/5c8577b3-c00d-4ece-9899-c0e3de66f2f2/hive_2020-10-20_05-48-27_437_556791932773953494-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile01--.hashtable (404 bytes) 2020-10-20 05:48:51 End of local task; Time Taken: 2.691 sec. Execution completed successfully MapredLocal task succeeded Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there's no reduce operator Starting Job = job_1602849227137_0002, Tracking URL = http://hadoop001:8088/proxy/application_1602849227137_0002/ Kill Command = /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job-kill job_1602849227137_0002 Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0 2020-10-20 05:49:13,663 Stage-3 map = 0%,reduce = 0% 2020-10-20 05:49:36,950 Stage-3 map = 100%,reduce = 0%, Cumulative CPU 13.08 sec MapReduce Total cumulative CPU time: 13 seconds 80 msec Ended Job = job_1602849227137_0002 MapReduce Jobs Launched: Stage-Stage-3: Map: 1Cumulative CPU: 13.08 secHDFS Read: 7639 HDFS Write: 927 SUCCESS Total MapReduce CPU Time Spent: 13 seconds 80 msec OK 7369 SMITH CLERK 7902 1980-12-17 800.0 NULL 20 20 RESEARCH DALLAS 7499 ALLEN SALESMAN 7698 1981-2-20 1600.0 300.0 30 30 SALES CHICAGO 7521 WARD SALESMAN 7698 1981-2-22 1250.0 500.0 30 30 SALES CHICAGO 7566 JONES MANAGER 7839 1981-4-2 2975.0 NULL 20 20 RESEARCH DALLAS 7654 MARTIN SALESMAN 7698 1981-9-28 1250.0 1400.0 30 30 SALES CHICAGO 7698 BLAKE MANAGER 7839 1981-5-1 2850.0 NULL 30 30 SALES CHICAGO 7782 CLARK MANAGER 7839 1981-6-9 2450.0 NULL 10 10 ACCOUNTING NEW YORK 7788 SCOTT ANALYST 7566 1987-4-19 3000.0 NULL 20 20 RESEARCH DALLAS 7839 KING PRESIDENT NULL 1981-11-17 5000.0 NULL 10 10 ACCOUNTINGNEW YORK 7844 TURNER SALESMAN 7698 1981-9-8 1500.0 0.0 30 30 SALES CHICAGO 7876 ADAMS CLERK 7788 1987-5-23 1100.0 NULL 20 20 RESEARCH DALLAS 7900 JAMES CLERK 7698 1981-12-3 950.0 NULL 30 30 SALES CHICAGO 7902 FORD ANALYST 7566 1981-12-3 3000.0 NULL 20 20 RESEARCH DALLAS 7934 MILLER CLERK 7782 1982-1-23 1300.0 NULL 10 10 ACCOUNTING NEW YORK Time taken: 71.998 seconds, Fetched: 14 row(s) hive>

SPARK SQL 基本秒出结果,hive比较耗时
  • hive-site.xml
    删除警告
hive.metastore.schema.verification false

Spark Sql
20/10/20 06:20:09 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:376, took 0.261151 s 7369 SMITH CLERK 7902 1980-12-17 800.0 NULL 20 20 RESEARCH DALLAS 7499 ALLEN SALESMAN 7698 1981-2-20 1600.0 300.0 30 30 SALES CHICAGO 7521 WARD SALESMAN 7698 1981-2-22 1250.0 500.0 30 30 SALES CHICAGO 7566 JONES MANAGER 7839 1981-4-2 2975.0 NULL 20 20 RESEARCH DALLAS 7654 MARTIN SALESMAN 7698 1981-9-28 1250.0 1400.0 30 30 SALES CHICAGO 7698 BLAKE MANAGER 7839 1981-5-1 2850.0 NULL 30 30 SALES CHICAGO 7782 CLARK MANAGER 7839 1981-6-9 2450.0 NULL 10 10 ACCOUNTING NEW YORK 7788 SCOTT ANALYST 7566 1987-4-19 3000.0 NULL 20 20 RESEARCH DALLAS 7839 KING PRESIDENT NULL 1981-11-17 5000.0 NULL 10 10 ACCOUNTINGNEW YORK 7844 TURNER SALESMAN 7698 1981-9-8 1500.0 0.0 30 30 SALES CHICAGO 7876 ADAMS CLERK 7788 1987-5-23 1100.0 NULL 20 20 RESEARCH DALLAS 7900 JAMES CLERK 7698 1981-12-3 950.0 NULL 30 30 SALES CHICAGO 7902 FORD ANALYST 7566 1981-12-3 3000.0 NULL 20 20 RESEARCH DALLAS 7934 MILLER CLERK 7782 1982-1-23 1300.0 NULL 10 10 ACCOUNTING NEW YORK Time taken: 13.625 seconds, Fetched 14 row(s) 20/10/20 06:20:09 INFO CliDriver: Time taken: 13.625 seconds, Fetched 14 row(s)

explain extended select a.key*(2+3), b.value from t a join t b on a.key = b.key and a.key > 3;
== Parsed Logical Plan == 'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value] +- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3)) :- 'UnresolvedRelation `t`, a +- 'UnresolvedRelation `t`, b== Analyzed Logical Plan == (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string Project [(cast(key#321 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324] +- Join Inner, ((key#321 = key#323) && (cast(key#321 as double) > cast(3 as double))) :- SubqueryAlias a :+- MetastoreRelation default, t +- SubqueryAlias b +- MetastoreRelation default, t== Optimized Logical Plan == Project [(cast(key#321 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324] +- Join Inner, (key#321 = key#323) :- Project [key#321] :+- Filter (isnotnull(key#321) && (cast(key#321 as double) > 3.0)) :+- MetastoreRelation default, t +- Filter (isnotnull(key#323) && (cast(key#323 as double) > 3.0)) +- MetastoreRelation default, t== Physical Plan == *Project [(cast(key#321 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324] +- *SortMergeJoin [key#321], [key#323], Inner :- *Sort [key#321 ASC NULLS FIRST], false, 0 :+- Exchange hashpartitioning(key#321, 200) :+- *Filter (isnotnull(key#321) && (cast(key#321 as double) > 3.0)) :+- HiveTableScan [key#321], MetastoreRelation default, t +- *Sort [key#323 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(key#323, 200) +- *Filter (isnotnull(key#323) && (cast(key#323 as double) > 3.0)) +- HiveTableScan [key#323, value#324], MetastoreRelation default, t

thriftserver/beeline的使用
spark下的sbin
  1. 启动thriftserver:
    ./start-thriftserver.sh --master local[2] --jars /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar
    #|大数据进阶之路——Spark SQL基本配置
    文章图片
默认端口是10000 ,可以修改
./start-thriftserver.sh\ --master local[2] \ --jars ~/software/mysql-connector-java-5.1.27-bin.jar\ --hiveconf hive.server2.thrift.port=14000

2)启动beeline
beeline -u jdbc:hive2://localhost:10000 -n hadoop
#|大数据进阶之路——Spark SQL基本配置
文章图片

beeline -u jdbc:hive2://localhost:14000 -n hadoop
thriftserver和普通的spark-shell/spark-sql有什么区别?
1)spark-shell、spark-sql都是一个spark application;
2)thriftserver
  • 不管你启动多少个客户端(beeline/code),永远都是一个spark application
  • 解决了一个数据共享的问题,多个客户端可以共享数据;
jdbc
注意事项:在使用jdbc开发时,一定要先启动thriftserver Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://hadoop001:14000: java.net.ConnectException: Connection refused

org.spark-project.hive hive-jdbc 1.2.1.spark2

package org.example import java.sql.DriverManager object JDBCApp {def main(args: Array[String]) {Class.forName("org.apache.hive.jdbc.HiveDriver")val conn = DriverManager.getConnection("jdbc:hive2://192.168.43.214:10000","hadoop","") val pstmt = conn.prepareStatement("select empno, ename, sal from emp") val rs = pstmt.executeQuery() while (rs.next()) {println("empno:" + rs.getInt("empno") + " , ename:" + rs.getString("ename") + " , sal:" + rs.getDouble("sal"))}rs.close() pstmt.close() conn.close()}}

#|大数据进阶之路——Spark SQL基本配置
文章图片

    推荐阅读