Hbase的计算和bulkload
15.mr操作Hbase
15.1 mr读取hbase数据
首先在hbase中准备数据
# 创建分数表
create 'score','info'
# 增加学生数据
put 'score','001','info:name','zhangsan'
put 'score','001','info:score','100'
put 'score','001','info:class','1'
put 'score','002','info:name','lisi'
put 'score','002','info:score','95'
put 'score','002','info:class','1'
put 'score','003','info:name','wangwu'
put 'score','003','info:score','98'
put 'score','003','info:class','2'
put 'score','004','info:name','zhaosi'
put 'score','004','info:score','92'
put 'score','004','info:class','2'
下面读取hbase的案例,使用mr读取数据并且求出每个班级的平均分
读取HBASE中的数据需要继承TableMapper类
class HMapper extends TableMapper<outkey,outvalue>
获取score表的数据然后输出到reducer端进行分组,将班级作为key,然后这班级的数据都会按照key分在一起,我们就可以计算出来这个班级的平均分了
首先我们引入maven依赖
在hbase中去除hadoop的所有依赖,这样就不会出现冲突问题
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hainiu.hbase</groupId>
<artifactId>TestHBase</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.13</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.4.13</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.13</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
整体代码如下:
package com.hainiu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class MapreduceRead {
public static class HMapper extends TableMapper<Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
byte[] classBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("class"));
byte[] scoreBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
k.set(Bytes.toString(classBytes));
String score = Bytes.toString(scoreBytes);
v.set(Integer.valueOf(score));
System.out.println(k);
System.out.println(v);
context.write(k,v);
}
}
public static class HReducer extends Reducer<Text,IntWritable,Text, DoubleWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
for (IntWritable value : values) {
sum += value.get();
count ++;
}
double avg = sum * 1.0 / count;
context.write(key,new DoubleWritable(avg));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = HBaseConfiguration.create();
// conf.set("HADOOP_HOME","/hadoop");
// conf.set("HBASE_HOME","/hbase");
Job job = Job.getInstance(conf);
job.setJarByClass(MapreduceRead.class);
TableMapReduceUtil.initTableMapperJob(
"score",new Scan(),HMapper.class,Text.class,IntWritable.class,job
);
job.setReducerClass(HReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
TextOutputFormat.setOutputPath(job,new Path("res"));
job.waitForCompletion(true);
}
}
结果如下
15.2 mr写出数据到hbase中
首先在本地data文件下面准备数据a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
统计每个单词的出现次数并且将结果存储到hbase的表中
#在hbase中创建存储单词出现次数的表
create 'wordcount','info'
# 存储数据的时候rowkey设定为单词,info:count 记录单词出现次数
这个时候要存储数据到hbase中那么我们需要在reducer中增加TableReducer的类用于插入hbase中数据
class HReducer extends TableReducer
整体代码如下:
package com.hainiu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
public class MapreduceWrite {
public static class HMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split(" ");
for (String str : strs) {
k.set(str);
context.write(k,v);
}
}
}
public static class HReducer extends TableReducer<Text,IntWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += 1;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"),Bytes.toBytes(sum));
context.write(NullWritable.get(),put);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf);
job.setJarByClass(MapreduceWrite.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapperClass(HMapper.class);
TextInputFormat.addInputPath(job,new Path("data/a.txt"));
TableMapReduceUtil.initTableReducerJob("wordcount",HReducer.class,job);
job.waitForCompletion(true);
}
}
查看hbase中的数据
16.hive操作hbase
首先我们的需要安装hive但是这个hive已经在我们准备的分布式环境中存在了,我们可以直接使用,想学习hive的同学可以去海牛的hive教程中具体学习,我们这边不做概述
一般在查询hbase的数据的时候我们可以直接使用hbase的命令行或者是api进行查询就行了,但是在日常的计算过程中我们一般都不是为了查询,都是在查询的基础上进行二次计算,所以使用hbase的命令是没有办法进行数据计算的,并且对于hbase的压力也会增加很多,hbase的本身并没有提供任何的计算逻辑,所以我们要依赖于mapreducer进行计算,这个代码上面我们已经实现过了,但是后续开发过程中很少有人会直接开发mr程序,这个代码的复杂程度比较高,并且会非常大的拖慢我们的开发速度,所以一般我们都会使用hive以外表的形式操作hbase中的数据,进行多表的管理查询计算或者是进行数据的导入和导出
首先在hive中增加hbase的链接信息
修改hive-site.xml中的值
<property>
<name>hive.zookeeper.quorum</name>
<value>nn1,nn2,s1</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
/usr/local/hadoop/etc/hadoop/mapred-site.xml
<property>
<name>hbase.zookeeper.quorum</name>
<value>nn1,nn2,s1</value>
</property>
在hive/conf目录中增加log4j.properties文件输入日志级别设置
log4j.rootLogger=error,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n
然后启动hive就可以直接连接hbase了
16.1 创建hive的内部表
create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student_hbase");
# 删除hive中的表
drop table student_hive;
# 内部表在删除的时候hbase的表也会被删除
16.2创建外部表
有的时候在hbase中已经存在一个表并且其中存在数据,我们需要使用hive进行分析,那么我们就需要创建一个外部表进行映射
# 首先在hbase中创建表
create 'student_hbase','info'
# 增加数据
put 'student_hbase','1','info:name','zhangsan'
put 'student_hbase','1','info:age','20'
put 'student_hbase','2','info:name','lisi'
put 'student_hbase','2','info:age','30'
# 这个时候就需要创建外部表进行映射
create external table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student_hbase");
删除表,因为hive对应的是外部表所以hbase的表不会被删除掉
drop table student_hive;
16.3 关联计算表的值
hbase中创建工资表
#创建salary工资表
create 'salary','info'
put 'salary','001','info:id','1'
put 'salary','002','info:id','1'
put 'salary','003','info:id','1'
put 'salary','004','info:id','2'
put 'salary','005','info:id','2'
put 'salary','006','info:id','2'
put 'salary','001','info:salary','1000'
put 'salary','002','info:salary','2000'
put 'salary','003','info:salary','3000'
put 'salary','004','info:salary','4000'
put 'salary','005','info:salary','5000'
put 'salary','006','info:salary','6000'
#创建hive的表映射
create external table salary_hive(salary_id string,id int,salary int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:id,info:salary")
TBLPROPERTIES ("hbase.table.name" = "salary");
现在实现关联查询,每个用户的平均工资是多少,以及人名
select a.name,avg(b.salary) as avg
from student_hive a join salary_hive b
on a.id = b.id
group by a.name
可以根据计算得出最终结果
16.4 hbase的数据导入导出
hbase的数据导出
# 使用hive的导出命令可以直接导出数据
insert overwrite local directory '/home/hadoop/salary.txt' select * from salary_hive;
可以通过外表的形式直接将数据导出到文件夹中
结果数据查看
默认使用^A进行文件分割
导入数据
不能会用hive的load方式直接将数据导入到hbase中,但是可以通过中间表的形式导入进行
# 首先在本地创建teacher.txt 输入以下内容
1,yeniu,20
2,xinniu,30
3,qingniu,35
# 在hive中创建临时表
create table teacher_tmp(id int,name string,age int)
row format delimited fields terminated by ',';
# 将数据加载到临时表中
load data local inpath '/home/hadoop/teacher.txt' into table teacher_tmp;
# 创建和hbase的外部映射表
create table teacher_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "teacher_hbase");
#从临时表使用mr将数据导入到hbase中
insert into teacher_hive select * from teacher_tmp;
数据导入成功
17.hbase的bulkload
在大数据的场景计算中,有时候我们会遇见将大量数据一次性导入到hbase的情况,但是这个时候hbase是不能够容纳的,因为插入的数据首先会进入到memstore中如果大量插入数据会造成memstore的内存压力急剧增大,这个时候机器的其他进程是没有办法执行的,并且还会出现非常严重的问题,比如hbase在大量插入数据的时候首先这个region会急剧增加,后续region会按照拆分策略进行region拆分,当前region下线,插入程序会直接卡死造成hbase宕机等严重问题,为了解决这个问题,hbase给用户提供了一种新的插入数据的方式bulkload方式,这个方式中会跳过hbase本身的过程,首先在使用hbase的提供的mapreduce程序按照插入数据的格式和hbase的表格式生成hfile文件,然后我们将hfile文件一次性插入到hbase对应的hdfs的文件夹中,这种方式是最快捷并且对于hbase的压力最小的方式
过程如下:
# 首先在本地创建文件a.txt 输入以下内容
1,zhangsan,20
2,lisi,30
3,wangwu,40
5 zhaosi,50
# 然后将数据上传到hdfs中
hdfs dfs -put a.txt /
# 在hbase中创建表
create 't','info'
# 然后将id当成是rowkey,info:name存放名称 info:age存放年龄
执行importTSV方法,产生hfile文件
-Dimporttsv.separator :指定分隔符
-Dimporttsv.columns :指定列映射
HBASE_ROW_KEY强制要求写
cf:pk指定rowkey字段
其他字段与hive表中对应
-Dimporttsv.skip.bad.lines:是否跳过无效行
-Dimporttsv.bulk.output:hfile输出路径
hbase表名
hdfs://worker-1:8020/data/hainiu/t2 :用于生成hfile文件的输入目录
具体执行命令如下:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/t \
default:t hdfs://ns1/a.txt
查看hdfs文件
发现hfile文件已经生成,然后我们将数据导入到hdfs对应的目录中
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /t default:t
扫描结果发现数据已经存在