
MapReduce Basics (Part 1)

MapReduce Overview

MapReduce Definition

MapReduce is a programming framework for distributed computation, and the core framework for developing Hadoop-based data-analysis applications. Its central function is to combine the user's business-logic code with the framework's built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.

Advantages and Disadvantages of MapReduce

Advantages

  • 1) Easy to program: by implementing a few simple interfaces, the user gets a complete distributed program that can run on a large number of cheap machines.
  • 2) Good scalability: when computing resources run short, capacity can be extended simply by adding more machines.
  • 3) High fault tolerance: if the node running a task fails, the task is automatically transferred to another node without manual intervention.
  • 4) Suitable for offline processing of massive data sets at the PB scale and above.

Disadvantages

  • 1) Not good at real-time computation: MapReduce cannot return results within milliseconds or a few seconds the way a database query can.
  • 2) Not good at streaming computation: streaming input arrives continuously, whereas the input data set of a MapReduce job must be static.
  • 3) Not good at DAG (multi-job) computation: each MapReduce job writes its output to disk, so chaining many dependent jobs causes heavy disk I/O and very poor performance.

The Core Idea of MapReduce


  • 1) A distributed computing program usually needs to be split into at least two phases.
  • 2) The concurrent MapTask instances of the first phase run fully in parallel and are independent of one another.
  • 3) The concurrent ReduceTask instances of the second phase are likewise independent of one another, but their input depends on the output of all the MapTask instances of the previous phase.
  • 4) The MapReduce programming model can contain only one Map phase and one Reduce phase; if the user's business logic is more complex than that, the only option is to chain several MapReduce programs and run them serially.
  • Summary: analyzing how the WordCount data flows through these phases (traced below) is the best way to internalize the MapReduce core idea.
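
As a concrete illustration, here is a minimal trace of WordCount through the two phases, using a few lines of the sample input from the case study later in this article:

Map phase (map() is called once per line):
  "atyp atyp"  ->  (atyp, 1) (atyp, 1)
  "ss ss"      ->  (ss, 1) (ss, 1)
  "jiao"       ->  (jiao, 1)

Shuffle (the framework groups the map output by key):
  atyp -> [1, 1]    ss -> [1, 1]    jiao -> [1]

Reduce phase (reduce() is called once per key):
  (atyp, [1, 1]) -> (atyp, 2)
  (ss, [1, 1])   -> (ss, 2)
  (jiao, [1])    -> (jiao, 1)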

MapReduce Processes

A complete MapReduce program runs as three kinds of processes in distributed mode:
  • 1) MrAppMaster: responsible for coordinating the whole program and scheduling its state.
  • 2) MapTask: responsible for the entire data-processing flow of the Map phase.
  • 3) ReduceTask: responsible for the entire data-processing flow of the Reduce phase.

The Official WordCount Source Code

Decompiling the official source with a decompiler shows that the WordCount example consists of a Map class, a Reduce class, and a driver class, and that the data types are serialization types wrapped by Hadoop itself rather than plain Java types.


Common Data Serialization Types

Commonly used Java data types and their corresponding Hadoop serialization types:

Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
map          MapWritable
array        ArrayWritable
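
A quick sketch (not part of the original example; the WritableDemo class name is just for illustration) showing how these wrapper types behave. Unlike Java's immutable String or primitive int, Writable objects are mutable containers designed to be reused:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        Text word = new Text();                 // wraps a String
        IntWritable count = new IntWritable();  // wraps an int

        // set(...) mutates the existing object instead of allocating a new one,
        // which is why the MapReduce code below reuses single instances.
        word.set("hadoop");
        count.set(1);

        System.out.println(word.toString() + "\t" + count.get()); // prints: hadoop	1
    }
}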

MapReduce Programming Conventions

A user-written program consists of three parts: the Mapper, the Reducer, and the Driver.

Mapper:
  • 1) The user-defined Mapper must extend Hadoop's Mapper parent class.
  • 2) The Mapper's input data is in key/value-pair form (the KV types are user-definable).
  • 3) The business logic is written in the map() method.
  • 4) The Mapper's output data is also in key/value-pair form.
  • 5) The map() method is called once for every input <K,V> pair.

Reducer:
  • 1) The user-defined Reducer must extend Hadoop's Reducer parent class.
  • 2) The Reducer's input data types correspond to the Mapper's output data types, also key/value pairs.
  • 3) The business logic is written in the reduce() method.
  • 4) The reduce() method is called once for every group of <K,V> pairs sharing the same key.

Driver:
The Driver is equivalent to a YARN client: it encapsulates the parameters that describe the job in a Job object and submits the whole program to the YARN cluster.

Hands-On WordCount Example

Requirement

Count and output the total number of occurrences of each word in a given text file.

  1. Input data (hello.txt)

atyp atyp
ss ss
cls cls
jiao
banzhang
xue
hadoop
  2. Expected output data

atyp 2
ss 2
cls 2
jiao 1
banzhang 1
xue 1
hadoop 1
Requirement Analysis

Following the MapReduce programming conventions, write the Mapper, the Reducer, and the Driver separately.

Environment Setup
  1. Create a Maven project (mr-0529).
  2. Add the following dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
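
Note: the Hadoop artifact versions declared here (2.7.2) should match the version of the Hadoop cluster the job will eventually run on; mismatched client and cluster versions are a common source of hard-to-diagnose runtime errors.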
  3. In the project's src/main/resources directory, create a new file named "log4j.properties" and fill it with the following:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Writing the Program
  1. Write the Mapper class:
package com.atyp.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Map phase
// KEYIN:    type of the input key (the byte offset of the line, LongWritable)
// VALUEIN:  type of the input value (one line of text, Text)
// KEYOUT:   type of the output key (a word, e.g. the "atyp" in (atyp, 1))
// VALUEOUT: type of the output value (the count 1, IntWritable)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // e.g. "atyp atyp"
        // 1. Get one line of input
        String line = value.toString();

        // 2. Split the line into words
        String[] words = line.split(" ");

        // 3. Write out one (word, 1) pair per word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
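
Note the design choice of declaring k and v as fields and reusing them rather than allocating new Text and IntWritable objects inside map(): map() is invoked once per input line, so on a large input reusing one pair of objects avoids creating (and garbage-collecting) millions of short-lived objects. This is safe because set() overwrites the object's contents and context.write() serializes the current contents immediately.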


  2. Write the Reducer class:
package com.atyp.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Reduce phase
// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
// KEYIN, VALUEIN:   the key and value types output by the map phase (note: the map phase's output)
// KEYOUT, VALUEOUT: the output takes the form (atyp, 2), so Text and IntWritable again
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // input for one key, e.g.:
        // (atyp, 1)
        // (atyp, 1)

        // 1. Sum the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2. Write out, e.g. (atyp, 2)
        context.write(key, v);
    }
}
Note: the first two generic types of the Reducer are the key and value output by the map phase.
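
A related subtlety: Hadoop reuses the same IntWritable instance across iterations of the values iterable, overwriting its contents at each step. Reading value.get() inside the loop, as above, is safe; storing the IntWritable objects themselves in a collection would not be, since every stored reference would end up pointing at the last value.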


  3. Write the Driver class:
package com.atyp.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // 1. Get a Job object
        Job job = Job.getInstance(conf);

        // 2. Set where the jar is located (via the driver class)
        job.setJarByClass(WordCountDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key and value types of the Mapper's output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for it to finish
        //job.submit();
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
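
Note that FileOutputFormat requires the output path not to exist yet: if the directory passed in args[1] already exists, the job fails immediately with a FileAlreadyExistsException, so delete the old output directory (or pick a new one) before each run.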


Local Testing

Run WordCountDriver directly from the IDE, passing two program arguments: the local input path (the directory containing hello.txt) and an output path that does not yet exist. When the job finishes, the part-r-00000 file in the output directory should match the expected output listed above. (On Windows, running Hadoop 2.x locally additionally requires HADOOP_HOME and winutils.exe to be configured.)

Testing on the Cluster
  1. To build the jar with Maven, add the following packaging-plugin configuration:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.atyp.mr.wordcount.WordCountDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Note: <mainClass> must contain the fully qualified class name of your Driver class.

  2. Package the program into a jar and copy it to the Hadoop cluster.


  3. Start the Hadoop cluster.
  4. Run the WordCount program; a typical command is sketched below.
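
A typical invocation looks like the following; the jar name wc.jar and the HDFS input/output paths are illustrative placeholders, so substitute your own:

hadoop jar wc.jar com.atyp.mr.wordcount.WordCountDriver /user/atyp/input /user/atyp/output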
