MapReduce自定义Partitioner、排序、GroupingComparator实现 同一订单中金额最大的商品

一、概述

  • 自定义Bean、Partitioner、排序、GroupingComparator实现 同一订单中金额最大的商品,减少数据流。
  • 自定义Bean:将订单id和商品金额作为bean的属性,并将bean作为key,利用Partitioner、排序、GroupingComparator。
  • 自定义Partitioner:将同一订单号的数据划分到同一分区,因为分区采用的是对reduce个数的取模策略,所以同一个订单号的数据会由同一个reduce来处理。保证了可以获取到最大金额。
  • 自定义排序:在数据溢写到磁盘前(快排)、合并小文件(归并排序)、reduce拉取所有数据后(归并排序)。先按订单,再按金额从大到小排序,保证了每个订单的第一条数据就是最大的商品金额。
  • 自定义GroupingComparator:利用reduce端的GroupingComparator来实现将一组bean看成相同的key,当orderId相同时,就认为是相同的key,默认是比较对象的二进制是否相同。
二、代码
import java.io.File; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.cfl.hadoop.demo.model.OrderMaxPriceProductBean; import com.cfl.hadoop.util.FileUtils; /** * 获取每个订单中最大价格的商品 * @author chenfenli * */ public class OrderMaxPriceProductMapReduce { public static class OrderMaxPriceProductMapper extends Mapper { private OrderMaxPriceProductBean k = new OrderMaxPriceProductBean(); @Override protected void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException { String valueStr = value.toString(); if(StringUtils.isEmpty(valueStr)) { return; } String[] ss= valueStr.split(","); k.set(ss[0], Double.valueOf(ss[2])); System.out.println("------------max: " + k.toString()); context.write(k, NullWritable.get()); } } public static class OrderMaxPriceProductReduce extends Reducer { private Text k = new Text(); @Override protected void reduce(OrderMaxPriceProductBean key, Iterable value, Reducer.Context context) throws IOException, InterruptedException { System.out.println("reduce: " + key.toString()); k.set(key.getOrderId() + ":" + key.getAmount()); context.write(k, NullWritable.get()); } } /** * 自定义分区,将相同的订单id分配到同一个分区,因为采取的取模策略,所以会分配到同一个reduce进行处理 * @author chenfenli * */ public static class OrderMaxPriceProductPartitioner extends Partitioner { @Override public int getPartition(OrderMaxPriceProductBean key, NullWritable value, int numPartitions) { // 
相同的orderId的订单bean,会发往相同的partition,而且产生的分区数,是会跟用户设置的reducetask数保持一致 return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; } } /** * 利用reduce端的GroupingComparator来实现将一组bean看成相同的key: * 当orderId相同时,就认为是相同的key,默认是比较对象的二进制是否相同 * @author chenfenli * */ public static class OrderMaxPriceProductGroupingComparator extends WritableComparator { // 固定写法 protected OrderMaxPriceProductGroupingComparator() { super(OrderMaxPriceProductBean.class, true); } @SuppressWarnings("rawtypes") @Override public int compare(WritableComparable a, WritableComparable b) { // 比较两个bean时,指定比较bean中的orderId OrderMaxPriceProductBean abean = (OrderMaxPriceProductBean)a; OrderMaxPriceProductBean bbean = (OrderMaxPriceProductBean)b; return abean.getOrderId().compareTo(bbean.getOrderId()); } } public static void main(String[] args) { try { String inputPath = "/Users/chenfenli/Documents/work_haozhun/Hadoop/src/main/java/com/cfl/hadoop/files/order"; String outputPath = "/Users/chenfenli/Documents/work_haozhun/Hadoop/src/main/java/com/cfl/hadoop/files/temp"; FileUtils.deleteFile(new File(outputPath)); Configuration conf = new Configuration(); Job job= Job.getInstance(conf); job.setJarByClass(OrderMaxPriceProductMapReduce.class); job.setMapperClass(OrderMaxPriceProductMapper.class); job.setMapOutputKeyClass(OrderMaxPriceProductBean.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(OrderMaxPriceProductReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(2); // 自定义Partitioner job.setPartitionerClass(OrderMaxPriceProductPartitioner.class); // 自定义GroupingComparator job.setGroupingComparatorClass(OrderMaxPriceProductGroupingComparator.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); int flag = job.waitForCompletion(true) ? 0:1; System.out.println(flag); } catch (Exception e) { e.printStackTrace(); } } }

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderMaxPriceProductBean implements WritableComparable { private String orderId; private Double amount; @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeDouble(amount); } @Override public void readFields(DataInput in) throws IOException { orderId = in.readUTF(); amount = in.readDouble(); } @Override public int compareTo(OrderMaxPriceProductBean o) { // 将相同的订单id按金额从大到小排序 int flag = this.orderId.compareTo(o.getOrderId()); // 订单排序,默认从小到大 if (flag == 0) { flag = -this.amount.compareTo(o.getAmount()); // 金额排序,默认从小到大,添加负号改成从大到小 } return flag; } public void set(String orderId, Double amount) { this.orderId = orderId; this.amount = amount; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount = amount; } @Override public String toString() { return "[orderId=" + orderId + ", amount=" + amount + "]"; } }


三、结果
  • 原始文件:
order_001,product_001,222.00 order_001,product_005,25.98 order_002,product_005,25.98 order_002,product_004,520.08 order_002,product_003,114.4 order_003,product_003,114.4 order_003,product_003,114.4

  • 结果文件(part-r-00000):
order_001:222.0 order_003:114.4

  • 结果文件(part-r-00001):
order_002:520.08

  • 可以看到:NumReduceTasks为2,根据orderId的hashcode取模进行分区,order_001、order_003在同一个输出文件,order_002在一个输出文件里面

    推荐阅读