
HBase Basics (3): HBase API Operations and MapReduce Operations

HBase API Operations

Environment Setup

After creating a new project, add the following dependencies to pom.xml:

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>

HBase API

A complete example

package com.rickyin.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
* A walkthrough of the HBase API
*/
public class TestHBaseAPI_1 {
public static void main(String[] args) throws IOException {

//Accessing a MySQL database from Java (JDBC), for comparison:
//1. Load the database driver
//2. Get a connection (url, user, password)
//3. Get a statement object
//4. Build the SQL
//5. Execute the statement
//6. Read the query result from the ResultSet

//Accessing HBase from Java follows a similar pattern

//0. Create the configuration object used to connect to HBase
Configuration configuration = HBaseConfiguration.create();

//1. Get the HBase connection object
Connection connection = ConnectionFactory.createConnection(configuration);

//2. Get the operation object: Admin
//new HBaseAdmin(connection) is deprecated; use connection.getAdmin() instead
Admin admin = connection.getAdmin();

//3. Operate on the database:
// 3.1 Check whether the namespace exists
try {
admin.getNamespaceDescriptor("rickyin");
} catch (NamespaceNotFoundException e) {
e.printStackTrace();
//Create the namespace if it does not exist
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("rickyin").build();
admin.createNamespace(namespaceDescriptor);
}
// 3.2 Check whether the table exists in HBase
TableName student = TableName.valueOf("rickyin:student");
boolean exists = admin.tableExists(student);
System.out.println(exists);

if (exists) {
//Get the table object
Table table = connection.getTable(student);

//Query data
//DDL (create, drop, alter), DML (update, insert, delete), DQL (select)
String rowkey = "1001";
//Rowkeys are byte arrays, so mind the character encoding
Get get = new Get(Bytes.toBytes(rowkey));
//Fetch the result
Result result = table.get(get);
boolean empty = result.isEmpty();
System.out.println("1001数据是否存在:" + !empty);
if (empty) {
//Insert data
Put put = new Put(Bytes.toBytes(rowkey));
String family = "info";
String column = "name";
String val = "zhangsan";
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(val));
table.put(put);
System.out.println("Data inserted...");
} else {
//Print the existing data
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
}
}
} else {
//Create the table
HTableDescriptor hTableDescriptor = new HTableDescriptor(student);
//Add a column family
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("info");
hTableDescriptor.addFamily(hColumnDescriptor);

admin.createTable(hTableDescriptor);

System.out.println("表创建完成...");
}
//4. Use the results of the operations
//5. Close the connection
admin.close();
connection.close();
}
}
package com.rickyin.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
* HBase API: dropping a table and deleting data
*/
public class TestHBaseAPI_2 {
public static void main(String[] args) throws IOException {


Configuration configuration = HBaseConfiguration.create();

//1. Get the HBase connection object
Connection connection = ConnectionFactory.createConnection(configuration);

TableName tableName = TableName.valueOf("rickyin:student");

/**
* Dropping a table
*/
/*
Admin admin = connection.getAdmin();
if (admin.tableExists(tableName)) {
//Disable the table first
admin.disableTable(tableName);
//Then delete it
admin.deleteTable(tableName);
}
*/

/**
* Deleting data
*/
Table table = connection.getTable(tableName);

String rowkey = "1001";
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
System.out.println("删除数据");
}
}

Obtaining the Configuration object

public static Configuration conf;
static{
//Instantiate the configuration via HBaseConfiguration's factory method
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.9.102");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}

Note: the Configuration object can be obtained this way, or the way shown earlier. The difference is that the earlier approach reads the ZooKeeper settings from a configuration file (an hbase-site.xml placed in the project's resources directory), whereas here they are set by hand in code. Both work; a minimal hbase-site.xml is shown below for reference.
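A minimal hbase-site.xml under src/main/resources might look like this (the quorum address and port simply mirror the values used in the code above):

<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.9.102</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>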

Checking whether a table exists

public static boolean isTableExist(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
//Managing and accessing tables in HBase requires an HBaseAdmin object first
//Connection connection = ConnectionFactory.createConnection(conf);
//HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
HBaseAdmin admin = new HBaseAdmin(conf);
return admin.tableExists(tableName);
}
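Note that new HBaseAdmin(conf) is deprecated in HBase 1.x. A sketch of the same check using the Connection/Admin API instead (still relying on the static conf above):

public static boolean isTableExist(String tableName) throws IOException {
    //Obtain the Admin from a Connection rather than constructing HBaseAdmin directly
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();
    boolean exists = admin.tableExists(TableName.valueOf(tableName));
    admin.close();
    connection.close();
    return exists;
}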

Creating a table

public static void createTable(String tableName, String... columnFamily) throws
MasterNotRunningException, ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
//Check whether the table already exists
if(isTableExist(tableName)){
System.out.println("Table " + tableName + " already exists");
//System.exit(0);
}else{
//Create the table descriptor (the table name must be wrapped in a TableName)
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
//Add the column families
for(String cf : columnFamily){
descriptor.addFamily(new HColumnDescriptor(cf));
}
//Create the table from the descriptor
admin.createTable(descriptor);
System.out.println("Table " + tableName + " created successfully!");
}
}

Dropping a table

public static void dropTable(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
if(isTableExist(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("表" + tableName + "删除成功!");
}else{
System.out.println("表" + tableName + "不存在!");
}
}

Inserting data into a table

public static void addRowData(String tableName, String rowKey, String columnFamily, String
column, String value) throws IOException{
//Create the HTable object
HTable hTable = new HTable(conf, tableName);
//Insert data into the table
Put put = new Put(Bytes.toBytes(rowKey));
//Assemble the data into the Put object (put.add is deprecated; use addColumn)
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
hTable.put(put);
hTable.close();
System.out.println("插入数据成功");
}

Deleting multiple rows

public static void deleteMultiRow(String tableName, String... rows) throws IOException{
HTable hTable = new HTable(conf, tableName);
List<Delete> deleteList = new ArrayList<Delete>();
for(String row : rows){
Delete delete = new Delete(Bytes.toBytes(row));
deleteList.add(delete);
}
hTable.delete(deleteList);
hTable.close();
}

Getting all rows

public static void getAllRows(String tableName) throws IOException{
HTable hTable = new HTable(conf, tableName);
//Create the Scan object used to scan the regions
Scan scan = new Scan();
//Use the HTable to get a ResultScanner
ResultScanner resultScanner = hTable.getScanner(scan);
for(Result result : resultScanner){
Cell[] cells = result.rawCells();
for(Cell cell : cells){
//Row key
System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
//Column family
System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}

Getting a single row

public static void getRow(String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
//get.setMaxVersions(); show all versions
//get.setTimeStamp(); show the version with the given timestamp
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
}
}

Getting a specific "column family:column" from a row

public static void getRowQualifier(String tableName, String rowKey, String family, String
qualifier) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}

Encapsulating the HBase API

package com.rickyin.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
* Utility class for HBase operations
*/
public class HBaseUtils {

//A ThreadLocal keeps one connection per thread to avoid thread-safety issues
private static ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>();

private HBaseUtils() {

}

/**
* Create the HBase connection for the current thread
*
* @throws IOException
*/
public static void makeHbaseConnection() throws IOException {
// Configuration conf = HBaseConfiguration.create();
// conn = ConnectionFactory.createConnection(conf);
// return conn;
Connection conn = connHolder.get();
if (conn == null) {
Configuration conf = HBaseConfiguration.create();
conn = ConnectionFactory.createConnection(conf);
connHolder.set(conn);
}
}

/**
* Insert data
*
* @param tableName table name
* @param rowKey row key
* @param family column family
* @param column column name
* @param value value
*/
public static void insertData(String tableName, String rowKey, String family, String column, String value) throws IOException {
Connection conn = connHolder.get();
Table table = conn.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(put);
table.close();
}


/**
* Close the HBase connection for the current thread
*
* @throws IOException
*/
public static void close() throws IOException {
Connection conn = connHolder.get();
if (conn != null) {
conn.close();
connHolder.remove();
}
}
}
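A quick usage sketch of the utility class; the test class name, row key and value below are my own examples, not part of the original project:

package com.rickyin.util;

import java.io.IOException;

public class HBaseUtilsTest {
    public static void main(String[] args) throws IOException {
        //Bind a connection to the current thread, write one cell, then release the connection
        HBaseUtils.makeHbaseConnection();
        HBaseUtils.insertData("rickyin:student", "1002", "info", "name", "lisi");
        HBaseUtils.close();
    }
}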

MapReduce

Through HBase's Java API we can run MapReduce jobs against HBase, for example using MapReduce to import data from the local file system into an HBase table, or reading raw data out of HBase and analysing it with MapReduce.

Checking the classpath needed to run HBase MapReduce jobs

$ bin/hbase mapredcp

Importing the environment variables

Add the following to /etc/profile:

export HBASE_HOME=/opt/module/hbase-1.3.1

And add the following to hadoop-env.sh (note: put it after the for loop):

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
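With the classpath in place, you can verify the setup by running one of the MapReduce jobs that ships with HBase, for example the row counter. The paths and table name below are assumptions based on the layout above; adjust them to your installation:

$ cd $HBASE_HOME
$ $HADOOP_HOME/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter rickyin:student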

MR: from HBase to HBase

Table2TableApplication

package com.rickyin.hbase;

import com.rickyin.hbase.tool.HBaseMapperReduceTool;
import org.apache.hadoop.util.ToolRunner;

public class Table2TableApplication {
public static void main(String[] args) throws Exception {
//ToolRunner runs the MapReduce tool
ToolRunner.run(new HBaseMapperReduceTool(), args);
}
}

HBaseMapperReduceTool

package com.rickyin.hbase.tool;

import com.rickyin.hbase.mapper.ScanDataMapper;
import com.rickyin.hbase.reducer.InsertDataReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

public class HBaseMapperReduceTool implements Tool {
public int run(String[] strings) throws Exception {
//Create the job
Job job = Job.getInstance();
job.setJarByClass(HBaseMapperReduceTool.class);

//mapper
TableMapReduceUtil.initTableMapperJob(
"rickyin:student",
new Scan(),
ScanDataMapper.class,
ImmutableBytesWritable.class,
Put.class,
job);
//reducer
TableMapReduceUtil.initTableReducerJob(
"rickyin:user",
InsertDataReducer.class,
job);

//Run the job
boolean flag = job.waitForCompletion(true);

return flag ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}

public void setConf(Configuration configuration) {

}

public Configuration getConf() {
return null;
}
}

ScanDataMapper

package com.rickyin.hbase.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

import java.io.IOException;

public class ScanDataMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//The mapper receives each row scanned from the source table

//scan result -> put
Put put = new Put(key.get());
for (Cell cell : value.rawCells()) {
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
}

context.write(key, put);
}
}

InsertDataReducer

package com.rickyin.hbase.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class InsertDataReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//The reducer writes every Put into the target table
for (Put put : values) {
context.write(NullWritable.get(), put);
}
}
}
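To run the job, package the project and submit the jar on a node where the HBase classpath has been exported as above. The jar name below is just an example; also note that the target table rickyin:user must already exist before the job starts, because the output format does not create it:

$ $HADOOP_HOME/bin/yarn jar hbase-mr-1.0-SNAPSHOT.jar com.rickyin.hbase.Table2TableApplication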

MR: from a file to HBase

File2TableApplication

package com.rickyin.hbase.mr;

import com.rickyin.hbase.mr.tool.File2TableTool;
import org.apache.hadoop.util.ToolRunner;

public class File2TableApplication {
public static void main(String[] args) throws Exception {
ToolRunner.run(new File2TableTool(), args);
}
}

File2TableTool

package com.rickyin.hbase.mr.tool;

import com.rickyin.hbase.mr.mapper.ReadFileMapper;
import com.rickyin.hbase.mr.reducer.InsertDataReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;

public class File2TableTool implements Tool {
public int run(String[] strings) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(File2TableTool.class);

//hbase => hbase (the previous example)
//hdfs => hbase (what this example does)

//format
Path path = new Path("hdfs://hadoop101:9000/student.csv");
FileInputFormat.addInputPath(job, path);

//mapper
job.setMapperClass(ReadFileMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

//reducer
TableMapReduceUtil.initTableReducerJob(
"rickyin:student",
InsertDataReducer.class,
job);

boolean flag = job.waitForCompletion(true);
return flag ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}

public void setConf(Configuration configuration) {

}

public Configuration getConf() {
return null;
}
}

ReadFileMapper

package com.rickyin.hbase.mr.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReadFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] split = value.toString().split(",");
String rowkey = split[0];

byte[] bs = Bytes.toBytes(rowkey);
Put put = new Put(bs);
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
context.write(new ImmutableBytesWritable(bs), put);


}
}
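The mapper assumes every line of student.csv has the form rowkey,name; a made-up input file would look like:

1001,zhangsan
1002,lisi
1003,wangwu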

InsertDataReducer

package com.rickyin.hbase.mr.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class InsertDataReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//The reducer writes every Put into the target table
for (Put put : values) {
context.write(NullWritable.get(), put);
}
}
}

MR: from HBase to MySQL

HBase2MysqlApplication

package com.rickyin.hbase;

import com.rickyin.hbase.tool.HBase2MysqlTool;
import org.apache.hadoop.util.ToolRunner;

public class HBase2MysqlApplication {
public static void main(String[] args) throws Exception {
ToolRunner.run(new HBase2MysqlTool(), args);
}
}

HBase2MysqlTool

package com.rickyin.hbase.tool;

import com.rickyin.hbase.bean.CacheData;
import com.rickyin.hbase.format.MysqlOutputFormat;
import com.rickyin.hbase.mapper.ScanHbaseMapper;
import com.rickyin.hbase.reducer.Hbase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

public class HBase2MysqlTool implements Tool {
public int run(String[] strings) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(HBase2MysqlTool.class);

//mapper
TableMapReduceUtil.initTableMapperJob(
"rickyin:student",
new Scan(),
ScanHbaseMapper.class,
Text.class,
CacheData.class,
job);
//reducer
job.setReducerClass(Hbase2MysqlReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CacheData.class);

job.setOutputFormatClass(MysqlOutputFormat.class);

return job.waitForCompletion(true) ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}

public void setConf(Configuration configuration) {

}

public Configuration getConf() {
return null;
}
}

ScanHbaseMapper

package com.rickyin.hbase.mapper;

import com.rickyin.hbase.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class ScanHbaseMapper extends TableMapper<Text, CacheData> {
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
for (Cell cell : result.rawCells()) {
String name = Bytes.toString(CellUtil.cloneValue(cell));
CacheData data = new CacheData();
data.setName(name);
data.setCount(1);

context.write(new Text(name), data);
}
}
}

CacheData

package com.rickyin.hbase.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CacheData implements WritableComparable<CacheData> {
private String name;
private Integer count;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}

public int compareTo(CacheData data) {
return name.compareTo(data.name);
}

public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeInt(count);
}

public void readFields(DataInput dataInput) throws IOException {
name = dataInput.readUTF();
count = dataInput.readInt();
}
}

MysqlOutputFormat (writing to MySQL still needs to be implemented inside this class; I have not done that here, this is only the overall skeleton — a possible implementation is sketched after the code)

package com.rickyin.hbase.format;

import com.rickyin.hbase.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;

public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {
public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return null;
}

public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {

}

public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return null;
}
}
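For completeness, here is one way the skeleton could be fleshed out. This is only a sketch: the JDBC URL, the credentials and the result(name, count) table are assumptions, and the MySQL JDBC driver would have to be added to the project's dependencies.

package com.rickyin.hbase.format;

import com.rickyin.hbase.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {

    //JDBC settings are placeholders; adjust them to your environment
    private static final String URL = "jdbc:mysql://hadoop101:3306/test";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";

    public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        final Connection conn;
        try {
            conn = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (SQLException e) {
            throw new IOException(e);
        }
        return new RecordWriter<Text, CacheData>() {
            public void write(Text key, CacheData value) throws IOException, InterruptedException {
                //Insert one aggregated row per name
                try (PreparedStatement ps = conn.prepareStatement("INSERT INTO result(name, count) VALUES(?, ?)")) {
                    ps.setString(1, value.getName());
                    ps.setInt(2, value.getCount());
                    ps.executeUpdate();
                } catch (SQLException e) {
                    throw new IOException(e);
                }
            }

            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    conn.close();
                } catch (SQLException e) {
                    throw new IOException(e);
                }
            }
        };
    }

    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        //Nothing to validate in this sketch
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //MapReduce still needs a committer; reuse FileOutputCommitter with a scratch directory
        return new FileOutputCommitter(new Path("/tmp/mysql-output"), taskAttemptContext);
    }
}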

Hbase2MysqlReducer

package com.rickyin.hbase.reducer;

import com.rickyin.hbase.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Hbase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {
@Override
protected void reduce(Text key, Iterable<CacheData> datas, Context context) throws IOException, InterruptedException {
int sum = 0;
for (CacheData data : datas) {
sum = sum + data.getCount();
}

CacheData sumData = new CacheData();
sumData.setName(key.toString());
sumData.setCount(sum);

context.write(key, sumData);
}
}

HBase Coprocessors

The first MR example above copied data from one HBase table into another, which is rather clumsy. Is there a way to write to table B automatically whenever data is written to table A (so that B effectively acts as a backup of A)? MySQL has triggers for this; in HBase the same thing can be done with a coprocessor.

package com.rickyin.hbase.coprocesser;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import java.io.IOException;

/**
* Coprocessor (a built-in HBase feature)
* 1. Create a class that extends BaseRegionObserver
* 2. Override the right method (there are many; since we want to write to table B whenever a row is inserted into table A, we override postPut)
* 3. Implement the logic:
* a. data is inserted into rickyin:student
* b. the same data is also inserted into rickyin:user
* 4. Package the project and upload the jar to HBase so that HBase can load the coprocessor
*/
public class InsertRickStudentCoprocesser extends BaseRegionObserver {
// prePut: runs before the Put is applied
// doPut: while the Put is being applied
// postPut: runs after the Put is applied
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
//Get the target table
Table table = e.getEnvironment().getTable(TableName.valueOf("rickyin:user"));

//Write the same Put into the target table
table.put(put);

//Close the table
table.close();
}
}
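The coprocessor only takes effect after the jar is on HBase's classpath (for example copied into the lib directory of every RegionServer, followed by a restart) and the class is registered on the source table. A sketch of registering it while creating the table through the Java API; the helper class below is my own addition, not part of the original project:

package com.rickyin.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class CreateTableWithCoprocessor {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = connection.getAdmin();

        //Register the observer on the source table so that every Put is mirrored to rickyin:user
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("rickyin:student"));
        descriptor.addFamily(new HColumnDescriptor("info"));
        descriptor.addCoprocessor("com.rickyin.hbase.coprocesser.InsertRickStudentCoprocesser");
        admin.createTable(descriptor);

        admin.close();
        connection.close();
    }
}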