0009 - 基于MapReduce的应用案例

曾无好事来相访,赖尔高文一起予。这篇文章主要讲述0009 - 基于MapReduce的应用案例相关的知识,希望能为你提供帮助。
大数据梦工厂(0009 - 基于MapReduce的应用案例)
1 - MapReduce词频统计案例 1.1 - 样本数据这是一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。

[root@hadoop-01 ~]# cat input.txt Spark HBase Azkaban Flume Hive Flink Storm Hadoop HBase Spark Flink Presto Kudu Azkaban HBase Storm Presto Kafka HBase Hadoop Hive Flink Kudu HBase Flink Hive Storm Hive Flink Hadoop Flume HBase Hive Kudu Zookeeper Hadoop Spark HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Kudu Hive Flink Hadoop Kafka HBase Hive Presto Zookeeper Presto Kudu Hadoop Kafka Zookeeper Hadoop Flume Azkaban Kudu Presto Kafka Zookeeper Kafka Flume[root@hadoop-01 ~]# hdfs dfs -mkdir -p /tmp/wordcount/input [root@hadoop-01 ~]# hdfs dfs -put -f input.txt /tmp/wordcount/input/input.txt

0009 - 基于MapReduce的应用案例

文章图片

项目完整源码下载地址: WordCountDemo
1.2 - 项目依赖进行 MapReduce 编程,需要导入 hadoop-client 的依赖版本:
< dependency> < groupId> org.apache.hadoop< /groupId> < artifactId> hadoop-client< /artifactId> < version> ${hadoop.version}< /version> < /dependency>

1.3 - WordCountMapper将 input.txt 文件的每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable 接口。
在 Map 中将每行数据按照分隔符进行拆分:
/** * Object: Mapping 输入文件的内容 * Text: Mapping 输入的每一行的数据 * Text: Mapping 输出 key 的类型 * IntWritable : Mapping 输出 value 的类型 */ public class WordCountMapper extends Mapper< Object, Text, Text, IntWritable> {@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 获取到一行文件的内容 String line = value.toString(); // 按照空格切分这一行的内容为一个单词数组 String[] words = StringUtils.split(line, " "); // 遍历输出 < key, value> 键值对 for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }

WordCountMapper 对应下图的 Mapping 操作:
0009 - 基于MapReduce的应用案例

文章图片

1.4 - WordCountReducer在 Reduce 中进行单词出现次数的统计:
/** * Text: Mapping 输入的 key 的类型 * IntWritable: Mapping 输入的 value 的类型 * Text: Reducing 输出的 key 的类型 * IntWritable: Reducing 输出的 value 的类型 */public class WordCountReducer extends Reducer< Text, IntWritable, Text, IntWritable> {@Override protected void reduce(Text key, Iterable< IntWritable> values, Context context) throws IOException, InterruptedException { // 统计数字 int count = 0; // 累加求和 for (IntWritable value : values) { count += value.get(); } // 输出 < 单词:count> 键值对 context.write(key, new IntWritable(count)); } }

WordCountReducer 对应下图的 Shuffling 和 Reducing 操作(Shuffling 的输出是 Reducing 的输入):
0009 - 基于MapReduce的应用案例

文章图片

1.5 - WordCountApp打包 jar 到集群使用 hadoop 命令提交作业示例:
public class WordCountApp {// 1、使用硬编码,显示参数,实际开发中可以通过外部传参 private static final String HDFS_URL = "hdfs://172.16.1.2:8020"; private static final String HADOOP_USER_NAME = "hdfs"; public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {// 文件的输入路径和输出路径由外部传参指定 if (args.length < 2) { System.err.println("Usage: wordcount < in> [< in> ...] < out> "); System.exit(2); }// 指定 Hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常 System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME); Configuration configuration = new Configuration(); // 指定 HDFS 的地址 configuration.set("fs.defaultFS", HDFS_URL); // 2、获取 job 对象 Job job = Job.getInstance(configuration); // 3、设置 jar 存储位置 job.setJarByClass(WordCountApp.class); // 4、关联 Mapper 和 Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 5、设置 Mapper 阶段输出数据的 key 和 value 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 6、设置 Reducer 阶段输出数据的 key 和 value 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 7、如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常 FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME); Path outputPath = new Path(args[1]); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); }// 8、设置输入文件和输出文件的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); // 9、提交 job 到群集并等待它完成,参数设置为 true 代表打印显示对应的进度 boolean result = job.waitForCompletion(true); // 10、退出程序 System.exit(result ? 0 : 1); } }

提示: 如果不设置 Mapper 操作的输出类型,则默认和 Reducer 操作输出的类型相同。
1.6 - 提交到 MapReduce 集群运行可以在本机配置 Hadoop 开发环境,直接在 IDE 中启动进行测试。本案例打包提交到服务器中运行。由于本项目没有使用除 Hadoop 外的第三方依赖,因此直接打包即可:
1、打包 jar 文件
0009 - 基于MapReduce的应用案例

文章图片

或者使用 mvn 命令打包:
[root@hadoop-01 ~]# mvn package -DskipTests

2、提交作业
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountApp /tmp/wordcount/input /tmp/wordcount/output

3、作业完成后,在 YARN 的 Web 界面查看 Applications 运行情况
0009 - 基于MapReduce的应用案例

文章图片

4、查看 HDFS 上统计结果
# 查看生成目录 [root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output# 查看统计结果 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000

0009 - 基于MapReduce的应用案例

文章图片

提示: 每个作业的 Reduce 任务的默认个数为 1(通过 mapreduce.job.reduces 设置)。
【0009 - 基于MapReduce的应用案例】每个 Reduce 任务都会产生一个输出文件,如果设置 Reduce 个数为 3,那么 Map 输出数据会被分成 3 份,Reduce 输出的 part-r-00000 文件也会有 3 个。
2 - 词频统计案例之Combiner 2.1 - 代码实现如果需要使用 Combiner 功能,只要在打包 jar 文件时,添加下面一行代码即可:
// 5、设置Combiner job.setCombinerClass(WordCountReducer.class);

2.2 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerApp /tmp/wordcount/input /tmp/wordcount/output

加入 Combiner 功能后,统计结果不会发生变化,但是可以从打印的日志看出 Combiner 的效果:
没有加入 Combiner 功能的打印日志如下:
0009 - 基于MapReduce的应用案例

文章图片

加入 Combiner 功能后的打印日志如下:
0009 - 基于MapReduce的应用案例

文章图片

本案例只有一个输入文件并且小于 128M,所以只有一个 Map 进行处理。可以看到经过 Combiner 功能后,Combine input records 值由 70 降低为 12(样本中单词种类就只有 12 种),在本案例中 Combiner 能降低需要传输的数据量。
3 - 词频统计案例之Partitioner 3.1 - 默认的 Partitioner假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner
MapReduce 默认的分类规则:在构建 job 的时候,如果不指定,则默认使用的是 HashPartitioner:对 key 值进行哈希散列,并对 numReduceTasks 取余。其实现如下:
public class HashPartitioner< K, V> extends Partitioner< K, V> {public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

3.2 - 自定义 Partitioner使用 Partitioner 自定义分类规则,按照单词进行分类:
public class CustomPartitioner extends Partitioner< Text, IntWritable> {@Override public int getPartition(Text key, IntWritable value, int numPartitions) { if (key.toString().equals("Azkaban")) { return 0; }else if (key.toString().equals("Flink")) { return 1; }else if (key.toString().equals("Flume")) { return 2; }else if (key.toString().equals("HBase")) { return 3; }else if (key.toString().equals("Hadoop")) { return 4; }else if (key.toString().equals("Hive")) { return 5; }else if (key.toString().equals("Kafka")) { return 6; }else if (key.toString().equals("Kudu")) { return 7; }else if (key.toString().equals("Presto")) { return 8; }else if (key.toString().equals("Spark")) { return 9; }else if (key.toString().equals("Storm")) { return 10; }else { return 11; } } }

在构建 job 时,指定使用自定义的分类规则,并设置 reduce 的个数:
// 6、设置自定义 Partitioner 规则 job.setPartitionerClass(CustomPartitioner.class); // 7、设置 Reduce 个数 job.setNumReduceTasks(12);

3.3 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerPartitionerApp /tmp/wordcount/input /tmp/wordcount/output

执行结果如下,分别生成 12 个文件,每个文件中为对应单词的统计结果:
[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output/ Found 13 items -rw-r--r--3 hdfs supergroup0 2021-09-03 15:30 /tmp/wordcount/output/_SUCCESS -rw-r--r--3 hdfs supergroup10 2021-09-03 15:30 /tmp/wordcount/output/part-r-00000 -rw-r--r--3 hdfs supergroup8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00001 -rw-r--r--3 hdfs supergroup8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00002 -rw-r--r--3 hdfs supergroup9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00003 -rw-r--r--3 hdfs supergroup9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00004 -rw-r--r--3 hdfs supergroup7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00005 -rw-r--r--3 hdfs supergroup8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00006 -rw-r--r--3 hdfs supergroup7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00007 -rw-r--r--3 hdfs supergroup9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00008 -rw-r--r--3 hdfs supergroup8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00009 -rw-r--r--3 hdfs supergroup8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00010 -rw-r--r--3 hdfs supergroup12 2021-09-03 15:30 /tmp/wordcount/output/part-r-00011 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000 Azkaban 3 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00001 Flink8 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00002 Flume4

0009 - 基于MapReduce的应用案例

文章图片

::: hljs-center
扫一扫,我们的故事就开始了。
:::
0009 - 基于MapReduce的应用案例

文章图片


    推荐阅读