使用 MapReduce 导入导出 MySQL 表数据

教程 青牛 ⋅ 于 2017-01-03 18:01:03 ⋅ 6679 阅读

Hadoop 通过 DBInputFormat 和 DBOutputFormat 实现了对关系型数据库导入、导出数据的支持,需要读写关系型数据的 MapReduce 任务可以直接使用它们。接下来我们演示一下:

假设需求场景

我们在 MySQL 中有两张表:一张是 user 表,记录用户信息;另一张是用户数据表 user_dat,记录用户的工资。建表语句如下:

-- User master data: one row per user, identified by id.
CREATE TABLE user (
  id   INTEGER     NOT NULL,
  name VARCHAR(64) NOT NULL,
  PRIMARY KEY (id)
);

-- Per-user salary; the job below writes one row here for each row read from user.
CREATE TABLE user_dat (
  id     INTEGER NOT NULL,
  salary INTEGER DEFAULT '0',
  PRIMARY KEY (id)
);

插入几条测试数据:

mysql> insert into user(id,name) values(1,'zhangsan'),(2,'lisi'),(3,'wangwu');
Query OK, 3 rows affected (0.00 sec)

Records: 3  Duplicates: 0  Warnings: 0

mysql> select * from user;
+----+----------+
| id | name     |
+----+----------+
|  1 | zhangsan |
|  2 | lisi     |
|  3 | wangwu   |
+----+----------+
3 rows in set (0.00 sec)

我们假设先从 user 表读取数据(导入到 MapReduce),然后计算工资,最后把结果写入 user_dat 表(导出到数据库),这样我们就能体验 DB 的导入和导出操作。

具体实现

定义一个user序列化类型,存储导入的user数据,需要实现 Writable 和DBWritable两个接口,代码如下:

package com.hainiubl.hadoop.demo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DbUserWritable implements Writable, DBWritable {
    private int id;
    private String name;
    # DBWritable的方法,设置操作statement
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, id);
        statement.setString(2, name);

    }
       # DBWritable的方法,把数据库记录转换成自定义对象
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getInt(1);
        name = resultSet.getString(2);

    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
    }

    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + id;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (!(obj instanceof DbUserWritable)) {
            return false;
        }
        DbUserWritable other = (DbUserWritable) obj;
        if (id != other.id) {
            return false;
        }
        if (name == null) {
            if (other.name != null) {
                return false;
            }
        } else if (!name.equals(other.name)) {
            return false;
        }
        return true;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

定义一个userdat序列化类型,存储导出到user_dat表的数据,需要实现 WritableComparable 和DBWritable两个接口,这个类型要作为输出key,所以要实现Comparable 接口 ,代码如下:

package com.hainiubl.hadoop.demo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
 * One record of the {@code user_dat} table ({@code id}, {@code salary}).
 *
 * <p>Used as a MapReduce output key, hence {@link WritableComparable}; also a
 * {@link DBWritable} so it can be bound to a JDBC statement. Ordering is by
 * {@code id} first, then by {@code salary}.
 *
 * <p>Serialized form is two consecutive 4-byte ints: id, then salary. The
 * nested raw {@link Comparator} relies on exactly this layout.
 */
public class DbUserDatWritable implements WritableComparable<DbUserDatWritable>, DBWritable {

    private int id;
    private int salary;

    /** Binds id to parameter 1 and salary to parameter 2 of the statement. */
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, id);
        statement.setInt(2, salary);
    }

    /** Loads id from column 1 and salary from column 2 of the current row. */
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getInt(1);
        salary = resultSet.getInt(2);
    }

    /** Writes id followed by salary, each as a 4-byte int. */
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(salary);
    }

    /** Reads id then salary, mirroring {@link #write(DataOutput)}. */
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        salary = in.readInt();
    }

    /** Orders by id; ties are broken by salary. */
    public int compareTo(DbUserDatWritable o) {
        int byId = Integer.compare(id, o.id);
        if (byId != 0) {
            return byId;
        }
        return Integer.compare(salary, o.salary);
    }

    /**
     * Raw comparator that orders serialized records directly from their bytes,
     * without deserializing them: id is the int at the record start, salary is
     * the int 4 bytes after it.
     */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(DbUserDatWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int byId = Integer.compare(readInt(b1, s1), readInt(b2, s2));
            if (byId != 0) {
                return byId;
            }
            // Salary is stored immediately after the 4-byte id.
            return Integer.compare(readInt(b1, s1 + 4), readInt(b2, s2 + 4));
        }
    }

    static {
        // Register the raw comparator for this key type.
        WritableComparator.define(DbUserDatWritable.class, new Comparator());
    }

    @Override
    public int hashCode() {
        // Same value as the usual prime-31 accumulation seeded with 1.
        return 31 * (31 + id) + salary;
    }

    /** Equal when both id and salary match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof DbUserDatWritable)) {
            return false;
        }
        DbUserDatWritable that = (DbUserDatWritable) obj;
        return id == that.id && salary == that.salary;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

}

Job实现代码:

package com.hainiubl.hadoop.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DbIoJob extends Configured implements Tool {

    public static class DbInputMapper extends Mapper<LongWritable,DbUserWritable,DbUserDatWritable,NullWritable>{

        private DbUserDatWritable userdat = new DbUserDatWritable();

        @Override
        protected void map(LongWritable key, DbUserWritable value,
                Mapper<LongWritable, DbUserWritable, DbUserDatWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            userdat.setId(value.getId());
            userdat.setSalary(value.getId()*10);//随便设置一个整数,没有实际意义

            context.write(userdat,NullWritable.get());
        }

    }

    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(getConf(), "DbIoJob");
        job.setJarByClass(DbIoJob.class);
        job.setMapperClass(DbInputMapper.class);
        job.setOutputKeyClass(DbUserDatWritable.class);
        job.setOutputValueClass(NullWritable.class);
               #设置输入输出格式
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

              # 设置数据库连接信息
         DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver",  
                 "jdbc:mysql://127.0.0.1:3306/testdb", "root", "123456");  
         DBInputFormat.setInput(job,DbUserWritable.class,"select id,name from user","select count(1) from user"); 
         DBOutputFormat.setOutput(job, "user_dat", "id","salary");

        return job.waitForCompletion(true) ? 0 :1 ;
    }

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        int res = ToolRunner.run(new DbIoJob(), args);
        System.exit(res);
        System.out.println("done");

    }

}

运行

hadoop jar ./demo.jar  com.hainiubl.hadoop.demo.DbIoJob  -libjars ./lib/mysql-connector-java-5.1.39-bin.jar 

mysql结果:

mysql> select * from user_dat;
+----+--------+
| id | salary |
+----+--------+
|  1 |     10 |
|  2 |     20 |
|  3 |     30 |
+----+--------+
3 rows in set (0.00 sec)
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-青牛,http://hainiubl.com/topics/32
本帖已被设为精华帖!
本帖由 海牛博士 于 7年前 加精
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter