Reading and Writing HBase Data with Spark
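The examples below assume the project already depends on Spark core and the HBase client/server artifacts. A minimal build.sbt sketch (the version numbers here are assumptions; pick ones matching your cluster):

```scala
// build.sbt -- dependency sketch; version numbers are assumptions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.8",
  "org.apache.hbase" % "hbase-client" % "1.4.13",
  // hbase-server provides TableInputFormat / TableOutputFormat
  "org.apache.hbase" % "hbase-server" % "1.4.13"
)
```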
Reading data from HBase
Build the RDD over an HBase table with the newAPIHadoopRDD API:
def read(): Unit = {
  val conf = HBaseConfiguration.create()
  val sparkConf = new SparkConf().setMaster("local").setAppName("HBase")
  val sc = new SparkContext(sparkConf)
  // HBase connection settings
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3,spark")
  conf.set("hbase.rootdir", "hdfs://master:9000/hbase")
  conf.set("hbase.master", "master:60000")
  // Name of the table to read
  conf.set(TableInputFormat.INPUT_TABLE, "spark_test")
  // Note: TableInputFormat must be imported from org.apache.hadoop.hbase.mapreduce
  val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[Result])
  // Cache before the first action so the table is not scanned twice
  rdd.cache()
  val count = rdd.count()
  println("The count of RDD: " + count)
  // Iterate over the rows and print them
  rdd.foreach { case (_, result) =>
    val key = Bytes.toString(result.getRow)
    // "name" is the column family; "first_name" and "last_name" are column qualifiers
    val firstName = Bytes.toString(result.getValue("name".getBytes, "first_name".getBytes))
    val lastName = Bytes.toString(result.getValue("name".getBytes, "last_name".getBytes))
    printf("ID: %s, first_name: %s, last_name: %s%n", key, firstName, lastName)
  }
}
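By default TableInputFormat scans the whole table. The scan can be narrowed to specific column families or a row range before building the RDD. A sketch, assuming the HBase 1.x API where TableMapReduceUtil.convertScanToString serializes a Scan into the configuration:

```scala
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

val scan = new Scan()
scan.addFamily(Bytes.toBytes("name"))   // only the "name" column family
scan.setStartRow(Bytes.toBytes("1"))    // row keys in [1, 3)
scan.setStopRow(Bytes.toBytes("3"))
// Serialize the Scan into the property that TableInputFormat reads
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
// newAPIHadoopRDD built from this conf will now only see the restricted scan
```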
Writing data to HBase
There are two APIs that can write an RDD to HBase:
saveAsHadoopDataset
saveAsNewAPIHadoopDataset
The main difference is the parameter type: the first takes an org.apache.hadoop.mapred.JobConf, while the second takes an org.apache.hadoop.conf.Configuration.
The output-format classes also differ: the first uses org.apache.hadoop.hbase.mapred.TableOutputFormat (the old mapred API), the second uses org.apache.hadoop.hbase.mapreduce.TableOutputFormat (the new mapreduce API).
saveAsHadoopDataset
def write(): Unit = {
  val conf = HBaseConfiguration.create()
  val sparkConf = new SparkConf().setMaster("local").setAppName("HBase")
  val sc = new SparkContext(sparkConf)
  // HBase connection settings
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3,spark")
  conf.set("hbase.rootdir", "hdfs://master:9000/hbase")
  conf.set("hbase.master", "master:60000")
  conf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, "spark_test")
  val job = new JobConf(conf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  // The values we write are Put objects, so the output value class is Put
  job.setOutputValueClass(classOf[Put])
  job.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
  // The output table can be set on either job or conf
  //job.set(TableOutputFormat.OUTPUT_TABLE, "spark_test")
  val data = sc.makeRDD(Array("2,qun,zhang", "3,yingjun,zhang"))
  val rdd = data.map(_.split(",")).map { arr =>
    val put = new Put(Bytes.toBytes(arr(0))) // row key
    put.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first_name"), Bytes.toBytes(arr(1)))
    put.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last_name"), Bytes.toBytes(arr(2)))
    (new ImmutableBytesWritable, put)
  }
  rdd.saveAsHadoopDataset(job)
}
saveAsNewAPIHadoopDataset
def writeNew(): Unit = {
  val sparkConf = new SparkConf().setMaster("local").setAppName("HBase")
  val sc = new SparkContext(sparkConf)
  // HBase connection settings
  sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
  sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3,spark")
  sc.hadoopConfiguration.set("hbase.rootdir", "hdfs://master:9000/hbase")
  sc.hadoopConfiguration.set("hbase.master", "master:60000")
  sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "spark_test")
  val job = Job.getInstance(sc.hadoopConfiguration)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  // The values we write are Put objects, so the output value class is Put
  job.setOutputValueClass(classOf[Put])
  // TableOutputFormat here comes from org.apache.hadoop.hbase.mapreduce
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  val data = sc.makeRDD(Array("2,qun,zhang", "3,yingjun,zhang"))
  val rdd = data.map(_.split(",")).map { arr =>
    val put = new Put(Bytes.toBytes(arr(0))) // row key
    put.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first_name"), Bytes.toBytes(arr(1)))
    put.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last_name"), Bytes.toBytes(arr(2)))
    (new ImmutableBytesWritable, put)
  }
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
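Both write paths assume the target table and its "name" column family already exist; otherwise the job fails at output time. A sketch that creates them with the HBase 1.x admin API (HBase 2.x would use TableDescriptorBuilder instead):

```scala
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
val tableName = TableName.valueOf("spark_test")
if (!admin.tableExists(tableName)) {
  val descriptor = new HTableDescriptor(tableName)
  descriptor.addFamily(new HColumnDescriptor("name")) // the column family used above
  admin.createTable(descriptor)
}
admin.close()
connection.close()
```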
The complete code has been uploaded to https://github.com/SGyuanshi/Spark/blob/master/SparkScala/src/main/scala/HBase.scala