Integrating Kafka with Storm
(The following was tested with JDK 1.8.0_66 on Windows 10 and is intended for learning and testing only; parts of the content are taken from the official documentation and from the book Learning Storm.)
The source code is available on GitHub: https://github.com/kyo-qin/tiger/tree/master/tiger-storm-common
1 Start ZooKeeper
Configure the ZooKeeper port as 2181 and start it from the command line:
bin\zkServer.cmd
2 Start Kafka
Set the ZooKeeper port in the Kafka configuration to 2181, then start the broker and create a topic named test:
bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
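Optionally, before wiring up Storm, you can verify that the broker and topic are reachable by attaching a console consumer (a quick sanity check, not part of the original steps):
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning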
3 Write the Storm module
The Storm version used here is apache-storm-1.2.2.
3.1 Load the dependencies via Maven
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
Note: the storm-kafka dependency used by older versions has been deprecated; new development should use storm-kafka-client instead. The legacy storm-kafka dependency was declared as follows:
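(For reference, a typical declaration of the deprecated artifact looks like the following; the version number shown here is only illustrative.)
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.2.2</version>
</dependency>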
3.2 Write the bolt classes
Borrowing an example from Learning Storm, we write a bolt that assembles incoming words into a sentence: one word arrives per line, and the symbol "." marks the end of a sentence. For example, the input:
I
am
a
student
.
should be assembled into:
I am a student.
The core code is:
private List<String> words = new ArrayList<String>();

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // Get the word from the tuple
    String word = input.getString(0);
    if (StringUtils.isBlank(word)) {
        // ignore blank lines
        return;
    }
    logger.info("Received Word:" + word);
    System.out.println("Received Word:" + word);
    // add word to current list of words
    words.add(word);
    if (word.endsWith(".")) {
        // word ends with '.' which means this is the end
        // the SentenceBolt publishes a sentence tuple
        collector.emit(new Values(StringUtils.join(words, ' ')));
        // and reset the words list.
        words.clear();
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
}
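The snippet above is only the body of the bolt. For context, a minimal surrounding class, assuming the BaseBasicBolt base class used by the Learning Storm example (the imports and names below are a sketch, not necessarily the repository's exact code), could look like:

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils; // or org.apache.commons.lang3, depending on the commons-lang version on the classpath
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentenceBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    // the execute() and declareOutputFields() methods shown above go here
}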
Then write another bolt (PrinterBolt) that prints the result:
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // get the sentence from the tuple and print it
    String sentence = input.getString(0);
    logger.info("Received Sentence: " + sentence);
    System.out.println("Received Sentence: " + sentence);
}
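PrinterBolt is the terminal bolt in the chain and emits nothing downstream, so its declareOutputFields can simply be left empty (a minimal sketch, again assuming the BaseBasicBolt pattern):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // terminal bolt: nothing is emitted, so no output fields are declared
}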
3.3 Configure the spout
The new KafkaSpout is configured through the KafkaSpoutConfig class, which defines the Kafka connection settings, the topic, the retry policy, the initial consumption offset, and other parameters. An example definition:
protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
    return KafkaSpoutConfig.builder("localhost:9092", "test")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
            .setRetry(newRetryService())
            .setOffsetCommitPeriodMs(10000)
            // LATEST is statically imported from KafkaSpoutConfig.FirstPollOffsetStrategy
            .setFirstPollOffsetStrategy(LATEST)
            .setMaxUncommittedOffsets(250)
            .build();
}
Here JUST_VALUE_FUNC is the Kafka record translation function; it simply passes the record value through as the tuple:
private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        // emit the record value as a single-field tuple
        return new Values(record.value());
    }
};
newRetryService defines the retry policy:
protected KafkaSpoutRetryService newRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, TimeUnit.MICROSECONDS), TimeInterval.milliSeconds(2),
            Integer.MAX_VALUE, TimeInterval.seconds(10));
}
3.4 Configure the topology
Combine the bolts, the spout, and the configuration class above into a topology:
public StormTopology buildTopology() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("KafkaSpout", new KafkaSpout<>(newKafkaSpoutConfig()), 1);
    builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
    builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
    return builder.createTopology();
}
3.5 Run a local test
public static void main(String[] args) {
    KafkaTopologyBasic kb = new KafkaTopologyBasic();
    StormTopology topology = kb.buildTopology();
    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("KafkaTopology", conf, topology);
    try {
        // Wait for some time before exiting
        System.out.println("Waiting to consume from kafka");
        Thread.sleep(300000);
    } catch (Exception exception) {
        System.out.println("Thread interrupted exception : " + exception);
    }
    // kill the KafkaTopology
    cluster.killTopology("KafkaTopology");
    // shut down the storm test cluster
    cluster.shutdown();
}
Produce messages with the Kafka command-line producer:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
Input:
>I
>am
>a
>student
>.
Output:
2018-08-15 17:11:30 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:I
Received Word:I
2018-08-15 17:11:30 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:am
Received Word:am
2018-08-15 17:11:32 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:a
Received Word:a
2018-08-15 17:11:35 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:student
Received Word:student
2018-08-15 17:11:35 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:.
Received Word:.
2018-08-15 17:11:35 [Thread-18-PrinterBolt-executor[2 2]] INFO[org.tiger.storm.common.kafka.PrinterBolt] - Received Sentence: I am a student .
Received Sentence: I am a student .