1 sqoop原理
1.1 sqoop介绍
Sqoop是Apache旗下的一款“hadoop和关系型数据库服务器之间传送数据”的工具。
导入数据:MySQL、Oracle导入数据到hadoop的hdfs、hive、hbase等数据存储系统。
导出数据:从hadoop的文件系统中导出数据到关系型数据库中。
1.2 sqoop架构
- 导入流程
- 首先通过jdbc读取关系型数据库元数据信息,获取到表结构。
- 根据元数据信息生成Java类。
- 启动import程序,通过jdbc读取关系型数据库数据,并通过上一步的Java类进行序列化。
- MapReduce并行写数据到Hadoop中,并使用Java类进行反序列化。
- 导出流程
- sqoop通过jdbc读取关系型数据库元数据,获取到表结构信息,生成Java类,用于序列化。
- MapReduce并行读取hdfs数据,并且通过Java类进行序列化。
- export程序启动,通过Java类反序列化,同时启动多个map,通过jdbc将数据写入到关系型数据库中。
2 cdh部署sqoop
1)添加服务
2)添加gateway节点
3)完成效果
4)测试sqoop
# hdfs认证
kinit hdfs
# shell 里执行 sqoop 命令
sqoop help
查看,说明sqoop安装完成
3 sqoop常用参数
安全环境下操作需要做安全认证
- 常用命令
命令名称 | 对应类 | 命令说明 |
---|---|---|
import | ImportTool | 将关系型数据库数据导入到HDFS、HIVE、HBASE |
export | ExportTool | 将HDFS上的数据导出到关系型数据库 |
codegen | CodeGenTool | 获取数据库中某张表数据生成Java并打成Jar包 |
create-hive-table | CreateHiveTableTool | 创建hive的表 |
eval | EvalSqlTool | 查看SQL的执行结果 |
list-databases | ListDatabasesTool | 列出所有数据库 |
list-tables | ListTablesTool | 列出某个数据库下的所有表 |
help | HelpTool | 打印sqoop帮助信息 |
version | VersionTool | 打印sqoop版本信息 |
- 连接参数列表
Argument | Description |
---|---|
--connect <jdbc-uri> |
Specify JDBC connect string 指定JDBC连接字符串 |
--connection-manager <class-name> |
Specify connection manager class to use 指定要使用的连接管理器类 |
--driver <class-name> |
Manually specify JDBC driver class to use 指定要使用的JDBC驱动类 |
--hadoop-mapred-home <dir> |
Override $HADOOP_MAPRED_HOME 指定$HADOOP_MAPRED_HOME路径 |
--help |
Print usage instructions 帮助信息 |
--password-file |
Set path for a file containing the authentication password 设置用于存放认证的密码信息文件的路径 |
-P |
Read password from console 从控制台读取输入的密码 |
--password <password> |
Set authentication password 设置认证密码 |
--username <username> |
Set authentication username 设置认证用户名 |
--verbose |
Print more information while working 打印运行信息 |
--connection-param-file <filename> |
Optional properties file that provides connection parameters 指定存储数据库连接参数的属性文件 |
- 连接MySQL示例
# 查询数据库列表 对标show databases
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 12345678
mysql -uroot -p12345678
# 如果想连接 jdbc:mysql://worker-1:3306, 需要创建远程root访问权限
# 创建远程root用户
CREATE USER 'root'@'%' IDENTIFIED BY '12345678';
# 给远程root用户增加数据库权限
grant all privileges on *.* to 'root'@'%' identified by '12345678';
# 更新
flush privileges;
sqoop list-databases --connect jdbc:mysql://worker-1:3306/ --username root --password 12345678
# 查询指定库下面所有表 对标show tables in cm
sqoop list-tables --connect jdbc:mysql://worker-1:3306/cm --username root --password 12345678
4 sqoop应用
4.1 准备测试数据
应用场景:
使用sqoop上传字典表数据到hive中与我们的数据进行关联查询。
以 商品表 为例:
-- 创建sqoop_db 数据库
create database sqoop_db default charset utf8 collate utf8_general_ci;
-- 导入SQL文件
mysql -uroot -P3306 -p12345678 sqoop_db < /tmp/goods_table.sql
4.2 eval 查看 sql 查询结果
# 没有where条件
sqoop eval \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--query "select * from goods_table limit 10"
4.3 create-hive-table创建hive表
# 基于MySQL表创建hive表
# 需要认证以及拥有hive建表权限
kinit hive
sqoop create-hive-table \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--table goods_table \
--hive-table xinniu.goods_table
4.4 多map条件查询导入HDFS
语法 :
sqoop import \
--connect 数据库连接字符串 \
--username 数据库用户名 \
--password 数据库密码 \
--target-dir HDFS位置 \
--delete-target-dir \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分数据依据 \
--query 'select SQL where 查询条件 and $CONDITIONS'
参数解释 :
--query或--e 将查询结果的数据导入,使用时必须伴随参--target-dir,--hive-table,如果查询中有where条件,则条件后必须加上$CONDITIONS关键字
当sqoop使用--query+sql执行多个maptask并行运行导入数据时,每个maptask将执行一部分数据的导入,原始数据需要使用'--split-by 某个字段'来切分数据,不同的数据交给不同的maptask去处理。maptask执行sql副本时,需要在where条件中添加$CONDITIONS条件,这个是linux系统的变量,可以根据sqoop对边界条件的判断,来替换成不同的值,这就是说若split-by id,则sqoop会判断id的最小值和最大值判断id的整体区间,然后根据maptask的个数来进行区间拆分,每个maptask执行一定id区间范围的数值导入任务,如下为示意图。
4.3.1 导入文本文件
#用xinniu认证
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /data/xinniu/sqoop/data/goods_1 \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'
查询结果 :
4.3.2 导入其他格式文件
# 导入不同格式,支持格式as-avrodatafile、as-binaryfile、as-parquetfile、as-sequencefile、as-textfile(默认格式)
# 多次导入时会报jar包已存在错误,请忽略,原因为sqoop读取源数据的schema文件创建的jar在前几次任务中已经创建了。
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/xinniu/sqoop/data/goods_2_parquet \
--delete-target-dir \
--as-parquetfile \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'
结果:
4.5 导入hive表
4.5.1 导入文本表
# hive认证
kinit -kt /data/hive.keytab hive
# 导入命令
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table xinniu.goods_table1
上面过程分为两步:
1)第一步将数据导入到HDFS,默认的临时目录是/user/当前操作用户/mysql表名;
2)第二步将导入到HDFS的数据迁移到Hive表,如果hive表不存在,sqoop会自动创建内部表;(我们的是在/user/xinniu/goods_table,通过查看job的configuration的outputdir属性得知)
结果:
查询数据:
4.5.2 导入其他格式表
# hive认证
kinit -kt /data/hive.keytab hive
# 导入命令
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--as-parquetfile \
--hive-import \
--hive-overwrite \
--hive-table xinniu.goods_table_parquet
结果:
4.6 import to hbase
# 用hdfs认证
kinit -kt /data/hdfs.keytab hdfs
hadoop fs -mkdir /user/hbase
hadoop fs -chown hbase /user/hbase
# 用hbase认证
kinit -kt /data/hbase.keytab hbase
# sqoop导入hbase
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--hbase-create-table \
--hbase-table xinniu:goods_table \
--column-family cf \
--hbase-row-key id
# --hbase-row-key: 要求MySQL表必须有主键,将主键作为rowkey,标识一行
导入后,查看:
5 应用实例一
sqoop_db 库导入 comm_area 表和数据
导入后查看:
前提:
# 给 impala 赋予hive的所有权限
grant role admin_role to group impala;
# sqoop_db库导入comm_area表
# 创建linux导出目录
/data/xinniu/extract
1) mysql --> hdfs目录 (sqoop)
2)创建hive外表指向 hdfs目录(临时表作用,先删除,后创建) (impala)
3)创建分区表(只创建一次),分区:batch_date (impala)
4) 先删除分区,再创建分区并查询导入(目的:可以使得脚本能重复执行) (impala)
vim area_op.sh
# 获取batch_date,比如:今天20211010, 那batch_date是20211009
batch_date=`date -d 1' day ago' +%Y%m%d`
# xinniu认证
kinit -kt /data/xinniu.keytab xinniu
# 用sqoop,查询表数据导入到hdfs上
# -Dorg.apache.sqoop.splitter.allow_text_splitter=true: --split-by的是字符串也可以
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/xinniu/comm_area/${batch_date}/ \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by area_code \
--query 'select comm_area.* from comm_area where $CONDITIONS'
# $?: 返回上个命令的结果, 0:成功, 非0:失败
res=$?
if [ ${res} != 0 ];then
echo 'extract comm_area error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'extract comm_area successful '`date` >> /data/xinniu/extract/comm_area.log
fi
# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell, 创建hive表tmp.comm_area(临时表作用),并指向导入的hdfs目录
impala-shell -k -q "set sync_ddl = true;drop table if exists tmp.comm_area;create external table tmp.comm_area (
area_code string,
area_cname string,
area_ename string
)
row format delimited fields terminated by '\t'
location '/user/xinniu/comm_area/${batch_date}';
set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'create comm_area tmp table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area tmp table successful '`date` >> /data/xinniu/extract/comm_area.log
fi
# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell,创建hive分区表 itl.comm_area
impala-shell -k -q"set sync_ddl = true;create table if not exists itl.comm_area (
area_code string,
area_cname string,
area_ename string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'create comm_area itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi
# 将临时表中的数据导入到 itl.comm_area 表中
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;alter table itl.comm_area drop if exists partition (pt = '${batch_date}');set sync_ddl = false;"
impala-shell -k -q "set sync_ddl = true;insert into table itl.comm_area partition (pt) select a.*,'${batch_date}' from tmp.comm_area a;set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'load comm_area data to itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'load comm_area data to itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi
6 应用实例二
vim goods_op.sh
kinit -kt /data/xinniu.keytab xinniu
batch_date=$1
# --hive-drop-import-delims: 在导入数据到hive时,去掉数据中的\r\n\013\010这样的字符
sqoop import \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/xinniu/goods_table/2022-08-24/ \
--hive-drop-import-delims \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select * from goods_table where $CONDITIONS'
res=$?
if [ ${res} != 0 ];then
echo 'extract goods_table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'extract goods_table successful '`date` >> /data/xinniu/extract/goods_table.log
fi
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;drop table if exists tmp.goods_table;create external table tmp.goods_table (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_price string
)
row format delimited fields terminated by '\t'
location '/user/xinniu/goods_table/2022-08-24';
set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'create goods_table tmp table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'create goods_table tmp table successful '`date` >> /data/xinniu/extract/goods_table.log
fi
kinit -kt /data/impala.keytab impala
impala-shell -k -q"set sync_ddl = true;create table if not exists itl.goods_table (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_price string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
set sync_ddl = false;
"
res=$?
if [ ${res} != 0 ];then
echo 'create goods_table itl table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'create goods_table itl table successful '`date` >> /data/xinniu/extract/goods_table.log
fi
# 加载数据到itl层 跑批日期使用参数传递
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;alter table itl.goods_table drop if exists partition (pt = '2022-08-24');set sync_ddl = false;"
impala-shell -q "set sync_ddl = true;insert into table itl.goods_table partition (pt) select a.*,'2022-08-24' from tmp.goods_table a;set sync_ddl = false;"
res=$?
if [ ${res} != 0 ];then
echo 'load goods_table data to itl table error! '`date` >> /data/xinniu/extract/goods_table.log
exit 1
else
echo 'load goods_table data to itl table successful '`date` >> /data/xinniu/extract/goods_table.log
fi
执行时,需要从外界将日期传递过来
# 给脚本添加执行权
chmod a+x goods_op.sh
# 执行脚本
sh -x goods_op.sh 20211010