Flink Producer: Writing Data to Kafka (Method 1)
package kafkaproducer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.text.DecimalFormat;
import java.util.Properties;
public class Producer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative: read an existing file and write its lines to Kafka
        // DataStreamSource<String> lines = env.readTextFile("file:///d:/test.txt");

        // Custom source that generates random sample records
        DataStreamSource<String> lines = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
            int count = 1;
            DecimalFormat userDecimal = new DecimalFormat("000");
            DecimalFormat typeDecimal = new DecimalFormat("0");
            String[] typeList = {"pv", "pu", "cart"};
            String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市"};

            @Override
            public void run(SourceContext<String> out) throws Exception {
                // For an endless stream, switch to: while (isRunning) { ... }
                // Adjust the record count here to control how much data is produced
                while (count <= 100) {
                    int r_user = (int) (Math.round(Math.random() * 9 + 1));
                    int r_activity = (int) (Math.round(Math.random() * 4 + 1));
                    int p_type = (int) (Math.random() * typeList.length);
                    int t_city = (int) (Math.random() * cityList.length);
                    String user = "U" + userDecimal.format(r_user);
                    String activity = "A" + typeDecimal.format(r_activity);
                    long timeStamp = System.currentTimeMillis();
                    int pageview = (int) (Math.round(Math.random() * 4 + 1));
                    String typeP = typeList[p_type];
                    String city = cityList[t_city];
                    out.collect(user + " " + activity + " " + timeStamp + " " + pageview + " " + typeP + " " + city);
                    count++;
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        // Kafka sink configuration: the first constructor argument is the target topic
        String topic = "test";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");

        lines.addSink(new FlinkKafkaProducer<>(
                topic,
                new SimpleStringSchema(),
                prop
        ));

        env.execute("Producer");
    }
}
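The commented-out readTextFile line shows the other way to feed the sink: stream an existing file instead of generating records. A minimal sketch of that variant, reusing the same topic and properties (the d:/test.txt path is just the placeholder from the comment above):

        // Read lines from a local file and push them to the same Kafka sink
        DataStreamSource<String> fileLines = env.readTextFile("file:///d:/test.txt");
        fileLines.addSink(new FlinkKafkaProducer<>("test", new SimpleStringSchema(), prop));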
The job above just produces some throwaway sample data to play with; adjust the record count in the loop as needed.
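To check that the records actually landed in the topic, a small read-back job can be used. This is only a sketch under the same assumptions as above (topic "test", the same brokers); FlinkKafkaConsumer comes from the same Kafka connector dependency, and the class and group.id names here are arbitrary:

package kafkaproducer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        // group.id is required on the consumer side (name chosen here for illustration)
        prop.setProperty("group.id", "test-consumer");

        // Read the "test" topic as plain strings and print each record
        env.addSource(new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), prop))
                .print();

        env.execute("Consumer");
    }
}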