启用kerberos的环境下在CDH中使用Spark|启用kerberos的环境下在CDH中使用Spark SQL On HBase

一、前言 在大数据领域,海量存储与快速检索方面HBase早已有了自己的一席之地。MapReduce计算框架早已对接了HBase,以HBase作为数据源,完成批量数据的读写。而Hive默认底层以MapReduce作为计算引擎,支持 以HBase作为外部表,通过HQL对HBase中的数据进行分析,Hive On HBase 也是很好的满足在某些场景下通过SQL对HBase表中的数据进行分析。
如今即MapReduce之后,Spark在大数据领域有着举足轻重的地位,无论是跑批,流处理,甚至图计算等都有它的用武之地。因此类似于Hive On HBase这种通过SQL的方式对HBase数据做交互式分析。Spark SQL On HBase成为不少用户的需求。而截至目前Spark并未提供已HBase最为数据源。
二、Spark SQL On HBase社区相关的进展

  • hortonworks: Apache HBase Connector
  • 华为: Fast SQL on HBase using SparkSQL
  • cloudera: SparkOnHBase
三、如何使用Spark SQL On HBase 现在市面上的Spark对接HBase的方式多种多样,根据个人感觉,hortonworks公司的不错,因此本文选择hortonworks公司开源的对接方式。
以下是使用步骤:
  1. 编译源码
  2. 在源码中找到编译出来的jar,在提交作业时指定
  3. 在提交作业时,所使用的HBase jar,必须与编译源码时的HBase的版本对应
  4. 用过HBase用户去认证
  5. 通过命令行提交用用程序
以spark-shell为例,提交应用程序:
spark-shell --master yarn --jars shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar
在spark-shell中先导入相关包,并引用sqlContext的命令:
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
再按如下步骤依次执行:
  1. Define the catalog for the schema mapping:
【启用kerberos的环境下在CDH中使用Spark|启用kerberos的环境下在CDH中使用Spark SQL On HBase】def catalog = s"""{
|"table":{"namespace":"default","name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
  1. Prepare the data and populate the HBase table:
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte
)
object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(
s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s”String$i: $t”,
i.toByte)
}
}
val data = https://www.it610.com/article/(0 to 255).map { i => HBaseRecord(i, “extra”)}
sc.parallelize(data).
toDF.
write.
options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
注意:在spark-shell中使用粘贴模式(:paste)执行case class HBaseRecord以及object HBaseRecord
  1. Load the DataFrame:
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
  1. Language integrated query:
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005"||
$"col0" === "row020"||
$"col0" === "r20"||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")
s.show
  1. SQL query:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

    推荐阅读