
MapReduce Basics (Part 5)

OutputFormat Data Output

OutputFormat Interface Implementations

(figure: the built-in OutputFormat implementation classes)

Custom OutputFormat

(figure: to customize the output, extend FileOutputFormat and implement a RecordWriter that controls how and where records are written)

Custom OutputFormat Case Study

Requirement

Filter the input log: write the websites that contain atguigu to e:/atguigu.log, and the websites that do not contain atguigu to e:/other.log.

  • Input data
www.baidu.com
www.baidu.com
www.baidu.com
www.atguigu.com
www.atguigu.com
www.atguigu.com
www.123.com
www.345.com
www.rickyin.com
www.rickyin.com
www.atguigu.com
  • Expected output data

atguigu.log should contain the www.atguigu.com lines, and other.log should contain all of the remaining websites.

Requirement Analysis

(figure: requirement analysis — a custom FilterOutputFormat returns a RecordWriter that opens two output streams and routes each record according to whether it contains "atguigu")

Writing the Code

Write the FilterMapper class
package com.atyp.mr.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // the value is one website line, e.g. http://www.baidu.com; write it out as the key
        context.write(value, NullWritable.get());
    }
}
Write the FilterReducer class
package com.atyp.mr.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // the key is a website such as http://www.baidu.com
        // iterate over the values so that duplicate lines are not collapsed into a single output line
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
Define a custom OutputFormat class
package com.atyp.mr.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new FRecordWriter(job);
    }
}
Write the RecordWriter class
package com.atyp.mr.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream fosatguigu;
    FSDataOutputStream fosother;

    public FRecordWriter(TaskAttemptContext job) {
        try {
            // 1. get the file system
            FileSystem fs = FileSystem.get(job.getConfiguration());

            // 2. create the output stream for atguigu.log
            fosatguigu = fs.create(new Path("D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\atguigu.log"));

            // 3. create the output stream for other.log
            fosother = fs.create(new Path("D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // if the key contains "atguigu", write it to atguigu.log; otherwise write it to other.log
        // (a trailing newline is appended so that every website ends up on its own line)
        if (key.toString().contains("atguigu")) {
            fosatguigu.write((key.toString() + "\n").getBytes());
        } else {
            fosother.write((key.toString() + "\n").getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(fosatguigu);
        IOUtils.closeStream(fosother);
    }
}
Write the FilterDriver class
package com.atyp.mr.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_08_outputFormat", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_08_outputFormat"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);

        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // register the custom output format with the job
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // even though a custom OutputFormat is used, it extends FileOutputFormat,
        // which still writes a _SUCCESS marker file, so an output directory must be specified
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Join: Multiple Applications

Reduce Join

(figure: how Reduce Join works)

Reduce Join Case Study

Requirement

Join the order table (order.txt: order id, product id, amount) with the product table (pd.txt: product id, product name), so that each output record carries the order id, product name, and amount.

Requirement Analysis

Use the join condition (the product id) as the key of the Map output, so that the rows from both tables that satisfy the join condition, each carrying information about which file it came from, are sent to the same ReduceTask; the records are then stitched together in the Reduce stage.

(figure: Reduce Join case analysis)
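To make this concrete, here is a small walk-through using the sample records that also appear in the comments of the Map Join mapper later in this post (order.txt rows 1001 01 1 and 1002 02 2, pd.txt row 01 小米); the output layout follows TableBean.toString() (id, pname, amount):

Map output, keyed by pid and tagged with the source table:
  "01" -> (id=1001, pid=01, amount=1, flag=order)
  "02" -> (id=1002, pid=02, amount=2, flag=order)
  "01" -> (pid=01, pname=小米, flag=pd)

The reduce call for key "01" therefore receives the order record and the product record together and writes:
  1001  小米  1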

Code Implementation

Create the Bean class for the product and merged order records
package com.atyp.mr.table;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {
    // order id
    private String id;
    // product id
    private String pid;
    // amount
    private int amount;
    // product name
    private String pname;
    // flag marking whether the record comes from the product table or the order table
    private String flag;

    // the no-arg constructor is required for reflection
    public TableBean() {
    }

    public TableBean(String id, String pid, int amount, String pname, String flag) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}
Write the TableMapper class
package com.atyp.mr.table;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    String name;
    TableBean tableBean = new TableBean();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the name of the file this split comes from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();

        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. read one line
        String line = value.toString();
        if (name.startsWith("order")) {
            // order table
            String[] fields = line.split("\t");

            // populate the key and the value
            tableBean.setId(fields[0]);
            tableBean.setPid(fields[1]);
            tableBean.setAmount(Integer.parseInt(fields[2]));
            // pname is not present in this table, but it must not be left null or serialization will fail
            tableBean.setPname("");
            tableBean.setFlag("order");

            k.set(fields[1]);
        } else {
            // product table
            String[] fields = line.split("\t");

            tableBean.setId("");
            tableBean.setPid(fields[0]);
            tableBean.setAmount(0);
            tableBean.setPname(fields[1]);
            tableBean.setFlag("pd");

            k.set(fields[0]);
        }
        context.write(k, tableBean);
    }
}
Write the TableReducer class
package com.atyp.mr.table;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // collection holding all order records for this key
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // the product record (there is only one per key)
        TableBean pdBean = new TableBean();

        for (TableBean tableBean : values) {
            if ("order".equals(tableBean.getFlag())) {
                // order table: Hadoop reuses the value object, so copy it before storing
                TableBean tmpBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmpBean, tableBean);

                    orderBeans.add(tmpBean);

                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else {
                // product table
                try {
                    BeanUtils.copyProperties(pdBean, tableBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // complete each order record with the product name and write it out
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }
    }
}
Write the TableDriver class
package com.atyp.mr.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_09_mapJoin", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_09_mapJoin"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TableDriver.class);

        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Summary

In a Reduce Join, all of the merging happens on the Reduce side, so the Reduce stage carries most of the processing load while the Map stage does very little, and data skew arises easily. The remedy is to perform the join on the Map side instead: a Map Join.

Map Join

Use Case

Map Join is suitable when one of the tables is very small and the other is very large.

Advantages

Think about it: joining too many tables on the Reduce side very easily causes data skew. What can be done?

Cache several tables on the Map side and handle the business logic there in advance. This adds work to the Map stage, but it reduces the data pressure on the Reduce stage and minimizes data skew.

Concrete approach: use the DistributedCache.

  1. In the Mapper's setup stage, read the cached file into an in-memory collection.
  2. In the driver, register the file with the cache (see the sketch after this list):
    // Cache an ordinary file on the nodes where the tasks run.
    job.addCacheFile(new URI("file://e:/cache/pd.txt"));
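
A minimal sketch of how step 1 looks on the Mapper side, assuming a tab-separated pd.txt (pid, pname); the class name is illustrative, and the complete case-study mapper appears further below.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class CacheJoinSketchMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String> pdMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // read the file that the driver registered with job.addCacheFile(...)
        URI[] cacheFiles = context.getCacheFiles();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split("\t");   // assumed layout: pid \t pname
            pdMap.put(fields[0], fields[1]);
        }
        reader.close();
    }

    // map() can now look up pdMap.get(pid) for every incoming order record,
    // so the join happens without any shuffle or Reduce stage.
}

Because the lookup table lives in memory on each map task, the only driver-side changes are addCacheFile(...) and setNumReduceTasks(0), as the case-study driver below shows.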

Map Join Case Study

Requirement

The tables are the same as in the Reduce Join case: join the order table (order.txt) with the product table (pd.txt). This time the small pd.txt table is distributed as a cache file and the join is done entirely on the Map side.

Requirement Analysis

Map Join applies when one of the tables being joined is small.

(figure: Map Join case analysis)

Implementation Code

First add the cache file in the driver
package com.atyp.mr.cache;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class DistributedCacheDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // 0 adjust the paths for your own machine
        args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_10_reduceJoin", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_10_reduceJoin"};

        // 1 get the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 set the jar path
        job.setJarByClass(DistributedCacheDriver.class);

        // 3 associate the mapper (there is no Reduce stage, so only the mapper is set)
        job.setMapperClass(DistributedCacheMapper.class);

        // 4 set the final output types (with no Reduce stage, only the final output types are needed)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 load the cached file
        job.addCacheFile(new URI("file:///D:/WorkSpace/IdeaWorpace/hadoop_input_output/input_10_readuceJoinCache/pd.txt"));

        // 7 the Map-side join needs no Reduce stage, so set the number of ReduceTasks to 0
        job.setNumReduceTasks(0);

        // 8 submit
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Read the cached file data
package com.atyp.mr.cache;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    HashMap<String, String> pdMap = new HashMap<>();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // load the small cached table
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        System.out.println("-------------------------------:" + path);
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // the data being cached:
            /** pd.txt
             * pid pname
             * 01  小米
             */
            // 1. split the line
            String[] fields = line.split("\t");

            pdMap.put(fields[0], fields[1]);
        }
        // 2. close the reader
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // the input data:
        /** order.txt
         * id   pid amount
         * 1001 01  1
         * 1002 02  2
         */
        /** pd.txt
         * pid pname
         * 01  小米
         */
        // 1. read one line
        String line = value.toString();

        // 2. split it
        String[] fields = line.split("\t");

        // 3. get the pid
        String pid = fields[1];

        // 4. look up the pname
        String pname = pdMap.get(pid);

        // 5. append it
        line = line + "\t" + pname;

        k.set(line);

        // 6. write out
        context.write(k, NullWritable.get());
    }
}

Using Counters

Hadoop maintains a set of built-in counters for every job, and user code can define its own counters with context.getCounter(...); the ETL example below uses two custom counters to record how many log records pass or fail the filter.
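
As a quick reference, the sketch below shows the two forms of the counter API. The enum-based form, the class name, and the enum names are illustrative assumptions; the group/name string form is the one the LogMapper in the ETL example below actually uses.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CounterSketchMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // hypothetical enum used only to name the counters
    enum LineCounter {GOOD, BAD}

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (value.toString().split(" ").length > 11) {
            // enum-based counter
            context.getCounter(LineCounter.GOOD).increment(1);
            // equivalent group/name form, as used in LogMapper below:
            // context.getCounter("map", "true").increment(1);
            context.write(value, NullWritable.get());
        } else {
            context.getCounter(LineCounter.BAD).increment(1);
        }
    }
}

When the job finishes, the counter totals are printed together with the job's built-in counters in the console output.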

Data Cleaning (ETL)

Before running the core business MapReduce job, the data often has to be cleaned first to remove records that do not meet the requirements. The cleaning process usually only needs a Mapper; no Reducer is required.

Data Cleaning Case Study (Simple Parsing Version)

Requirement

Remove the log records that have 11 or fewer fields.

  • Input data (web.log)
  • Expected output data: every line has more than 11 fields

Requirement Analysis

The input data must be filtered and cleaned in the Map stage according to this rule.

Implementation Code

Write the LogMapper class
package com.atyp.mr.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. read one line
        String line = value.toString();

        // 2. parse the line
        boolean result = parseLog(line, context);

        // if parsing fails, the record is useless
        if (!result) {
            return;
        }

        // 3. the record passed the check, write it out
        context.write(value, NullWritable.get());
    }

    private boolean parseLog(String line, Context context) {

        String[] fields = line.split(" ");

        if (fields.length > 11) {
            context.getCounter("map", "true").increment(1);
            return true;
        } else {
            context.getCounter("map", "false").increment(1);
            return false;
        }
    }
}
Write the LogDriver class
package com.atyp.mr.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // set the input and output paths according to the actual paths on your machine
        args = new String[]{"D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\input_11_log", "D:\\WorkSpace\\IdeaWorpace\\hadoop_input_output\\output_11_log"};

        // 1 get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 load the jar
        job.setJarByClass(LogDriver.class);

        // 3 associate the mapper
        job.setMapperClass(LogMapper.class);

        // 4 set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // set the number of ReduceTasks to 0
        job.setNumReduceTasks(0);

        // 5 set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 submit
        job.waitForCompletion(true);
    }
}

Data Cleaning Case Study (Complex Parsing Version)