OutputFormat数据输出
OutputFormat接口实现类
自定义OutputFormat
自定义OutputFormat案例实操
需求
过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log
- 输入数据
1 | www.baidu.com |
- 期望输出数据
需求分析
编写代码
编写FilterMapper类
1 | package com.atyp.mr.outputformat; |
编写FilterReducer类
1 | package com.atyp.mr.outputformat; |
自定义一个OutputFormat类
1 | package com.atyp.mr.outputformat; |
编写RecordWriter类
1 | package com.atyp.mr.outputformat; |
编写FilterDriver类
1 | package com.atyp.mr.outputformat; |
Join多种应用
Reduce Join
Reduce Join案例实操
需求
需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联
代码实现
创建商品和合并后的Bean类
1 | package com.atyp.mr.table; |
编写TableMapper类
1 | package com.atyp.mr.table; |
编写TableReducer类
1 | package com.atyp.mr.table; |
编写TableDriver类
1 | package com.atyp.mr.table; |
总结
Map Join
使用场景
Map Join适用于一张表十分小、一张表很大的场景。
优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
具体办法:采用DistributedCache
- 在Mapper的setup阶段,将文件读取到缓存集合中。
- 在驱动函数中加载缓存。
// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI(“file://e:/cache/pd.txt”));
Map Join案例实操
需求
需求分析
MapJoin适用于关联表中有小表的情形。
实现代码
先在驱动模块中添加缓存文件
1 | package com.atyp.mr.cache; |
读取缓存的文件数据
1 | package com.atyp.mr.cache; |
计数器应用
数据清洗(ETL)
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
数据清洗案例实操-简单解析版
需求
去除日志中字段长度小于等于11的日志。
- 输入数据(web.log)
- 期望输出数据:每行字段长度都大于11
需求分析
需要在Map阶段对输入的数据根据规则进行过滤清洗
实现代码
编写LogMapper
1 | package com.atyp.mr.log; |
编写LogDriver类
1 | package com.atyp.mr.log; |