Hadoop 入门笔记 十八: MapReduce基础编程

一.MapReduce mapReduce Patition Combiner 1. Partition分区
1.默认情况 在默认情况下,不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个reduce来处理,并且最终结果输出到一个文件中。
默认情况下MapReduce 执行流程:
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

2. 修改ReduceTask个数 在MapReduce程序的驱动类中,通过job提供的方法,可以修改reducetask的个数。
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

使用api修改reducetask个数之后,输出结果文件的个数和reducetask个数对应。也就是说有几个ReduceTask,最终输出文件就是几个,比如设置为6个,此时的输出结果如下所示:
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

此时,MapReduce的执行流程如下所示:
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

3. Partition数据分区概念

  • 默认情况下,MapReduce是只有一个reducetask来进行数据的处理。这就使得不管输入的数据量多大,最终的结果都是输出到一个文件中。
  • 当改变reducetask个数的时候,作为maptask就会涉及到数据分区的问题:MapTask输出的结果如何分配给各个ReduceTask来处理。
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

4. partition默认规则 MapReduce默认分区规则是HashPartitioner。跟map输出的数据key有关。
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

5. partition注意事项
  • reducetask个数的改变导致了数据分区的产生,而不是有数据分区导致了reducetask个数改变。
  • 数据分区的核心是分区规则。即如何分配数据给各个reducetask。
  • 默认的规则可以保证只要map阶段输出的key一样,数据就一定可以分区到同一个reducetask,但是不能保证数据平均分区。
  • reducetask个数的改变还会导致输出结果文件不再是一个整体,而是输出到多个文件中。
2. Combiner 规约
1.数据规约的含义 数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
2. MapReduce 弊端
  • MapReduce是一种具有两个执行阶段的分布式计算程序,Map阶段和Reduce阶段之间会涉及到跨网络数据传递。
  • 每一个MapTask都可能会产生大量的本地输出,这就导致跨网络传输数据量变大,网络IO性能低。比如WordCount单词统计案例,假如文件中有1000个单词,其中999个为hello,这将产生999个的键值对在网络中传递,性能及其低下。
3. Combiner 组件概念
  • Combiner中文叫做数据规约,是MapReduce的一种优化手段。
  • Combiner的作用就是对map端的输出先做一次局部合并,以减少在map和reduce节点之间的数据传输量。
    Hadoop 入门笔记 十八: MapReduce基础编程
    文章图片
4. Combiner 组件使用
  • Combiner是MapReduce程序中除了Mapper和Reducer之外的一种组件,默认情况下不启用。
  • Combiner本质就是Reducer,combiner和reducer的区别在于运行的位置:
    combiner是在每一个maptask所在的节点本地运行,是局部聚合;reducer是对所有maptask的输出结果计算,是全局聚合。
  • 具体实现步骤:
    自定义一个CustomCombiner类,继承Reducer,重写reduce方法
    job.setCombinerClass(CustomCombiner.class)
5. Combiner 使用注意事项
  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
  • 下述场景禁止使用Combiner,因为这样不仅优化了网络传输数据量,还改变了最终的执行结果
  • 业务和数据个数相关的。
  • 业务和整体排序相关的。
  • Combiner组件不是禁用,而是慎用。用的好提高程序性能,用不好,改变程序结果且不易发现。
二. MapReduce 编程指南 1. MapReduce 编程技巧概述
  • MapReduce执行流程了然于心,能够知道数据在MapReduce中流转过程。
  • 业务需求解读准确,即需要明白做什么。
  • 牢牢把握住key的选择,因为MapReduce很多行为跟key相关,比如:排序、分区、分组。
  • 学会自定义组件修改默认行为,当默认的行为不满足业务需求,可以尝试自定义规则。
  • 通过画图梳理业务执行流程,确定每个阶段的数据类型。
2. MapReduce执行流程梳理
Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

1. Map阶段执行过程
  • 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
  • 第二阶段是对切片中的数据按照一定的规则解析成对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)
  • 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个,调用一次map方法。每次调用map方法会输出零个或多个键值对。
  • 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
  • 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
  • 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
2. Map阶段执行过程
  • 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
  • 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
  • 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。
1. key 的重要性体现
  • 在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么。因为mr中很多默认行为都跟key相关。
    • 排序:key的字典序a-z 正序
    • 分区:key.hashcode % reducetask 个数
    • 分组:key相同的分为一组
  • 最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。
三.美国新冠疫情Covid-19 病例统计 现有美国2021-1-28号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:
2021-01-28,Juneau City and Borough,Alaska,02110,1108,3 2021-01-28,Kenai Peninsula Borough,Alaska,02122,3866,18 2021-01-28,Ketchikan Gateway Borough,Alaska,02130,272,1 2021-01-28,Kodiak Island Borough,Alaska,02150,1021,5 2021-01-28,Kusilvak Census Area,Alaska,02158,1099,3 2021-01-28,Lake and Peninsula Borough,Alaska,02164,5,0 2021-01-28,Matanuska-Susitna Borough,Alaska,02170,7406,27 2021-01-28,Nome Census Area,Alaska,02180,307,0 2021-01-28,North Slope Borough,Alaska,02185,973,3 2021-01-28,Northwest Arctic Borough,Alaska,02188,567,1 2021-01-28,Petersburg Borough,Alaska,02195,43,0

字段含义如下:date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)。
1. 案例一: 各州累计病例数量统计
统计美国2021-01-28,每个州state累计确诊案例数、累计死亡案例数。
1.需求分析
  1. 自定义对象CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。
  2. 注意自定义对象需要实现Hadoop的序列化机制。
  3. 以州作为map阶段输出的key,以CovidCountBean作为value,这样属于同一个州的数据就会变成一组进行reduce处理,进行累加即可得出每个州累计确诊病例。
2. 自定义CovidCountBean
package com.uuicon.sentiment_upload.covid; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class CovidCountBean implements WritableComparable { private Long cases; private Long deaths; public CovidCountBean() { }public CovidCountBean(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; }public void set(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; }@Override public String toString() { return cases + "\t" + deaths; }public Long getCases() { return cases; }public void setCases(Long cases) { this.cases = cases; }public Long getDeaths() { return deaths; }public void setDeaths(Long deaths) { this.deaths = deaths; }/** * 序列化 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); }/** * 反序列化,todo 反序列化的顺序需要和序列化一致 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); }@Override public int compareTo(CovidCountBean o) { return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0); } }

3. 实现mapper
package com.uuicon.sentiment_upload.covid; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CovidMapper extends Mapper { Text outkey = new Text(); CovidCountBean outValue = https://www.it610.com/article/new CovidCountBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); outkey.set(fields[2]); String cases = ""; String death = ""; // 有一个州的数据死亡率为0 ,文件中显示为空 // 如果直接使用 fields.length - 2 和fields.length - 1 // 死亡率为0 的数据 fields.length - 2 是邮编 //所以做一下数据处理 if (fields.length == 5) { cases = fields[fields.length - 1]; death = "0"; } else { cases = fields[fields.length - 2]; death = fields[fields.length - 1]; } outValue.set(Long.parseLong(cases), Long.parseLong(death)); context.write(outkey, outValue); } }

4. 实现Reducer
package com.uuicon.sentiment_upload.covid; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidReduce extends Reducer { CovidCountBean outValue = https://www.it610.com/article/new CovidCountBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long totalCases = 0; long totalDeaths = 0; for (CovidCountBean value : values) { totalCases += value.getCases(); totalDeaths += value.getDeaths(); } outValue.set(totalCases, totalDeaths); context.write(key, outValue); } }

4. 驱动程序
package com.uuicon.sentiment_upload.covid; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CovidDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创建作业实例 Job job = Job.getInstance(conf, CovidDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidMapper.class); job.setReducerClass(CovidReduce.class); // 设置作业 mapper 阶段输出key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CovidCountBean.class); // 设置作业reducer 阶段输出key value 数据类型,也就是程序最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CovidCountBean.class); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输出路径是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }

5. 运行程序
  1. MapReduce 驱动程序 鼠标右键运行程序,会报错:
    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0 at com.uuicon.sentiment_upload.covid.CovidDriver.main(CovidDriver.java:34)

    Hadoop 入门笔记 十八: MapReduce基础编程
    文章图片

    没有传递运行程序的参数,导致args[0] 找不到
  2. 添加参数
【Hadoop 入门笔记 十八: MapReduce基础编程】Hadoop 入门笔记 十八: MapReduce基础编程
文章图片

  1. 运行结果
    Alabama4527347340 Alaska53524253 Arizona74597612861 Arkansas2908564784 California327220739521 Colorado3946685670 Connecticut2487657020 Delaware764951075 District of Columbia36132902 Florida168758626034 Georgia86916513404 Guam8541130 Hawaii25460403 Idaho1618631724

1. 案例一: 各州累计病例数量统计
1. 需求 将美国2021-01-28,每个州state的确诊案例数进行倒序排序。
2. 分析 如果你的需求中需要根据某个属性进行排序 ,不妨把这个属性作为key。因为MapReduce中key有默认排序行为的。但是需要进行如下考虑:
  • 如果你的需求是正序,并且数据类型是Hadoop封装好的基本类型。这种情况下不需要任何修改,直接使用基本类型作为key即可。因为Hadoop封装好的类型已经实现了排序规则。
    比如:LongWritable类型:
    Hadoop 入门笔记 十八: MapReduce基础编程
    文章图片

    Hadoop 入门笔记 十八: MapReduce基础编程
    文章图片
如果你的需求是倒序,或者数据类型是自定义对象。需要重写排序规则。需要对象实现Comparable接口,重写ComparTo方法。
compareTo方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数2);
返回正数的话大于参数返回 1。
例如:o1.compareTo(o,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
3. 代码实现
  1. 自定义对象排序
package com.uuicon.sentiment_upload.covid; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class CovidCountBean implements WritableComparable { private Long cases; private Long deaths; public CovidCountBean() { }public CovidCountBean(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; }public void set(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; }@Override public String toString() { return cases + "\t" + deaths; }public Long getCases() { return cases; }public void setCases(Long cases) { this.cases = cases; }public Long getDeaths() { return deaths; }public void setDeaths(Long deaths) { this.deaths = deaths; }/** * 序列化 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); }/** * 反序列化,todo 反序列化的顺序需要和序列化一致 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); }@Override public int compareTo(CovidCountBean o) { return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0); } }

  1. Mapper类
package com.uuicon.sentiment_upload.covidsumsort; import com.uuicon.sentiment_upload.covid.CovidCountBean; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 使用上一个程序运行的结果作为输入 * KEYIN, 偏移量 * VALUEIN, 行内容 * KEYOUT,需要对州确诊人数作为排序,所以需要用 CovidCountBean 作为key, 并且需要实现WritableComparable接口 * VALUEOUT, */ public class CovidSumSortMapper extends Mapper { CovidCountBean outKey = new CovidCountBean(); Text outValue = https://www.it610.com/article/new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); System.out.println("provice="+fields[0]+"-f1--"+fields[1]+"--f2--"+fields[2]); outKey.set(Long.parseLong(fields[1]), Long.parseLong(fields[2])); outValue.set(fields[0]); context.write(outKey, outValue); } }

  1. Reduce 类
package com.uuicon.sentiment_upload.covidsumsort; import com.uuicon.sentiment_upload.covid.CovidCountBean; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * KEYIN, mapper 类型同输出key CovidCountBean * VALUEIN, mapper 类型同输出value Text * KEYOUT, Text * VALUEOUT CovidCountBean */ public class CovidSumSortReducer extends Reducer { Text outKey = new Text(); @Override protected void reduce(CovidCountBean key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value : values) { outKey.set(value); context.write(outKey,key); } } }

  1. 驱动类
package com.uuicon.sentiment_upload.covidsumsort; import com.uuicon.sentiment_upload.covid.CovidCountBean; import com.uuicon.sentiment_upload.covid.CovidMapper; import com.uuicon.sentiment_upload.covid.CovidReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CovidSumSortDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创建作业实例 Job job = Job.getInstance(conf, CovidSumSortDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidSumSortDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidSumSortMapper.class); job.setReducerClass(CovidSumSortReducer.class); // 设置作业 mapper 阶段输出key value 数据类型, job.setMapOutputKeyClass(CovidCountBean.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输出key value 数据类型,也就是程序最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CovidCountBean.class); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输出路径是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }

三. 各州累计病例分区统计
1. 需求 将美国每个州的疫情数据输出到各自不同的文件中,即一个州的数据在一个结果文件中。
输出到不同文件中-->reducetask有多个(>2)-->默认只有1个,如何有多个?--->可以设置,job.setNumReduceTasks(N)--->当有多个reducetask 意味着数据分区---->默认分区规则是什么? hashPartitioner--->默认分区规则符合你的业务需求么?---->符合,直接使用--->不符合自定义分区。
2. 代码实现
  1. 自定义分区器
package com.uuicon.sentiment_upload.covidpart; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; /** * KEY, 输入的数据key类型 ,本例中是美国州 所以是Text * VALUE 输入的value 数据类型 ,本例中是行内容 也是Text */ public class CovidPartition extends Partitioner { // 模拟美国各州的数据字典; 实际中如果是分区少,可以用数据集合保存,如果分区量大可以使用radis 保存 public static HashMap stateMap = new HashMap<>(); static { stateMap.put("Alabama", 0); stateMap.put("Arkansas", 1); stateMap.put("California", 2); stateMap.put("Colorado", 3); stateMap.put("Florida", 4); stateMap.put("Georgia", 5); stateMap.put("Idaho", 6); }/** * 自定义分区规则: 只要getpartition 返回的int一样,数据就会被分到同一个分区 * 所谓的同一个分区就是数据被放入同一个文件 * * @param key州 * @param value一行文本 * @param numPartitions * @return */ @Override public int getPartition(Text key, Text value, int numPartitions) { Integer code = stateMap.get(key.toString()); if (code != null) { return code; } return 7; } }

  1. Mapper类
package com.uuicon.sentiment_upload.covidpart; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CovidPartitionMapper extends Mapper { Text keyOut = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); keyOut.set(fields[2]); context.write(keyOut, value); } }

  1. Reduce 类
    package com.uuicon.sentiment_upload.covidpart; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidPartitionReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value, NullWritable.get()); } } }

  2. 驱动类
package com.uuicon.sentiment_upload.covidpart; import com.uuicon.sentiment_upload.covid.CovidCountBean; import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortMapper; import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CovidPartDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创建作业实例 Job job = Job.getInstance(conf, CovidPartDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidPartDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidPartitionMapper.class); job.setReducerClass(CovidPartitionReducer.class); // 设置作业 mapper 阶段输出key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输出key value 数据类型,也就是程序最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置分区 job.setNumReduceTasks(8); job.setPartitionerClass(CovidPartition.class); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输出路径是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }

    推荐阅读