Fork me on GitHub

MapReduce基础学习-四

Shuffle机制

Shuffle机制机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

image

Partition分区

image
image

分区源码分析

image
image
image
image
image
image
image
image
image
image
image
image
image
image

Partition分区案例实操

需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  • 输入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
  • 输出数据

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

需求分析

image

在案例2.4的基础上,增加一个分区类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.atyp.mr.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* Partitioner<Text, FlowBean>
* 这两个泛型时map阶段的输出
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// key 是手机号
// value 是流量信息

// 获取手机号前三位
String prePhoneNum = key.toString().substring(0, 3);

int partition = 4;

if ("136".equals(prePhoneNum)) {
partition = 0;
} else if ("137".equals(prePhoneNum)) {
partition = 1;
} else if ("138".equals(prePhoneNum)) {
partition = 2;
} else if ("139".equals(prePhoneNum)) {
partition = 3;
} else {
partition = 4;
}
return partition;
}
}

在驱动函数中增加自定义数据分区设置和ReduceTask设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.atyp.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowSumDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_02","D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_02"};
Configuration conf = new Configuration();
// 1.获取Job对象
Job job = Job.getInstance(conf);

// 2.设置jar的路径
job.setJarByClass(FlowSumDriver.class);

// 3.关联Mapper和Reducer
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 4.设置mapper输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 5.设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 8.指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
// 9.同时指定相应数量的reduce task
job.setNumReduceTasks(5);

// 6.设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.提交Job
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

结果

image

分区总结

image

WritableComparable排序

image
image

排序的分类

image

自定义排序WritableComparable

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public int compareTo(FlowBean o) {

int result;

// 按照总流量大小,倒序排列
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if (sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}

WritableComparable排序案例实操(全排序)

需求

根据案例2.3产生的结果再次对总流量进行排序。

  • (1)输入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
  • (2)期望输出数据
1
2
3
4
5
13509468723	7335	110349	117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
。。。 。。。

按照总流量倒序排序的

需求分析

image

代码实现

FlowBean对象在在需求1基础上增加了比较功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.atyp.mr.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量

public FlowBean() {
}

public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

// 比较
@Override
public int compareTo(FlowBean bean) {
// 核心比较条件判断
int result;
if (sumFlow > bean.getSumFlow()) {
//需要 倒序 ,所以这里是 -1
result = -1;
} else if (sumFlow < bean.getSumFlow()) {
result = 1;
} else {
//相等
result = 0;
}

return result;
}

// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
编写Mapper类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atyp.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 13736230513 2481 24681 27162

// 1.获取一行
String line = value.toString();

// 2.切割
String[] fields = line.split("\t");

// 3.封装对象
String phoneNum = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
long sumFlow = Long.parseLong(fields[3]);

k.setDownFlow(downFlow);
k.setUpFlow(upFlow);
k.setSumFlow(sumFlow);

v.set(phoneNum);

// 4.写出
context.write(k,v);
}
}
编写Reducer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.atyp.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//循环输出,避免总流量相同情况
for (Text value : values) {
context.write(value, key);
}
}
}
编写Driver类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.atyp.mr.sort;

import jdk.nashorn.internal.scripts.JO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//输入输出路径
args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_06_FlowCountSort", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_06_FlowCountSort"};

// 1.获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance();

// 6.指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountSortDriver.class);

// 2.指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);

// 3.指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 4.指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 5.指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.将Job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

WritableComparable排序案例实操(区内排序)

需求

要求每个省份手机号输出的文件中按照总流量内部排序。

要求分析

基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

image

案列实操

增加自定义分区类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.atyp.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {

// 按照手机号的前三位分区
String prePhoneNum = value.toString().substring(0, 3);

int partition = 4;
if ("136".equals(prePhoneNum)) {
partition = 0;
} else if ("137".equals(prePhoneNum)) {
partition = 1;
} else if ("138".equals(prePhoneNum)) {
partition = 2;
} else if ("139".equals(prePhoneNum)) {
partition = 3;
} else {
partition = 4;
}

return partition;
}
}
在驱动类中添加分区类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.atyp.mr.sort;

import jdk.nashorn.internal.scripts.JO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//输入输出路径
args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_06_FlowCountSort", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_06_FlowCountSort_quSort"};

// 1.获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance();

// 6.指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountSortDriver.class);

// 2.指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);

// 3.指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 4.指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);

// 5.指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.将Job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

Combiner合并

image
(6)自定义Combiner实现步骤

  • 自定义一个Combiner继承Reducer,重写Reduce方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

// 1 汇总操作
int count = 0;
for(IntWritable v :values){
count += v.get();
}

// 2 写出
context.write(key, new IntWritable(count));
}
}
  • 在Job驱动类中设置:
1
job.setCombinerClass(WordcountCombiner.class);

Combiner合并案例实操

需求

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

  • 输入数据
1
2
3
4
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
  • 期望输出数据

期望:Combine输入数据多,输出时经过合并,输出数据降低。

需求分析

image

案例实操-方案一

增加一个WordcountCombiner类继承Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.atyp.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Reducer<Text, IntWritable, Text, IntWritable>
* 此合并案例中Reucer的参数是:前两个是map阶段的输出参数,后两个是最终要输出的参数类型
*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1.累加求和
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}

v.set(sum);

// 2.写出
context.write(key, v);
}
}
在WordcountDriver驱动类中指定Combiner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.atyp.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_01","D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_01"};

Configuration conf = new Configuration();
// 1.获取Job对象
Job job = Job.getInstance(conf);

// 2.设置jar存储位置
job.setJarByClass(WordCountDriver.class);

// 3.关联Map和Reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 4.设置Mapper阶段数据输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5.设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
//job.setInputFormatClass(CombineTextInputFormat.class);
// 虚拟存储切片最大值设置为 4M
//CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

//如果 NumReduceTasks 的个数你不设置,那默认是1(任何数%1=0),那么分区就没有任何意义
//job.setNumReduceTasks(2);

// 指定需要使用 combiner ,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);

// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.提交Job
//job.submit();
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

案例实操-方案二

将WordcountReducer作为Combiner在WordcountDriver驱动类中指定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.atyp.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_01","D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_01"};

Configuration conf = new Configuration();
// 1.获取Job对象
Job job = Job.getInstance(conf);

// 2.设置jar存储位置
job.setJarByClass(WordCountDriver.class);

// 3.关联Map和Reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 4.设置Mapper阶段数据输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5.设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
//job.setInputFormatClass(CombineTextInputFormat.class);
// 虚拟存储切片最大值设置为 4M
//CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

//如果 NumReduceTasks 的个数你不设置,那默认是1(任何数%1=0),那么分区就没有任何意义
//job.setNumReduceTasks(2);

// 指定需要使用 combiner ,以及用哪个类作为 combiner 的逻辑
//job.setCombinerClass(WordCountCombiner.class);

//关联合并类
job.setCombinerClass(WordCountReducer.class);

// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7.提交Job
//job.submit();
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

GroupingComparator分组(辅助排序)

对Reduce阶段的数据根据某一个或几个字段进行分组。

分组排序步骤:

  • 自定义类继承WritableComparator
  • 重写compare()方法
1
2
3
4
5
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
  • 创建一个构造将比较对象的类传给父类
1
2
3
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}

GroupingComparator分组案例实操

需求

如下订单
image
现在需要求出每一个订单中最贵的商品。

  • 输入数据(GroupingComparator.txt)
1
2
3
4
5
6
7
0000001	Pdt_01	222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
  • 输出数据
1
2
3
1	222.8
2 722.4
3 232.8

需求分析

  1. 利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
  2. 在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品

image

代码实现

定义订单信息OrderBean类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.atyp.mr.order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

// 订单id
private int order_id;
// 订单价格
private double price;

public OrderBean() {
}

public OrderBean(int order_id, double price) {
this.order_id = order_id;
this.price = price;
}

@Override
public int compareTo(OrderBean o) {
// 先按照订单id排序,如果相同按照价格降序排序
int result = 0;
if (order_id > o.getOrder_id()) {
result = 1;
} else if (order_id < o.getOrder_id()) {
result = -1;
} else {
result = price > o.getPrice() ? -1 : 1;
}
return result;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeDouble(price);
}

@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readInt();
price = in.readDouble();
}

public int getOrder_id() {
return order_id;
}

public void setOrder_id(int order_id) {
this.order_id = order_id;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

@Override
public String toString() {
return order_id + "\t" + price;
}
}
编写OrderMapper类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.atyp.mr.order;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

OrderBean k = new OrderBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//000000001 222.8
// 1.获取一行数据
String line = value.toString();

// 2.切割
String[] fields = line.split("\t");

// 3.封装对象
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));

// 4.写出
context.write(k,NullWritable.get());
}
}
编写OrderReducer类
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.atyp.mr.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
编写OrderGroupingComparator类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.atyp.mr.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

// 需要写一个构造
public OrderGroupingComparator() {
// 第二个参数为true,如果不是true,就会报空指针异常,可以看源码
super(OrderBean.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
// 要求只要id相同,就认为是相同的key
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
// 如果是id相等,那么就会将相等的数据传入同一个reduce中
result = 0;
}
return result;
}
}
编写OrderDriver类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.atyp.mr.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_07_order", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_07_order"};

// 1.获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2.设置Jar包加载路径
job.setJarByClass(OrderDriver.class);

// 3.加载map/reduce类
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);

// 4.加载map输出数据key和value类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);

// 5.设置最终输出数据的key和value类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);

// 6.设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 8.设置reducer端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);

// 7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

MapTask工作机制

image

  1. Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
  2. Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
  3. Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

  • 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
  • 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
  • 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
  1. Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

ReduceTask工作机制

ReduceTask工作机制

image

  1. Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:reduce()函数将计算结果写到HDFS上。

设置ReduceTask并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

1
2
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

实验:测试ReduceTask多少合适

  1. 实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G
  2. 实验结论:

image

  1. 注意事项
    image