Reading and Writing HBase Data with Spark
The results of a computation needed to be written to HBase, so I am recording the approach here.
First, the Maven dependencies HBase needs (this example uses HBase 1.2.3):
- hbase-client
- hbase-server
- slf4j-log4j12
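A minimal sketch of the corresponding pom.xml entries, assuming the standard org.apache.hbase and org.slf4j coordinates; the slf4j-log4j12 version shown is an assumption and should match the SLF4J version your Spark distribution already uses:

<!-- Sketch only: coordinates reconstructed, slf4j-log4j12 version is assumed -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.2.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.2.3</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.16</version> <!-- assumed; align with Spark's SLF4J version -->
</dependency>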
package sparkHbase

import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.Logging

object HbaseTest extends Logging {
def main(args: Array[String]): Unit = {
writeToHBaseNewAPI()
readFromHBaseWithHBaseNewAPIScan()
}

/**
 * Write to HBase
 */
def writeToHBaseNewAPI(): Unit ={
// Suppress unnecessary log output in the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()

val tableName = "person"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-quorum") // ZooKeeper quorum address
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)

val jobConf = new JobConf(hbaseConf)
// Set the job's output format
val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

// The DataFrame here comes from reading a Kudu table; any other data source works just as well
val df = sparkSession.read.format("org.apache.kudu.spark.kudu").options(Map("kudu.master"->"node1:7051,node2:7051,node3:7051","kudu.table"->"impala::db.table")).load()
.select("rowkey","name","age")
val rdd = df.rdd
try {
val data = rdd.map(item => {
val put = new Put(Bytes.toBytes(item.getAs[String]("rowkey")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(item.getAs[String]("name")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(item.getAs[Long]("age")))
(new ImmutableBytesWritable, put)
})
//保存到HBase表
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
} catch {
case e: Exception =>
logError("write data to hbase failed.", e)
}
sparkSession.stop()
}

/**
 * Read from HBase
 */
def readFromHBaseWithHBaseNewAPIScan(): Unit ={
// Suppress unnecessary log output in the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
val sc = sparkSession.sparkContext

val tableName = "person"
/**
 * Initialize the HBase source
 */
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-quorum") // ZooKeeper quorum address
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
verifyTableName(tableName, hbaseConf)

/**
 * Assemble the Scan
 */
val scan = new Scan()
scan.addFamily(Bytes.toBytes("info"))
// Set the row range to read: the start row is inclusive, the stop row is exclusive
scan.setStartRow(Bytes.toBytes("1")) //startRow
scan.setStopRow(Bytes.toBytes("3")) //StopRow
// Serialize the Scan into a Base64-encoded String
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbaseConf.set(TableInputFormat.SCAN, scanToString)

// Read the data as an RDD; TableInputFormat here is the one from the org.apache.hadoop.hbase.mapreduce package
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

hbaseRDD.foreachPartition(iter => {
iter.foreach(item=>{
val rowKey = Bytes.toString(item._2.getRow)
val name = Bytes.toString(item._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
val age = Bytes.toLong(item._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
println(rowKey+" "+name+" "+age)
})
})

//val dataRDD = hbaseRDD
//.map(x => x._2)
//.map{result =>
//(result.getRow, result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
//}.map(row => (new String(row._1), new String(row._2), new String(row._3)))
//.collect()
//.foreach(r => (println("rowKey:"+r._1 + ", name:" + r._2 + ", age:" + r._3)))
}

private def verifyTableName(tableName: String, conf: Configuration): Unit = {
val conn = ConnectionFactory.createConnection(conf)
try {
val admin = conn.getAdmin
if (!admin.tableExists(TableName.valueOf(tableName))) {
logError(s"${tableName} is not exsit")
}
} catch {
case e: Exception =>
logError("连接源失败", e)
} finally {
conn.close()
}
}
}
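The code above assumes that the person table with the info column family already exists. If it does not, it can be created up front with the HBase Admin API. The following is a minimal sketch against the HBase 1.2.x client (HTableDescriptor and HColumnDescriptor are the 1.x-era classes); the ZooKeeper address placeholder is the same assumption as in the examples above:

import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}

object CreatePersonTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-quorum") // assumed placeholder
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val admin = conn.getAdmin
      val table = TableName.valueOf("person")
      if (!admin.tableExists(table)) {
        // HTableDescriptor/HColumnDescriptor are the HBase 1.x API (replaced by builders in 2.x)
        val desc = new HTableDescriptor(table)
        desc.addFamily(new HColumnDescriptor("info"))
        admin.createTable(desc)
      }
    } finally {
      conn.close()
    }
  }
}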