Reading and writing HBase data with Spark

The results of a computation needed to be written to HBase, so this post records the approach.
First, the Maven dependencies required for HBase (version 1.2.3 is used here):
<properties>
    <hbase.version>1.2.3</hbase.version>
</properties>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>
The complete example, writing first and then reading the data back:

package sparkHbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

object HbaseTest extends Logging {

  def main(args: Array[String]): Unit = {
    writeToHBaseNewAPI()
    readFromHBaseWithHBaseNewAPIScan()
  }

  /**
   * Write to HBase
   */
  def writeToHBaseNewAPI(): Unit = {
    // Suppress noisy Spark logs on the console
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()

    val tableName = "person"
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-ip")
    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)

    val jobConf = new JobConf(hbaseConf)
    // Configure the job's output format
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

    // The DataFrame here comes from reading Kudu data; any other source can be substituted
    val df = sparkSession.read.format("org.apache.kudu.spark.kudu")
      .options(Map("kudu.master" -> "node1:7051,node2:7051,node3:7051", "kudu.table" -> "impala::db.table"))
      .load()
      .select("rowkey", "name", "age")
    val rdd = df.rdd

    try {
      val data = rdd.map(item => {
        val put = new Put(Bytes.toBytes(item.getAs[String]("rowkey")))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(item.getAs[String]("name")))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(item.getAs[Long]("age")))
        (new ImmutableBytesWritable, put)
      })
      // Save to the HBase table
      data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    } catch {
      case e: Exception => logError("write data to hbase failed.", e)
    }

    sparkSession.stop()
  }

  /**
   * Read data from HBase
   */
  def readFromHBaseWithHBaseNewAPIScan(): Unit = {
    // Suppress noisy Spark logs on the console
    val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
    val sc = sparkSession.sparkContext

    val tableName = "person"

    // Initialize the HBase source configuration
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-ip")
    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
    verifyTableName(tableName, hbaseConf)

    // Assemble the Scan
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("info"))
    // Set the row range to read: the start row is inclusive, the stop row is exclusive
    scan.setStartRow(Bytes.toBytes("1")) // startRow
    scan.setStopRow(Bytes.toBytes("3"))  // stopRow
    // Serialize the Scan into a Base64-encoded string
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = new String(Base64.encodeBytes(proto.toByteArray))
    hbaseConf.set(TableInputFormat.SCAN, scanToString)

    // Read the data as an RDD; TableInputFormat here is the one from org.apache.hadoop.hbase.mapreduce
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    hbaseRDD.foreachPartition(iter => {
      iter.foreach(item => {
        val rowKey = Bytes.toString(item._2.getRow)
        val name = Bytes.toString(item._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
        val age = Bytes.toLong(item._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
        println(rowKey + " " + name + " " + age)
      })
    })

    //val dataRDD = hbaseRDD
    //  .map(x => x._2)
    //  .map { result =>
    //    (result.getRow, result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
    //  }
    //  .map(row => (new String(row._1), new String(row._2), new String(row._3)))
    //  .collect()
    //  .foreach(r => println("rowKey:" + r._1 + ", name:" + r._2 + ", age:" + r._3))
  }

  private def verifyTableName(tableName: String, conf: Configuration) = {
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val admin = conn.getAdmin().asInstanceOf[HBaseAdmin]
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        logError(s"${tableName} does not exist")
      }
    } catch {
      case e: Exception => logError("failed to connect to HBase", e)
    } finally {
      conn.close()
    }
  }
}
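If the scanned rows are needed as a DataFrame rather than just printed, a minimal sketch building on the hbaseRDD, sparkSession, and Bytes values already defined in readFromHBaseWithHBaseNewAPIScan (and assuming the same info:name / info:age layout written above, with age stored as a Long) could look like this:

// Sketch: turn the (ImmutableBytesWritable, Result) pairs into a DataFrame.
// Assumes the column layout used by the write path above: info:name (String), info:age (Long).
import sparkSession.implicits._

val personDF = hbaseRDD.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val name   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
  val age    = Bytes.toLong(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
  (rowKey, name, age)
}.toDF("rowkey", "name", "age")

personDF.show()

The resulting personDF can then be registered as a temp view or written out with the usual DataFrame APIs.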

