flink之WordCount

1.代码如下
public class WordCount {
public static void main(String[] args)throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataStream text;
if (params.has("input")){
text = env.readTextFile(params.get("input"));
}else {
text = env.fromElements(WordCountData.WORDS);
}
DataStream> counts =
text.flatMap(new FlatMapFunction>() {
public void flatMap(String s, Collector> collector)throws Exception {
String[] toknes = s.toLowerCase().split(" ");
for (String token : toknes){
if (token.length() >0) {
collector.collect(new Tuple2(token,1));
}
}
}
}).keyBy(0).sum(1);
if (params.has("output")){
counts.writeAsText(params.get("output"));
}else {
counts.print();
}
env.execute("StreamWodCount");
}
}
2.打包执行
flink run --class com.hui.learn.WordCount target/word-count-with-input-1.0-SNAPSHOT.jar --input /Users/didi/IdeaProjects/flink/wordcountwithinput/input --output /Users/didi/IdeaProjects/flink/wordcountwithinput/output2

    推荐阅读