3.dolphinscheduler 使用

教程 阿布都的都 ⋅ 于 2023-01-07 15:30:55 ⋅ 1611 阅读

1 创建队列

  • 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。
  • 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。

file

2 添加租户

  • 租户对应的是Linux的用户,用于worker提交作业所使用的用户。如果linux没有这个用户,worker会在执行脚本的时候创建这个用户。
  • 租户编码:租户编码是Linux上的用户,唯一,不能重复(添加租户时没创建linux用户,当用该租户执行工作流时会自动创建,如果之前创建过就不创建了)
  • 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。

file

3 创建项目

  • 点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。

file

项目首页

  • 在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。

file

  • 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数

  • 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数

  • 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义

4 工作流定义

4.1 创建工作流定义

  • 点击项目管理->工作流->工作流定义,进入工作流定义页面,点击“创建工作流”按钮,进入

    工作流DAG编辑

file

task1--shell:

file

task2--shell:

file

task3-shell:

file

task4--shell:

file

变量 含义
${system.biz.date} 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1
${system.biz.curdate} 日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1
${system.datetime} 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1

保存后,点击 工作流定义,进入工作流定义列表

file

4.2 工作流上线

工作流定义完成后,是在“下线”状态,需要点击“上线”

file

4.3 工作流执行

上线之后,可以执行工作流,点击“执行”按钮后,会生成工作流实例 和 任务实例。

工作流实例:

file

任务实例(一个工作流里定义多个任务):

file

4.4 工作流重新编辑

如果已经上线,需要先“下线”,再“编辑”

file

4.5 可对定义的工作流进行定时

在“下线”状态下,可点击“定时”按照需求给工作流定时。

定时完成后,需要点击“定时管理”,把定时上线。

然后再将工作流“上线”,执行工作流,此时是定时执行工作流。

file

定时上线后:

file

5 调度器调度sqoop的shell脚本

5.1 sqoop的shell脚本

vim area_op.sh

# 获取batch_date,比如:今天20211010, 那batch_date是20211009
batch_date=$1
# hive认证
kinit -kt /data/hive.keytab hive
# 用sqoop,查询表数据导入到hdfs上
# -Dorg.apache.sqoop.splitter.allow_text_splitter=true: --split-by的是字符串也可以
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://worker-1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/hive/comm_area/${batch_date}/ \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by area_code \
--query 'select comm_area.* from comm_area where $CONDITIONS'
# $?: 返回上个命令的结果, 0:成功, 非0:失败
res=$?
if [ ${res} != 0 ];then
echo 'extract comm_area error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'extract comm_area successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell, 创建hive表tmp.comm_area(临时表作用),并指向导入的hdfs目录
impala-shell -k -q "set sync_ddl = true;drop table if exists tmp.comm_area;create external table tmp.comm_area (
area_code string,
area_cname string,
area_ename string
)
row format delimited fields terminated by '\t'
location '/user/hive/comm_area/${batch_date}';
set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'create comm_area tmp table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area tmp table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# impala认证
kinit -kt /data/impala.keytab impala
# 用impala-shell,创建hive分区表 itl.comm_area
impala-shell -k -q"set sync_ddl = true;create table if not exists itl.comm_area (
area_code string,
area_cname string,
area_ename string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'create comm_area itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'create comm_area itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

# 将临时表中的数据导入到 itl.comm_area 表中
kinit -kt /data/impala.keytab impala
impala-shell -k -q "set sync_ddl = true;alter table itl.comm_area drop if exists partition (pt = '${batch_date}');set sync_ddl = false;"
impala-shell -k -q "set sync_ddl = true;insert into table itl.comm_area partition (pt) select a.*,'${batch_date}' from tmp.comm_area a;set sync_ddl = false;"

res=$?
if [ ${res} != 0 ];then
echo 'load comm_area data to itl table error! '`date` >> /data/xinniu/extract/comm_area.log
exit 1
else
echo 'load comm_area data to itl table successful '`date` >> /data/xinniu/extract/comm_area.log
fi

5.2 给脚本加执行权

chmod a+x area_op.sh

5.3 配置调度器关联cdh

当没有配置关联cdh时,调度器日志报找不到 $HADOOP_COMMON_HOME(没配置关联cdh)

file

配置关联cdh

用 dolphinscheduler 用户

vim /opt/soft/dolphinscheduler/conf/env/dolphinscheduler_env.sh 脚本

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop

file

5.4 给sqoop 添加 lib jar

由于 sqoop 导入时,会用到json 的jar包, 如果sqoop lib 目录下没有,报如下错误:

file

上传一份到 sqoop的lib 目录。

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76092
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter