Reading Dynamic Configuration in Spark and Flink

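Updating the configuration of a running streaming application is a common requirement.
1. In Spark, using a non-static global variable is enough to handle configuration that changes while the streaming job runs. A likely reason this works: instance fields are serialized along with the closure that Spark ships to the executors, whereas static fields are not serialized and are initialized independently in each executor JVM.
Example demo: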
```
final FlowConstant flowConstant = new FlowConstant();
...
// The Tuple2<String, String> input type is an assumption; use the element type of directStream.
JavaPairDStream<String, FlowInfoBean> flowInfoPairDStream = directStream.mapToPair(
        new PairFunction<Tuple2<String, String>, String, FlowInfoBean>() {
    @Override
    public Tuple2<String, FlowInfoBean> call(Tuple2<String, String> tuple2) throws Exception {
        ...
        // flowConstant is captured by this closure, so its instance fields are read here
        String cInOutFlag = IPUtils.isInHomeBitNet(clientIP, flowConstant.HOMENET_BIT);
        String defaultCInOutFlag = IPUtils.isInHomeBitNet(clientIP, flowConstant.DEFAULT_HOMENET_BIT);
        String sInOutFlag = IPUtils.isInHomeBitNet(serverIP, flowConstant.HOMENET_BIT);
        String defaultSInOutFlag = IPUtils.isInHomeBitNet(serverIP, flowConstant.DEFAULT_HOMENET_BIT);
        ...
    }
});
```
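
To illustrate why the variable has to be non-static, here is a minimal plain-Java sketch that uses ordinary Java serialization as a rough stand-in for shipping a closure to an executor. It does not use Spark at all; `Config`, `homeNet`, and `staticHomeNet` are hypothetical names standing in for `FlowConstant` and its fields, and resetting the static field only simulates a separate executor JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureConfigDemo {
    /** Hypothetical stand-in for FlowConstant. */
    static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        // Static field: never part of the serialized bytes.
        static String staticHomeNet = "10.0.0.0/8";
        // Instance field: serialized together with the object.
        String homeNet = "10.0.0.0/8";
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // The "driver" updates both fields at runtime.
        config.homeNet = "192.168.0.0/16";
        Config.staticHomeNet = "192.168.0.0/16";

        // Roughly what happens when a captured object is shipped with a task.
        byte[] shipped = serialize(config);

        // Simulate a separate executor JVM whose static initializer ran on its own.
        Config.staticHomeNet = "10.0.0.0/8";

        Config onExecutor = (Config) deserialize(shipped);
        System.out.println(onExecutor.homeNet);   // 192.168.0.0/16
        System.out.println(Config.staticHomeNet); // 10.0.0.0/8
    }
}
```

In this sketch only the instance field carries the updated value across the serialization boundary, which is the behavior the non-static approach above appears to rely on.
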
2. In Flink's DataStream API, this can be done by processing two input streams with a CoFlatMapFunction. One stream is the data stream and the other is the control stream.


Example demo:
```
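import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)

// Checkpointed is the legacy state interface of early Flink releases.
// java.lang.Integer is used as the state type because it must be java.io.Serializable.
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Integer] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // store the current filter length in each checkpoint
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = length

  // restore the filter length after a failure
  override def restoreState(state: Integer): Unit = {
    length = state
  }
}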
```
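
The interplay between the two inputs can be traced without a cluster. The following plain-Java sketch only simulates the idea of a two-input function; it does not use the Flink API, and `ControlStreamSketch`, `onData`, and `onControl` are hypothetical names. It assumes a single instance that receives data and control events in one fixed order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ControlStreamSketch {
    /** Plain-Java stand-in for the two-input function; not a Flink class. */
    static class DynLengthFilter {
        private int length = 0;

        // Data input: forward strings shorter than the current limit.
        void onData(String value, Consumer<String> out) {
            if (value.length() < length) {
                out.accept(value);
            }
        }

        // Control input: replace the limit used for subsequent data elements.
        void onControl(int newLength) {
            length = newLength;
        }
    }

    static List<String> run() {
        DynLengthFilter filter = new DynLengthFilter();
        List<String> out = new ArrayList<>();
        filter.onData("spark", out::add);     // dropped: limit is still 0
        filter.onControl(6);
        filter.onData("spark", out::add);     // kept: 5 < 6
        filter.onData("streaming", out::add); // dropped: 9 >= 6
        filter.onControl(10);
        filter.onData("streaming", out::add); // kept: 9 < 10
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [spark, streaming]
    }
}
```

A control message only affects data elements processed after it, so elements that arrive before the first control message are evaluated against the initial limit of 0 and dropped.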
