Hadoop supports importing data from and exporting data to relational databases through DBInputFormat and DBOutputFormat, so jobs that need to work with relational data can use them directly. The following walks through a demonstration.
Example scenario
We have two tables in MySQL: a user table that stores user information, and a user_dat table that stores each user's salary. The DDL is as follows:
CREATE TABLE user (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(64) NOT NULL
);
CREATE TABLE user_dat (
id INTEGER NOT NULL PRIMARY KEY,
salary INTEGER DEFAULT '0'
);
Insert a few rows of test data:
mysql> insert into user(id,name) values(1,'zhangsan'),(2,'lisi'),(3,'wangwu');
Query OK, 3 rows affected (0.00 sec)
Records: 3 Duplicates: 0 Warnings: 0
mysql> select * from user;
+----+----------+
| id | name |
+----+----------+
| 1 | zhangsan |
| 2 | lisi |
| 3 | wangwu |
+----+----------+
3 rows in set (0.00 sec)
We will read the user table out of MySQL, compute a salary for each user, and write the results into the user_dat table, which exercises both the DB input and output paths.
Implementation
Define a user serialization type to hold the imported user rows; it must implement both the Writable and DBWritable interfaces. The code is as follows:
package com.hainiubl.hadoop.demo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DbUserWritable implements Writable, DBWritable {
private int id;
private String name;
// DBWritable method: bind this object's fields to the PreparedStatement used for writing
public void write(PreparedStatement statement) throws SQLException {
statement.setInt(1, id);
statement.setString(2, name);
}
// DBWritable method: populate this object from a database record
public void readFields(ResultSet resultSet) throws SQLException {
id = resultSet.getInt(1);
name = resultSet.getString(2);
}
// Writable method: serialize the fields for Hadoop I/O
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
}
// Writable method: deserialize the fields from Hadoop I/O
public void readFields(DataInput in) throws IOException {
id = in.readInt();
name = in.readUTF();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + id;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof DbUserWritable)) {
return false;
}
DbUserWritable other = (DbUserWritable) obj;
if (id != other.id) {
return false;
}
if (name == null) {
if (other.name != null) {
return false;
}
} else if (!name.equals(other.name)) {
return false;
}
return true;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
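To sanity-check the Writable half before wiring the type into a job, you can round-trip an instance through Hadoop's in-memory buffers. This is a minimal sketch for local testing, not part of the job itself; the class name DbUserWritableRoundTrip and the sample values are mine:
package com.hainiubl.hadoop.demo;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
public class DbUserWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        DbUserWritable original = new DbUserWritable();
        original.setId(1);
        original.setName("zhangsan");
        // Serialize with write(DataOutput) ...
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);
        // ... then read the same bytes back with readFields(DataInput).
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        DbUserWritable copy = new DbUserWritable();
        copy.readFields(in);
        System.out.println(original.equals(copy)); // expected: true
    }
}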
Define a userdat serialization type to hold the data exported to the user_dat table; it must implement the WritableComparable and DBWritable interfaces. Because this type serves as the output key, it has to be comparable, hence WritableComparable rather than plain Writable. The code is as follows:
package com.hainiubl.hadoop.demo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DbUserDatWritable implements WritableComparable<DbUserDatWritable>, DBWritable {
private int id;
private int salary;
// DBWritable method: bind this object's fields to the INSERT statement's placeholders
public void write(PreparedStatement statement) throws SQLException {
statement.setInt(1, id);
statement.setInt(2, salary);
}
// DBWritable method: populate this object from a database record
public void readFields(ResultSet resultSet) throws SQLException {
id = resultSet.getInt(1);
salary = resultSet.getInt(2);
}
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeInt(salary);
}
public void readFields(DataInput in) throws IOException {
id = in.readInt();
salary = in.readInt();
}
// Order by id first, then by salary
public int compareTo(DbUserDatWritable o) {
if (id != o.id){
return (id < o.id ? -1 : (id == o.id ? 0 : 1));
}
return (salary < o.salary ? -1 : (salary == o.salary ? 0 : 1));
}
// Raw comparator: orders serialized keys directly on their bytes,
// so the sort phase does not have to deserialize them
public static class Comparator extends WritableComparator {
public Comparator() {
super(DbUserDatWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);
int thatValue = readInt(b2, s2);
if(thisValue != thatValue){
return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
}
thisValue = readInt(b1, s1 + 4);
thatValue = readInt(b2, s2 + 4);
return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
}
}
static { // register this comparator
WritableComparator.define(DbUserDatWritable.class, new Comparator());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + id;
result = prime * result + salary;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof DbUserDatWritable)) {
return false;
}
DbUserDatWritable other = (DbUserDatWritable) obj;
if (id != other.id) {
return false;
}
if (salary != other.salary) {
return false;
}
return true;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getSalary() {
return salary;
}
public void setSalary(int salary) {
this.salary = salary;
}
}
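The static block above registers the raw comparator with WritableComparator, so MapReduce can sort keys on their serialized bytes; the offsets s1 and s1 + 4 work because write(DataOutput) emits the 4-byte id first and the 4-byte salary second. Below is a minimal sketch, runnable locally, that checks the registered comparator agrees with compareTo (the class name and sample values are mine):
package com.hainiubl.hadoop.demo;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparator;
public class DbUserDatComparatorCheck {
    public static void main(String[] args) throws Exception {
        DbUserDatWritable a = new DbUserDatWritable();
        a.setId(1);
        a.setSalary(10);
        DbUserDatWritable b = new DbUserDatWritable();
        b.setId(2);
        b.setSalary(20);
        // Serialize both keys the way the shuffle would.
        DataOutputBuffer bufA = new DataOutputBuffer();
        a.write(bufA);
        DataOutputBuffer bufB = new DataOutputBuffer();
        b.write(bufB);
        // get() returns the comparator registered in the static block.
        WritableComparator cmp = WritableComparator.get(DbUserDatWritable.class);
        int byBytes = cmp.compare(bufA.getData(), 0, bufA.getLength(),
                bufB.getData(), 0, bufB.getLength());
        System.out.println(byBytes < 0 && a.compareTo(b) < 0); // expected: true
    }
}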
The job implementation:
package com.hainiubl.hadoop.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DbIoJob extends Configured implements Tool {
public static class DbInputMapper extends Mapper<LongWritable,DbUserWritable,DbUserDatWritable,NullWritable>{
private DbUserDatWritable userdat = new DbUserDatWritable();
@Override
protected void map(LongWritable key, DbUserWritable value,
Mapper<LongWritable, DbUserWritable, DbUserDatWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
userdat.setId(value.getId());
userdat.setSalary(value.getId() * 10); // an arbitrary value derived from the id; it carries no real meaning
context.write(userdat,NullWritable.get());
}
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "DbIoJob");
job.setJarByClass(DbIoJob.class);
job.setMapperClass(DbInputMapper.class);
job.setOutputKeyClass(DbUserDatWritable.class);
job.setOutputValueClass(NullWritable.class);
// set the input and output formats
job.setInputFormatClass(DBInputFormat.class);
job.setOutputFormatClass(DBOutputFormat.class);
// set the database connection info
DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver",
"jdbc:mysql://127.0.0.1:3306/testdb", "root", "123456");
DBInputFormat.setInput(job,DbUserWritable.class,"select id,name from user","select count(1) from user");
DBOutputFormat.setOutput(job, "user_dat", "id","salary");
return job.waitForCompletion(true) ? 0 :1 ;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new DbIoJob(), args);
System.exit(res);
}
}
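For reference, DBOutputFormat builds a parameterized INSERT statement from the table and column names passed to setOutput(), and each output record fills the placeholders through DbUserDatWritable.write(PreparedStatement). The sketch below prints the statement it generates; constructQuery is a public method of DBOutputFormat, though the exact formatting may differ between Hadoop versions:
package com.hainiubl.hadoop.demo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
public class ShowInsertQuery {
    public static void main(String[] args) {
        DBOutputFormat<DbUserDatWritable, NullWritable> fmt = new DBOutputFormat<>();
        // Same table and columns as the setOutput() call in DbIoJob
        String sql = fmt.constructQuery("user_dat", new String[] { "id", "salary" });
        System.out.println(sql); // e.g. INSERT INTO user_dat (id,salary) VALUES (?,?);
    }
}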
Run
Submit the job, shipping the MySQL JDBC driver to the cluster via -libjars (picked up by ToolRunner's GenericOptionsParser):
hadoop jar ./demo.jar com.hainiubl.hadoop.demo.DbIoJob -libjars ./lib/mysql-connector-java-5.1.39-bin.jar
The result in MySQL:
mysql> select * from user_dat;
+----+--------+
| id | salary |
+----+--------+
| 1 | 10 |
| 2 | 20 |
| 3 | 30 |
+----+--------+
3 rows in set (0.00 sec)