Integrating Kafka with Storm

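(Everything below was tested with jdk1.8.0_66 on Windows 10 and is intended for learning and testing only. Some content is taken from the official documentation and some from "Learning Storm".)
The source code is on GitHub: https://github.com/kyo-qin/tiger/tree/master/tiger-storm-common
1 Start ZooKeeper
Configure ZooKeeper to listen on port 2181 and start it from the command line: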

bin\zkServer.cmd

2 Start Kafka
Set the ZooKeeper port in the Kafka configuration to 2181, start the broker, then create a topic named test:
bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

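3 Write the Storm module
The Storm version used here is apache-storm-1.2.2.
3.1 Load the dependencies through Maven
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>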

Note: the storm-kafka dependency used by older versions has been deprecated. Use the storm-kafka-client dependency for new development.

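3.2 Write the bolt classes
Borrowing the example from "Learning Storm", we write a pair of bolts that assemble incoming words into a sentence. Each input line carries one word, and a "." marks the end of a sentence. For example, the input:
I
am
a
student
.

is assembled into (the bolt joins the words with single spaces, so the period stays separated from the last word):
I am a student .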

The core code is:
private List<String> words = new ArrayList<String>();

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // Get the word from the tuple
    String word = input.getString(0);
    if (StringUtils.isBlank(word)) {
        // ignore blank lines
        return;
    }
    logger.info("Received Word:" + word);
    System.out.println("Received Word:" + word);
    // add word to current list of words
    words.add(word);
    if (word.endsWith(".")) {
        // word ends with '.' which means this is the end of the sentence;
        // the SentenceBolt publishes a sentence tuple
        collector.emit(new Values(StringUtils.join(words, ' ')));
        // and reset the words list.
        words.clear();
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // downstream bolts read the assembled sentence from the "sentence" field
    declarer.declare(new Fields("sentence"));
}
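The accumulation logic can be tried without a running Storm cluster. The following is a minimal sketch in plain Java that mirrors what the bolt above does; the class name SentenceAssembler and its accept method are hypothetical helpers for illustration and are not part of Storm or of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not a Storm class): mirrors the word accumulation of SentenceBolt.
final class SentenceAssembler {
    private final List<String> words = new ArrayList<>();

    /** Feeds one word; returns the finished sentence once a word ends with ".". */
    Optional<String> accept(String word) {
        if (word == null || word.trim().isEmpty()) {
            return Optional.empty(); // ignore blank lines, as the bolt does
        }
        words.add(word);
        if (word.endsWith(".")) {
            // join with single spaces, then reset the buffer for the next sentence
            String sentence = String.join(" ", words);
            words.clear();
            return Optional.of(sentence);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SentenceAssembler assembler = new SentenceAssembler();
        for (String w : new String[] {"I", "am", "a", "student", "."}) {
            assembler.accept(w).ifPresent(s -> System.out.println("Received Sentence: " + s));
        }
    }
}
```

Because the words are buffered in an instance field, all words of one sentence have to reach the same instance, which is the same constraint the bolt has inside a topology.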

Then write a second bolt that prints the result:
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // get the sentence from the tuple and print it
    String sentence = input.getString(0);
    logger.info("Received Sentence: " + sentence);
    System.out.println("Received Sentence: " + sentence);
}

3.2 Configure the spout
The new KafkaSpout is configured through the KafkaSpoutConfig class, which defines the Kafka-related settings: brokers, topic, retry policy, the initial offset to consume from, and so on. A sample definition:
protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
    // LATEST is a static import of KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST
    return KafkaSpoutConfig.builder("localhost:9092", "test")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
            .setRetry(newRetryService())
            .setOffsetCommitPeriodMs(10000)
            .setFirstPollOffsetStrategy(LATEST)
            .setMaxUncommittedOffsets(250)
            .build();
}
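The initial offset setting decides where the spout starts reading when the topology is deployed. The sketch below models the four strategy names of storm-kafka-client 1.x in plain Java, following the documented semantics as I understand them; the class FirstPollOffsetSketch and the method startOffset are hypothetical and exist only for illustration.

```java
import java.util.OptionalLong;

// Hypothetical model (not Storm code) of how the first poll offset is chosen.
final class FirstPollOffsetSketch {
    // Same names as KafkaSpoutConfig.FirstPollOffsetStrategy in storm-kafka-client 1.x
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * @param beginning first available offset of the partition
     * @param end       offset that the next produced record will get
     * @param committed offset committed by the consumer group, if any
     */
    static long startOffset(Strategy strategy, long beginning, long end, OptionalLong committed) {
        switch (strategy) {
            case EARLIEST:
                return beginning;                  // ignore previous commits
            case LATEST:
                return end;                        // ignore previous commits
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(beginning); // resume if a commit exists
            case UNCOMMITTED_LATEST:
                return committed.orElse(end);       // resume if a commit exists
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // a partition holding offsets 0..41, with offset 10 committed by the group
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + startOffset(s, 0L, 42L, OptionalLong.of(10L)));
        }
    }
}
```

With LATEST, as in the configuration above, only records produced after the topology has started are consumed, so the test messages should be sent once the topology is running.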

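JUST_VALUE_FUNC is the function that translates a Kafka record into a tuple. Here it simply emits the record value:
private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC =
        new Func<ConsumerRecord<String, String>, List<Object>>() {
            @Override
            public List<Object> apply(ConsumerRecord<String, String> record) {
                // emit only the message value; key, topic and offset are dropped
                return new Values(record.value());
            }
        };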
newRetryService defines the retry policy:
protected KafkaSpoutRetryService newRetryService() {
    // arguments: initial delay, delay period, maximum number of retries, maximum delay
    return new KafkaSpoutRetryExponentialBackoff(
            new TimeInterval(500L, TimeUnit.MICROSECONDS),
            TimeInterval.milliSeconds(2),
            Integer.MAX_VALUE,
            TimeInterval.seconds(10));
}
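To get a feel for what these four arguments mean, here is a rough sketch of a capped exponential backoff in plain Java. It assumes the delay doubles with each failure after the first and is capped at the maximum delay; the exact formula inside KafkaSpoutRetryExponentialBackoff may differ, and the class BackoffSketch and the method delayNanos are hypothetical names used only for this illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of capped exponential backoff; not the Storm implementation.
final class BackoffSketch {

    /** Delay before the retry that follows the failCount-th failure, in nanoseconds. */
    static long delayNanos(int failCount, long initialDelayNs, long delayPeriodNs, long maxDelayNs) {
        if (failCount < 1 || delayPeriodNs <= 0) {
            throw new IllegalArgumentException("failCount must be >= 1 and delayPeriodNs > 0");
        }
        if (failCount == 1) {
            return Math.min(initialDelayNs, maxDelayNs);
        }
        long delay = delayPeriodNs;
        for (int i = 1; i < failCount; i++) {
            if (delay >= maxDelayNs) {
                return maxDelayNs; // stop early: capped, and avoids overflow
            }
            delay *= 2; // assumption: doubling on every further failure
        }
        return Math.min(delay, maxDelayNs);
    }

    public static void main(String[] args) {
        // same values as in newRetryService(): 500 microseconds, 2 ms, 10 s
        long initial = TimeUnit.MICROSECONDS.toNanos(500);
        long period = TimeUnit.MILLISECONDS.toNanos(2);
        long max = TimeUnit.SECONDS.toNanos(10);
        for (int failCount = 1; failCount <= 5; failCount++) {
            long micros = TimeUnit.NANOSECONDS.toMicros(delayNanos(failCount, initial, period, max));
            System.out.println("failure " + failCount + " -> retry after " + micros + " microseconds");
        }
    }
}
```

Since the maximum number of retries is Integer.MAX_VALUE in the configuration above, a failed tuple is effectively retried forever, with the waiting time bounded by the 10 second maximum delay.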

3.3 Configure the topology
Combine the bolts, the spout and the configuration above into a topology:
public StormTopology buildTopology() {
    TopologyBuilder builder = new TopologyBuilder();
    // one spout executor reading from Kafka
    builder.setSpout("KafkaSpout", new KafkaSpout<>(newKafkaSpoutConfig()), 1);
    // global grouping sends every tuple to a single task of the target bolt
    builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
    builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
    return builder.createTopology();
}

3.4 Run a local test
public static void main(String[] args) {
    KafkaTopologyBasic kb = new KafkaTopologyBasic();
    StormTopology topology = kb.buildTopology();
    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("KafkaToplogy", conf, topology);
    try {
        // Wait for some time before exiting
        System.out.println("Waiting to consume from kafka");
        Thread.sleep(300000);
    } catch (Exception exception) {
        System.out.println("Thread interrupted exception : " + exception);
    }
    // kill the topology (same name as passed to submitTopology)
    cluster.killTopology("KafkaToplogy");
    // shut down the storm test cluster
    cluster.shutdown();
}

Produce messages with the Kafka command-line tool:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

Input:
>I
>am
>a
>student
>.

Output:
2018-08-15 17:11:30 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:I
Received Word:I
2018-08-15 17:11:30 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:am
Received Word:am
2018-08-15 17:11:32 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:a
Received Word:a
2018-08-15 17:11:35 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:student
Received Word:student
2018-08-15 17:11:35 [Thread-20-SentenceBolt-executor[3 3]] INFO[org.tiger.storm.common.kafka.SentenceBolt] - Received Word:.
Received Word:.
2018-08-15 17:11:35 [Thread-18-PrinterBolt-executor[2 2]] INFO[org.tiger.storm.common.kafka.PrinterBolt] - Received Sentence: I am a student .
Received Sentence: I am a student .
