MapReduce自定义Partitioner、排序、GroupingComparator实现 同一订单中金额最大的商品
一、概述
- 自定义Bean、Partitioner、排序、GroupingComparator实现 同一订单中金额最大的商品,减少数据流。
- 自定义Bean:将订单id和商品金额作为bean的属性,并将bean作为key,利用Partitioner、排序、GroupingComparator。
- 自定义Partitioner:将同一订单号的数据划分到同一分区,因为分区采用的是对reduce个数的取模策略,所以同一个订单号的数据会由同一个reduce来处理。保证了可以获取到最大金额。
- 自定义排序:在数据益写到磁盘前(快排)、合并小文件(归并排序)、reduce拉取所有数据后(归并排序)。先按订单,在按金额从大到小排序,保证了每个订单的第一条数据就是最大的商品金额。
- 自定义GroupingComparator:利用reduce端的GroupingComparator来实现将一组bean看成相同的key,当orderId相同时,就认为是相同的key,默认是比较对象的二进制是否相同。
import java.io.File;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.cfl.hadoop.demo.model.OrderMaxPriceProductBean;
import com.cfl.hadoop.util.FileUtils;
/**
* 获取每个订单中最大价格的商品
* @author chenfenli
*
*/
public class OrderMaxPriceProductMapReduce { public static class OrderMaxPriceProductMapper extends Mapper
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class OrderMaxPriceProductBean implements WritableComparable { private String orderId;
private Double amount;
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(amount);
} @Override
public void readFields(DataInput in) throws IOException {
orderId = in.readUTF();
amount = in.readDouble();
} @Override
public int compareTo(OrderMaxPriceProductBean o) {
// 将相同的订单id按金额从大到小排序
int flag = this.orderId.compareTo(o.getOrderId());
// 订单排序,默认从小到大
if (flag == 0) {
flag = -this.amount.compareTo(o.getAmount());
// 金额排序,默认从小到大,添加负号改成从大到小
}
return flag;
}
public void set(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
} public String getOrderId() {
return orderId;
} public void setOrderId(String orderId) {
this.orderId = orderId;
} public Double getAmount() {
return amount;
} public void setAmount(Double amount) {
this.amount = amount;
} @Override
public String toString() {
return "[orderId=" + orderId + ", amount=" + amount + "]";
}
}
三、结果
- 原始文件:
order_001,product_001,222.00
order_001,product_005,25.98
order_002,product_005,25.98
order_002,product_004,520.08
order_002,product_003,114.4
order_003,product_003,114.4
order_003,product_003,114.4
- 结果文件(part-r-00000):
order_001:222.0
order_003:114.4
- 结果文件(part-r-00001):
order_002:520.08
- 可以看到:NumReduceTasks为2,根据orderId的hashcode取模进行分区,order_001、order_03在同一个输出文件,order_002在一个输出文件里面
推荐阅读
- SpringBoot调用公共模块的自定义注解失效的解决
- python自定义封装带颜色的logging模块
- 列出所有自定义的function和view
- Spring|Spring Boot 自动配置的原理、核心注解以及利用自动配置实现了自定义 Starter 组件
- Hadoop|Hadoop MapReduce Job提交后的交互日志
- 自定义MyAdapter
- Android自定义view实现圆环进度条效果
- Flutter自定义view|Flutter自定义view —— 闯关进度条
- js保留自定义小数点
- django|django 自定义.save()方法