数据湖(十六)(Structured Streaming实时写入Iceberg)

男儿欲遂平生志,五经勤向窗前读。这篇文章主要讲述数据湖(十六):Structured Streaming实时写入Iceberg相关的知识,希望能为你提供帮助。


  Structured Streaming实时写入Iceberg目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。
一、创建Kafka topic启动Kafka集群,创建“kafka-iceberg-topic”

[root@node1 bin]# ./kafka-topics.sh--zookeeper node3:2181,node4:2181,node5:2181--create--topic kafka-iceberg-topic--partitions 3 --replication-factor 3


二、编写向Kafka生产数据代码
/**
* 向Kafka中写入数据
*/
object WriteDataToKafka
def main(args: Array[String]): Unit =
val props = new Properties()
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String,String](props)
var counter = 0
var keyFlag = 0
while(true)
counter +=1
keyFlag +=1
val content: String = userlogs()
producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
//producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
if(0 == counter%100)
counter = 0
Thread.sleep(5000)


producer.close()


def userlogs()=
val userLogBuffer = new StringBuffer("")
val timestamp = new Date().getTime();
var userID = 0L
var pageID = 0L

//随机生成的用户ID
userID = Random.nextInt(2000)

//随机生成的页面ID
pageID =Random.nextInt(2000);

//随机生成Channel
val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
val channel = channelNames(Random.nextInt(10))

val actionNames = Array[String]("View", "Register")
//随机生成action行为
val action = actionNames(Random.nextInt(2))

val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
userLogBuffer.append(dateToday)
.append("\\t")
.append(timestamp)
.append("\\t")
.append(userID)
.append("\\t")
.append(pageID)
.append("\\t")
.append(channel)
.append("\\t")
.append(action)
System.out.println(userLogBuffer.toString())
userLogBuffer.toString()



三、编写Structured Streaming读取Kafka数据实时写入Iceberg
object StructuredStreamingSinkIceberg
def main(args: Array[String]): Unit =
//1.准备对象
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
.getOrCreate()
//spark.sparkContext.setLogLevel("Error")

//2.创建Iceberg 表
spark.sql(
"""
|create table if not exists hadoop_prod.iceberg_db.iceberg_table (
| current_day string,
| user_id string,
| page_id string,
| channel string,
| action string
|) using iceberg
""".stripMargin)

val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"
val bootstrapServers = "node1:9092,node2:9092,node3:9092"
//多个topic 逗号分开
val topic = "kafka-iceberg-topic"

//3.读取Kafka读取数据
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("auto.offset.reset", "latest")
.option("group.id", "iceberg-kafka")
.option("subscribe", topic)
.load()

import spark.implicits._
import org.apache.spark.sql.functions._

val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)].toDF("id", "data")

val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\\t")(0))
.withColumn("ts", split(col("data"), "\\t")(1))
.withColumn("user_id", split(col("data"), "\\t")(2))
.withColumn("page_id", split(col("data"), "\\t")(3))
.withColumn("channel", split(col("data"), "\\t")(4))
.withColumn("action", split(col("data"), "\\t")(5))
.select("current_day", "user_id", "page_id", "channel", "action")

//结果打印到控制台,Default trigger (runs micro-batch as soon as it can)
//val query: StreamingQuery = transDF.writeStream
//.outputMode("append")
//.format("console")
//.start()

//4.流式写入Iceberg表
val query = transDF.writeStream
.format("iceberg")
.outputMode("append")
//每分钟触发一次Trigger.ProcessingTime(1, TimeUnit.MINUTES)
//每10s 触发一次 Trigger.ProcessingTime(1, TimeUnit.MINUTES)
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.option("path", "hadoop_prod.iceberg_db.iceberg_table")
.option("fanout-enabled", "true")
.option("checkpointLocation", checkpointPath)
.start()

query.awaitTermination()




注意:以上代码执行时由于使用的Spark版本为3.1.2,其依赖的Hadoop版本为Hadoop3.2版本,所以需要在本地Window中配置Hadoop3.1.2的环境变量以及将对应的hadoop.dll放入window "C:\\Windows\\System32"路径下。
Structuerd Streaming向Iceberg实时写入数据有以下几个注意点:
  • 写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
  • 向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
  • 写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。
  • 实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。
四、查看Iceberg中数据结果启动向Kafka生产数据代码,启动向Iceberg中写入数据的Structured Streaming程序,执行以下代码来查看对应的Iceberg结果:
//1.准备对象
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
.getOrCreate()

//2.读取Iceberg 表中的数据结果
spark.sql(
"""
|select * from hadoop_prod.iceberg_db.iceberg_table
""".stripMargin).show()

【数据湖(十六)(Structured Streaming实时写入Iceberg)】

    推荐阅读