Open-class replay: https://www.bilibili.com/video/BV1wG411Y7dd
CoolNiu Mall Goods Click-Through Rate
1. Data Source Preparation
1.1 Start the CoolNiu Mall
Start the CoolNiu Mall image.
Address: http://cloud.hainiubl.com/#/privateImageDetail?id=14167&imageType=private
Add the public images kettle, hive, and zeppelin.
1.2 Check the CoolNiu Mall configuration
1.2.1 Check the nginx configuration file
# change the ip to the coolniushop hostname
vim /usr/local/nginx/conf/sites-enabled/hainiushop.com
# start nginx
/usr/local/nginx/sbin/nginx
1.2.2 Modify the project configuration
Change the MySQL connections of the hainiushop_service and hainiushop_sso projects to your own database address.
vim /usr/local/apache-tomcat-service-8.0.20/webapps/hainiushop_service/WEB-INF/proxool.xml
vim /usr/local/apache-tomcat-shop-8.0.20/webapps/hainiushop_sso/WEB-INF/proxool.xml
1.2.3 Start Tomcat
/usr/local/apache-tomcat-image-8.0.20/bin/startup.sh
/usr/local/apache-tomcat-service-8.0.20/bin/startup.sh
/usr/local/apache-tomcat-shop-8.0.20/bin/startup.sh
# stop tomcat
/usr/local/apache-tomcat-image-8.0.20/bin/shutdown.sh
/usr/local/apache-tomcat-service-8.0.20/bin/shutdown.sh
/usr/local/apache-tomcat-shop-8.0.20/bin/shutdown.sh
1.2.4 Configure the hosts file
Configure the hosts file on the coolniushop machine and on Windows as follows (see the sketch below):
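A sketch of the entries, assuming 192.168.88.20 stands in for the coolniushop machine's real IP (replace it with yours); any additional virtual-host names should be taken from the server_name entries in the nginx config edited above:
# append to /etc/hosts on coolniushop, and to C:\Windows\System32\drivers\etc\hosts on Windows
192.168.88.20  coolniushop  www.hainiushop.com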
Visit www.hainiushop.com in a browser; it should look like this:
Log output directories:
Browse log:
Look for records whose fourth field starts with search.action?search.goodsTypeId=101 (see the sketch below).
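A quick way to eyeball those records, as a sketch: it assumes whitespace-separated fields with the URL in the fourth column, and uses the service log path referenced by the taildir source later in this section; adjust both to your actual layout.
awk '$4 ~ /^search\.action\?search\.goodsTypeId=101/' /data/hainiu_shop_service_log/service.log | head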
Click log:
1.3 Ship the logs to an HDFS directory so the kettle machine can download them and run ETL
The click log must be cut and backed up every day, and a fresh file must be created each day to receive the new day's requests.
The browse log stays in a single file.
1.3.1 Write the shell scripts
Create a directory for the shell scripts
mkdir -p /data/xinniu/coolniu/access/shell
Write the shared configuration script
vim log_cut_config.sh
#! /bin/bash
# shared configuration
# location of access.log
ACCESS_LOG_PATH=/data/hainiu_shop_access_log/access_shop.log
ACCESS_BASE_PATH=/data/xinniu/coolniu/access
# backup directory for the incoming click log
ACCESS_LOG_BAK_PATH=${ACCESS_BASE_PATH}/shop_bak
# backup directory for the incoming service log
# root of the directory monitored by flume
FLUME_INPUT_BASE_PATH=${ACCESS_BASE_PATH}/work
Write the log-cut script
#! /bin/bash
# cut the access log
echo "-----------start: "`date +%Y%m%d%H%M%S`" -----------"
echo 'step1: load the config file log_cut_config'
# determine the directory this script lives in
cd `dirname $0`
shell_base_path=`pwd`
# source the log_cut_config file
. ${shell_base_path}/log_cut_config.sh
echo 'step2: check whether any param in log_cut_config is empty'
# if any param in log_cut_config is empty, abort the script
# 1: invalid; 0: valid
params_invalid=0
if [ "${ACCESS_LOG_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${ACCESS_LOG_BAK_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${FLUME_INPUT_BASE_PATH}x" == "x" ]; then
params_invalid=1
fi
# if any param is missing, stop the script
if [ ${params_invalid} -eq 1 ]; then
echo "log_cut_config shell config params error"
exit
fi
echo 'step3: create the required directories'
# create the directories if they do not exist
# working directory for the log cut
if [ ! -d ${ACCESS_LOG_BAK_PATH} ]; then
mkdir -p ${ACCESS_LOG_BAK_PATH}
fi
if [ ! -d ${FLUME_INPUT_BASE_PATH} ]; then
mkdir -p ${FLUME_INPUT_BASE_PATH}
fi
TIMESTAMP=`date +%Y%m%d%H%M%S`
shop_file_name=shop_access_${TIMESTAMP}.log
echo 'step4: copy the log data to the backup directory'
cp ${ACCESS_LOG_PATH} ${ACCESS_LOG_BAK_PATH}/${shop_file_name}
echo 'step5: mv the log data into the flume-monitored directory (as a .tmp file)'
mkdir -p ${FLUME_INPUT_BASE_PATH}/shop
mv ${ACCESS_LOG_PATH} ${FLUME_INPUT_BASE_PATH}/shop/access.log.tmp
echo 'step6: mv the log data to drop the .tmp suffix'
mv ${FLUME_INPUT_BASE_PATH}/shop/access.log.tmp ${FLUME_INPUT_BASE_PATH}/shop/access.log
echo 'step7: kill -USR1 the nginx master process so nginx reopens its log file'
PID=`ps -aux | grep nginx | grep master | awk '{print $2}'`
if [ "${PID}x" != "x" ]; then
kill -USR1 $PID
fi
echo 'step8: delete backups older than 2 days'
# delete backup files from 2 days ago, e.g. on the 21st delete the 19th's
delete_date=`date -d '2 day ago' +%Y%m%d`
rm -rf ${ACCESS_LOG_BAK_PATH}/shop_access_${delete_date}*.log
echo "-----------end: "`date +%Y%m%d%H%M%S`" -----------"
1.3.2 Install flume and set up the hadoop environment
Extract the flume package into /usr/local
tar -zxvf /public/software/bigdata/apache-flume-1.10.1-bin.tar.gz -C /usr/local/
Create a symlink for flume
ln -s /usr/local/apache-flume-1.10.1-bin/ /usr/local/flume
In flume's log4j2.xml, change the LogFile appender reference to Console so flume logs to the console.
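A one-line sketch of that edit, assuming the stock conf/log4j2.xml that ships with flume 1.10.1 (check the file afterwards; the attribute value may differ in your build):
cp /usr/local/flume/conf/log4j2.xml /usr/local/flume/conf/log4j2.xml.bak
sed -i 's/ref="LogFile"/ref="Console"/' /usr/local/flume/conf/log4j2.xml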
Set the JVM heap flume uses at runtime: rename flume-env.sh.template and add JAVA_OPTS
mv /usr/local/flume/conf/flume-env.sh.template /usr/local/flume/conf/flume-env.sh
# add
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
Copy the JDK and Hadoop installations from the hive machine to /opt on the coolniushop machine
scp -r /opt/jdk1.8.0_144/ coolniushop:/opt/
scp -r /opt/hadoop-2.7.3/ coolniushop:/opt/
Configure the environment variables (e.g. in /etc/profile):
export JAVA_HOME=/opt/jdk1.8.0_144/
export PATH=$PATH:$JAVA_HOME/bin
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=/opt/hadoop-2.7.3/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
# apply the environment variables
source /etc/profile
1.3.3 Write flume agents that collect the logs into HDFS
Agent configuration for the click log:
# name the agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# define the source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/data/xinniu/coolniu/access/work/shop
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.deletePolicy = immediate
# skip files ending in .tmp
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
# define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlbefore/%Y%m%d/LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.filePrefix = access_LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# when a file reaches 50 MB, close it and start writing a new one
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 52428800
# bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Agent configuration for the browse log:
# name the agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# define the source
a1.sources.r1.type=taildir
a1.sources.r1.positionFile = /root/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/hainiu_shop_service_log/service.log
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
# define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlbefore/%Y%m%d/SHOW_GOODS_BRAND_INFO
a1.sinks.k1.hdfs.filePrefix = access_SHOW_GOODS_BRAND_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# when a file reaches 50 MB, close it and start writing a new one
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 52428800
# bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start flume
# start the two agents
flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_log.agent -Dflume.root.logger=INFO,console
flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_service.agent -Dflume.root.logger=INFO,console
The data collected into HDFS looks like this:
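To spot-check today's collection (a quick sketch; the date directory follows the %Y%m%d pattern from the sink paths above):
hadoop fs -ls hdfs://ns1/data/xinniu/coolniu/etlbefore/`date +%Y%m%d`/LOOK_GOODS_DETAIL_INFO
hadoop fs -ls hdfs://ns1/data/xinniu/coolniu/etlbefore/`date +%Y%m%d`/SHOW_GOODS_BRAND_INFO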
2. Data processing on the kettle server
2.1 Install flume and set up hadoop
Copy the flume installation from the coolniushop machine into /usr/local
scp -r coolniushop:/usr/local/apache-flume-1.10.1-bin /usr/local/
Create a symlink for flume
ln -s /usr/local/apache-flume-1.10.1-bin/ /usr/local/flume
Copy the Hadoop and JDK installations from the coolniushop machine to /opt on the kettle machine
scp -r coolniushop:/opt/hadoop-2.7.3 /opt/
scp -r coolniushop:/opt/jdk1.8.0_144 /opt/
Configure the environment variables:
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:/opt/kettle/data-integration
# apply the environment variables
source /etc/profile
2.2 Kettle ETL processing
# first create the input paths for the kettle job
mkdir -p /data/xinniu/coolniu/access/input/shop
mkdir -p /data/xinniu/coolniu/access/input/service
mkdir -p /data/xinniu/coolniu/access/shell
2.2.1 Write a script that pulls the data files from hadoop
#!/bin/bash
batch_date=`date +%Y%m%d`
file_log=/data/xinniu/coolniu/access/input/shop/access.log
file_show=/data/xinniu/coolniu/access/input/service/service.log
if [ -f "$file_log" ];then
rm -rf $file_log
echo "$file_log 已删除"
else
echo "$file_log 不存在可以下载数据了"
fi
if [ -f "$file_show" ];then
rm -rf $file_show
echo "$file_show 已删除"
else
echo "$file_show 不存在可以下载数据了"
fi
hadoop fs -get hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/SHOW_GOODS_BRAND_INFO/access_SHOW_GOODS_BRAND_INFO* /data/xinniu/coolniu/access/input/service/service.log
hadoop fs -get hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/LOOK_GOODS_DETAIL_INFO/access_LOOK_GOODS_DETAIL_INFO* /data/xinniu/coolniu/access/input/shop/access.log
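Note that hadoop fs -get with a wildcard only works here if exactly one file matches, because the destination is a single local file name. If the HDFS sink may have rolled several files in a day, a merge-style pull is a safer sketch:
# pull and concatenate all rolled files for the day into one local file
hadoop fs -getmerge hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/SHOW_GOODS_BRAND_INFO /data/xinniu/coolniu/access/input/service/service.log
hadoop fs -getmerge hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/LOOK_GOODS_DETAIL_INFO /data/xinniu/coolniu/access/input/shop/access.log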
2.2.2 Kettle ETL flow
Click log:
Text file input:
Select the important fields:
Add constants:
Java code:
import java.util.Date;
import java.util.Locale;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
//-- date formatters: sdf1 parses the log date string into a Date, sdf2 formats the Date back into a string
SimpleDateFormat sdf1;
SimpleDateFormat sdf2;
// generics are not allowed here; holds the area codes read from the areacode file
List areaList;
//-- flag marking the first processed row
boolean first = true;
// executed once per row
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// fetch one row of data
Object[] r = getRow();
// If the row object is null, we are done processing.
//
if (r == null) {
setOutputDone();
return false;
}
if (first) {
// runs only when the first row is read
sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
first=false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
// create the output row
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
// get the timestr value, e.g. 14/Sep/2020:14:43:38 +0800
String timestr = get(Fields.In, "req_time").getString(r);
//-- str: 14/Sep/2020:14:43:38
String str = timestr.substring(0, timestr.indexOf(" "));
// timestr.split(" ")[0]
// reformat into yyyy-MM-dd HH:mm:ss
//-- e.g. 2023-08-17 12:45:22
String format = "";
try {
Date date = sdf1.parse(str);
format = sdf2.format(date);
} catch (ParseException e) {
// e.printStackTrace();
}
// set the req_time output field
get(Fields.Out, "req_time").setValue(outputRow, format);
String reqstr = get(Fields.In, "req_url").getString(r);
reqstr = reqstr.substring(0, reqstr.lastIndexOf(" "));
get(Fields.Out, "req_url").setValue(outputRow, reqstr);
//--http://www.hainiushop.com/search.action?search.goodsTypeId=101&typeListStyle=1
String refer = get(Fields.In, "refer").getString(r);
if(reqstr.startsWith("GET /lookDetail.action") && refer.equals("http://www.hainiushop.com/search.action?search.goodsTypeId=101&typeListStyle=1")){
String[] arr = reqstr.split(" ");
// arr2: [/lookDetail.action?id, 31]
String[] arr2 = arr[1].split("=|&");
if(arr2.length >= 2){
get(Fields.Out, "goods_id").setValue(outputRow, arr2[1]);
}
}
// putRow will send the row on to the default output hop.
//
putRow(data.outputRowMeta, outputRow);
return true;
}
Filter the data:
Data export:
Display log:
File input:
Filter the clothing-category click records:
Split into a JSON array:
Data output:
2.2.3 Upload the ktr files into the access directory on the server and run them with the pan command
pan.sh -file=/data/xinniu/coolniu/access/access.ktr -level=Detailed
pan.sh -file=/data/xinniu/coolniu/access/access_service.ktr -level=Detailed
Check whether data appears in the output directory (see the sketch below):
Click log:
Display log:
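A quick check, as a sketch; the output paths are the ones referenced by the backup configuration in 2.3:
ls -l /data/xinniu/coolniu/access/output/look_goods_detail_info/
ls -l /data/xinniu/coolniu/access/output/show_goods_brand_info/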
2.3 Write a script to back up the ETL output data
log_cut_config.sh
#! /bin/bash
# shared configuration
ACCESS_BASE_PATH=/data/xinniu/coolniu/access
# backup directory for the input logs; only the most recent 3 days are kept
ACCESS_OUTPUT_SHOP_PATH=${ACCESS_BASE_PATH}/output/look_goods_detail_info
ACCESS_OUTPUT_SERVICE_PATH=${ACCESS_BASE_PATH}/output/show_goods_brand_info
FLUME_INPUT_SHOP_PATH=${ACCESS_BASE_PATH}/work/look_goods_detail_info
FLUME_INPUT_SERVICE_PATH=${ACCESS_BASE_PATH}/work/show_goods_brand_info
ACCESS_OUTPUT_BAK_SHOP_PATH=${ACCESS_BASE_PATH}/bak/look_goods_detail_info
ACCESS_OUTPUT_BAK_SERVICE_PATH=${ACCESS_BASE_PATH}/bak/show_goods_brand_info
log_mv.sh
#! /bin/bash
# cut/move the ETL output logs
echo "-----------start: "`date +%Y%m%d%H%M%S`" -----------"
echo 'step1: load the config file log_cut_config'
# determine the directory this script lives in
cd `dirname $0`
shell_base_path=`pwd`
# source the log_cut_config file
. ${shell_base_path}/log_cut_config.sh
echo 'step2: check whether any param in log_cut_config is empty'
# if any param in log_cut_config is empty, abort the script
# 1: invalid; 0: valid
params_invalid=0
if [ "${ACCESS_OUTPUT_SHOP_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${ACCESS_OUTPUT_SERVICE_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${FLUME_INPUT_SHOP_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${FLUME_INPUT_SERVICE_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${ACCESS_OUTPUT_BAK_SHOP_PATH}x" == "x" ]; then
params_invalid=1
fi
if [ "${ACCESS_OUTPUT_BAK_SERVICE_PATH}x" == "x" ]; then
params_invalid=1
fi
# if any param is missing, stop the script
if [ ${params_invalid} -eq 1 ]; then
echo "log_cut_config shell config params error"
exit
fi
mkdir -p ${ACCESS_OUTPUT_SHOP_PATH}
mkdir -p ${ACCESS_OUTPUT_SERVICE_PATH}
mkdir -p ${FLUME_INPUT_SHOP_PATH}
mkdir -p ${FLUME_INPUT_SERVICE_PATH}
mkdir -p ${ACCESS_OUTPUT_BAK_SHOP_PATH}
mkdir -p ${ACCESS_OUTPUT_BAK_SERVICE_PATH}
echo 'step4: copy the structured data into the flume-monitored directory (.tmp files) + chmod 777'
cp ${ACCESS_OUTPUT_SHOP_PATH}/look_goods_detail_info.csv ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv.tmp
cp ${ACCESS_OUTPUT_SERVICE_PATH}/show_goods_brand_info.csv ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv.tmp
echo 'step5: rename the .tmp files in the flume-monitored directory'
mv ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv.tmp ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv
mv ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv.tmp ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv
echo 'step6: mv the structured data into the bak directory'
TIMESTAMP=`date +%Y%m%d%H%M%S`
mv ${ACCESS_OUTPUT_SHOP_PATH}/look_goods_detail_info.csv ${ACCESS_OUTPUT_BAK_SHOP_PATH}/${TIMESTAMP}_look_goods_detail_info.csv
mv ${ACCESS_OUTPUT_SERVICE_PATH}/show_goods_brand_info.csv ${ACCESS_OUTPUT_BAK_SERVICE_PATH}/${TIMESTAMP}_show_goods_brand_info.csv
echo 'step7: delete backups older than 2 days'
# delete backup files from 2 days ago, e.g. on the 21st delete the 19th's
delet_date=`date -d '2 day ago' +%Y%m%d`
rm -rf ${ACCESS_OUTPUT_BAK_SHOP_PATH}/${delet_date}*.csv
rm -rf ${ACCESS_OUTPUT_BAK_SERVICE_PATH}/${delet_date}*.csv
echo "-----------end: "`date +%Y%m%d%H%M%S`" -----------"
2.4 Write a flume agent that uploads the ETL output to HDFS
Before running the agent, upload the recompiled flume-ng-core jar into flume's lib directory (see the sketch below).
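A sketch of that step; the jar file name is an assumption, so use your actual build artifact:
mv /usr/local/flume/lib/flume-ng-core-1.10.1.jar /usr/local/flume/lib/flume-ng-core-1.10.1.jar.bak
cp ./flume-ng-core-1.10.1.jar /usr/local/flume/lib/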
Run the agent
flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_log.agent -Dflume.root.logger=INFO,console
access_shop_log.agent
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
#-------r1--c1--k1-----look_goods_detail_info---------
# r1 source type
a1.sources.r1.type = spooldir
# directory to monitor
a1.sources.r1.spoolDir = /data/xinniu/coolniu/access/work/look_goods_detail_info/
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.deletePolicy = immediate
# skip files ending in .tmp
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
# hdfs sink-k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlout/%Y%m%d/LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.filePrefix = access_LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# when a file reaches 50 MB, close it and start writing a new one
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000000
a1.channels.c1.transactionCapacity = 1000000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#-------r2--c2--k2-----show_goods_brand_info---------
# r2 source type
a1.sources.r2.type = spooldir
# directory to monitor
a1.sources.r2.spoolDir = /data/xinniu/coolniu/access/work/show_goods_brand_info/
a1.sources.r2.fileHeader = true
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = timestamp
# enable this if files should be deleted once they have been shipped
# with it enabled, cp/mv-ing a file of the same name into the monitored directory is still collected
# with it disabled, cp/mv-ing a file of the same name into the monitored directory throws an exception
a1.sources.r2.deletePolicy = immediate
# skip files ending in .tmp
a1.sources.r2.ignorePattern = ^(.)*\\.tmp$
# hdfs sink-k2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlout/%Y%m%d/SHOW_GOODS_BRAND_INFO
a1.sinks.k2.hdfs.filePrefix = access_SHOW_GOODS_BRAND_INFO
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 0
# when a file reaches 50 MB, close it and start writing a new one
a1.sinks.k2.hdfs.rollSize = 52428800
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.callTimeout = 0
a1.sinks.k2.hdfs.idleTimeout = 60
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100000000
a1.channels.c2.transactionCapacity = 1000000
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
3. Building the hive data warehouse
3.0 Install sqoop
1 Extract the sqoop package into /usr/local
tar -zxvf /public/software/bigdata/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/
2 Create a symlink
ln -s /usr/local/sqoop-1.4.7.bin__hadoop-2.6.0/ /usr/local/sqoop
# set up the hadoop environment
scp -r coolniushop:/opt/hadoop-2.7.3 /opt/
export SQOOP_HOME=/usr/local/sqoop
export PATH=$SQOOP_HOME/bin:$PATH
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
3.1 T-layer scripts
Create the directory /data/xinniu/coolniu/shell/
mkdir -p /data/xinniu/coolniu/access/log/step1-etl
and create the required databases in hive (see the sketch after the list):
xinniu_tmp
xinniu_itl
xinniu_iol
xinniu_iml
xinniu_icl
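A minimal sketch for creating those databases, run once on the hive machine:
hive -e "
create database if not exists xinniu_tmp;
create database if not exists xinniu_itl;
create database if not exists xinniu_iol;
create database if not exists xinniu_iml;
create database if not exists xinniu_icl;
"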
The access_look_goods_detail_info.sh script:
#!/bin/bash
batch_date=$1
now_date=`date -d "${batch_date} 1day" +%Y%m%d`
# look_goods_detail_info
hive -e"
drop table if exists xinniu_tmp.look_goods_detail_info;
create external table if not exists xinniu_tmp.look_goods_detail_info(
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/etlout/${now_date}/LOOK_GOODS_DETAIL_INFO/'
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load look_goods_detail_info data error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load look_goods_detail_info data successful! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi
# DDL for look_goods_detail_info
hive -e" create table if not exists xinniu_itl.look_goods_detail_info (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load look_goods_detail_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load look_goods_detail_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi
hive -e "
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into xinniu_itl.look_goods_detail_info partition (pt) select t.* , '${batch_date}' pt from xinniu_tmp.look_goods_detail_info t;
"
res=$?
if [ ${res} != 0 ];then
echo 'load log2xinniu_itl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load log2xinniu_itl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi
The access_show_goods_brand_info.sh script:
#!/bin/bash
batch_date=$1
now_date=`date -d "${batch_date} 1day" +%Y%m%d`
# show_goods_brand_info
hive -e"
drop table if exists xinniu_tmp.show_goods_brand_info;
create external table if not exists xinniu_tmp.show_goods_brand_info(
json string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/etlout/${now_date}/SHOW_GOODS_BRAND_INFO/'
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load show_goods_brand_info data error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'load show_goods_brand_info data successful! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi
hive -e" create table if not exists xinniu_itl.show_goods_brand_info (
goodsTagNames string,
goodsSn string,
goodsTypeId string,
brandId string,
saleState string,
isGift string,
isPromote string,
isCod string,
isReturns string,
isMerger string,
extendAttr string,
keyValue string,
goodsId string,
goodsCname string,
goodsEname string,
goodsImg string,
goodsPrice string,
goodsColor string,
goodsSize string,
locality string,
brandSizeId string,
stockNumber string,
warmNumber string,
collectNumber string,
brandCname string,
brandEname string,
giftId string,
giftNumberMax string,
giftPrice string,
giftNumberRemain string,
endDate string,
remainHour string,
typeCname string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load show_goods_brand_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'load show_goods_brand_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi
hive -e "
alter table xinniu_itl.show_goods_brand_info drop if exists partition (pt = '${batch_date}');
insert into xinniu_itl.show_goods_brand_info partition (pt='${batch_date}')
select json_tuple(json,'goodsTagNames','goodsSn','goodsTypeId','brandId','saleState','isGift',
'isPromote','isCod','isReturns','isMerger','extendAttr','keyValue','goodsId','goodsCname',
'goodsEname','goodsImg','goodsPrice','goodsColor','goodsSize','locality','brandSizeId','stockNumber','warmNumber',
'collectNumber','brandCname','brandEname','giftId','giftNumberMax','giftPrice','giftNumberRemain','endDate',
'remainHour','typeCname') as (goodsTagNames,goodsSn,goodsTypeId,brandId,saleState,isGift,
isPromote,isCod,isReturns,isMerger,extendAttr,keyValue,goodsId,goodsCname,goodsEname,goodsImg,
goodsPrice,goodsColor,goodsSize,locality,brandSizeId,stockNumber,warmNumber,collectNumber,brandCname,
brandEname,giftId,giftNumberMax,giftPrice,giftNumberRemain,endDate,remainHour,typeCname) from
(select explode(split(regexp_replace(regexp_replace(json, '\\[|\\]',''),'\\}\\,\\{','\\}\\|\\{'),'\\|')) as json from xinniu_tmp.show_goods_brand_info) t;
"
res=$?
if [ ${res} != 0 ];then
echo 'insert show_goods_brand_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'insert show_goods_brand_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi
The shop_goods.sh script:
#!/bin/bash
batch_date=$1
kinit -kt /data/xinniu.keytab xinniu
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://coolniushop:3306/hainiu_shop \
--username root \
--password hainiu \
--target-dir /data/xinniu/coolniu/access/sqoop/shop_goods/${batch_date}/ \
--num-mappers 1 \
--delete-target-dir \
--hive-drop-import-delims \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select shop_goods.*, current_date etldate from shop_goods where $CONDITIONS'
res=$?
if [ ${res} != 0 ];then
echo 'extract shop_goods error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'extract shop_goods successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi
hive -e "drop table if exists xinniu_tmp.shop_goods;create external table xinniu_tmp.shop_goods (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
locality string,
goods_price string,
goods_color string,
goods_size string,
brand_size_id string,
size_details string,
stock_number string,
warm_number string,
sold_number string,
collect_number string,
goods_weight string,
goods_volume string,
tags string,
goods_attr string,
gdetails_img string,
goods_cndetails string,
goods_endetails string,
key_value string,
sale_state string,
goods_img string,
is_gift string,
is_promote string,
is_cod string,
is_returns string,
is_merger string,
extend_attr string,
last_update_time string,
add_time string,
admin_uid string,
is_delete string,
etldate string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/access/sqoop/shop_goods/${batch_date}';
"
res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods tmp table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'create shop_goods tmp table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi
hive -e "create table if not exists xinniu_itl.shop_goods (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
locality string,
goods_price string,
goods_color string,
goods_size string,
brand_size_id string,
size_details string,
stock_number string,
warm_number string,
sold_number string,
collect_number string,
goods_weight string,
goods_volume string,
tags string,
goods_attr string,
gdetails_img string,
goods_cndetails string,
goods_endetails string,
key_value string,
sale_state string,
goods_img string,
is_gift string,
is_promote string,
is_cod string,
is_returns string,
is_merger string,
extend_attr string,
last_update_time string,
add_time string,
admin_uid string,
is_delete string,
etldate string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;"
res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'create shop_goods itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi
hive -e "alter table xinniu_itl.shop_goods drop if exists partition (pt = '${batch_date}');"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict ;insert into table xinniu_itl.shop_goods partition (pt) select a.*,'${batch_date}' from xinniu_tmp.shop_goods a;"
res=$?
if [ ${res} != 0 ];then
echo 'load shop_goods data to itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'load shop_goods data to itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi
The shop_goods_relate.sh script:
#!/bin/bash
batch_date=$1
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://coolniushop:3306/hainiu_shop \
--username root \
--password hainiu \
--target-dir /data/xinniu/coolniu/access/sqoop/shop_goods_relate/${batch_date}/ \
--delete-target-dir \
--num-mappers 1 \
--hive-drop-import-delims \
--fields-terminated-by "\t" \
--split-by goods_id \
--query 'select shop_goods_relate.*, current_date etldate from shop_goods_relate where $CONDITIONS'
res=$?
if [ ${res} != 0 ];then
echo 'extract shop_goods_relate error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'extract shop_goods_relate successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi
kinit -kt /data/impala.keytab impala
hive -e "drop table if exists xinniu_tmp.shop_goods_relate;create external table xinniu_tmp.shop_goods_relate (
goods_id string,
brand_id string,
goods_type_id string,
brand_catalog_id string,
relate_goods_ids string,
relate_news_ids string,
etldate string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/access/sqoop/shop_goods_relate/${batch_date}';
"
res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods_relate tmp table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'create shop_goods_relate tmp table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi
kinit -kt /data/impala.keytab impala
hive -e"create table if not exists xinniu_itl.shop_goods_relate (
goods_id string,
brand_id string,
goods_type_id string,
brand_catalog_id string,
relate_goods_ids string,
relate_news_ids string,
etldate string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;"
res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods_relate itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'create shop_goods_relate itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi
kinit -kt /data/impala.keytab impala
hive -e "alter table xinniu_itl.shop_goods_relate drop if exists partition (pt = '${batch_date}');"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict ;insert into table xinniu_itl.shop_goods_relate partition (pt) select a.*,'${batch_date}' from xinniu_tmp.shop_goods_relate a;"
res=$?
if [ ${res} != 0 ];then
echo 'load shop_goods_relate data to itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'load shop_goods_relate data to itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi
3.2 O-layer scripts
mkdir -p /data/xinniu/coolniu/access/log/step2-iol
The look_goods_detail.sh script:
#!/bin/bash
batch_date=$1
# create the table
hive -e"
create table if not exists xinniu_iol.look_goods_detail (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string,
goods_sn string,
goods_cname string,
goods_ename string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"
res=$?
if [ ${res} != 0 ];then
echo 'create look_goods_detail table error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
exit 1
else
echo 'create look_goods_detail table success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
fi
# drop the partition, then insert the query results
hive -e "
alter table xinniu_iol.look_goods_detail drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iol.look_goods_detail partition(pt='${batch_date}')
select
t1.ip,
t1.looktime,
t1.goodsid,
t1.refer,
t1.browse,
t1.userid,
t1.username,
t2.goods_sn,
t2.goods_cname,
t2.goods_ename
from
(select * from xinniu_itl.look_goods_detail_info where pt ='${batch_date}') t1
left join
(select Id, goods_sn, goods_cname, goods_ename from xinniu_itl.shop_goods where pt='${batch_date}') t2
on t1.goodsid=t2.Id;
"
res=$?
if [ ${res} != 0 ];then
echo 'look_goods_detail drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
exit 1
else
echo 'look_goods_detail drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
fi
echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
The show_goods_brand.sh script:
#!/bin/bash
batch_date=$1
# create the table
hive -e" create table if not exists xinniu_iol.show_goods_brand_info (
goodsSn string,
goodsTypeId string,
brandId string,
goodsId string,
goodsCname string,
goodsEname string,
goodsPrice string,
goodsColor string,
goodsSize string,
brandCname string,
brandEname string,
typeCname string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"
res=$?
if [ ${res} != 0 ];then
echo 'create xinniu_iol.show_goods_brand_info table error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
exit 1
else
echo 'create xinniu_iol.show_goods_brand_info table success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
fi
# drop the partition, then insert the query results
hive -e "
alter table xinniu_iol.show_goods_brand_info drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iol.show_goods_brand_info partition(pt='${batch_date}')
select goodsSn,goodsTypeId,brandId,goodsId,goodsCname,goodsEname,goodsPrice,goodsColor,goodsSize,brandCname,brandEname,typeCname
from xinniu_itl.show_goods_brand_info where pt='${batch_date}';
"
res=$?
if [ ${res} != 0 ];then
echo 'xinniu_iol.show_goods_brand_info add partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
exit 1
else
echo 'xinniu_iol.show_goods_brand_info add partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
fi
echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
3.3 M-layer scripts
mkdir -p /data/xinniu/coolniu/access/log/step3-iml
The look_goods_detail_fact.sh script:
#!/bin/bash
batch_date=$1
# create the table
hive -e"
create table if not exists xinniu_iml.look_goods_detail_fact (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_type_id string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"
res=$?
if [ ${res} != 0 ];then
echo 'create look_goods_detail_fact table error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
exit 1
else
echo 'create look_goods_detail_fact table success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
fi
# drop the partition, then insert the query results
hive -e "
alter table xinniu_iml.look_goods_detail_fact drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iml.look_goods_detail_fact partition(pt='${batch_date}')
select
t1.ip,
t1.looktime,
t1.goodsid,
t1.refer,
t1.browse,
t1.userid,
t1.username,
t1.goods_sn,
t1.goods_cname,
t1.goods_ename,
t3.goods_type_id
from
(select * from xinniu_iol.look_goods_detail where pt='${batch_date}') t1
left join
(select * from xinniu_itl.shop_goods_relate where pt='${batch_date}') t3
on t1.goodsid=t3.goods_id;
"
res=$?
if [ ${res} != 0 ];then
echo 'look_goods_detail_fact drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
exit 1
else
echo 'look_goods_detail_fact drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
fi
echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
The show_goods_brand_fact.sh script:
#!/bin/bash
batch_date=$1
mkdir -p /data/xinniu/coolniu/access/log/step3-iml
# create the table
hive -e"
create table if not exists xinniu_iml.show_goods_brand_fact (
goodsSn string,
goodsTypeId string,
brandId string,
goodsId string,
goodsCname string,
goodsEname string,
goodsPrice string,
goodsColor string,
goodsSize string,
brandCname string,
brandEname string,
typeCname string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"
res=$?
if [ ${res} != 0 ];then
echo 'create show_goods_brand_fact table error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
exit 1
else
echo 'create show_goods_brand_fact table success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
fi
# drop the partition, then insert the query results
hive -e "
alter table xinniu_iml.show_goods_brand_fact drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iml.show_goods_brand_fact partition(pt)
select * from xinniu_iol.show_goods_brand_info where pt='${batch_date}';
"
res=$?
if [ ${res} != 0 ];then
echo 'show_goods_brand_fact drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
exit 1
else
echo 'show_goods_brand_fact drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
fi
echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
3.4 C-layer scripts
mkdir -p /data/xinniu/coolniu/access/log/step5-idl
mkdir -p /data/xinniu/coolniu/export/
Create the hainiu_dw database
create database hainiu_dw;
The goods_click_rate.sh script:
#!/bin/bash
batch_date=$1
# mysql table name
table_name=xinniu_shop_goods_clickrate
# path of the exported data file
export_file_path=/data/xinniu/coolniu/export/${table_name}_data_${batch_date}
hive -e "
select '${batch_date}' as batch_date,t2.goodsid as goodsid ,t2.goodscname as goodscname,nvl(t1.clicknum,0) as clicknum ,t2.shownum as shownum ,concat(round((nvl(t1.clicknum,0)/t2.shownum*100),2),'%') as rate from
(select goodsid,goods_cname ,count(*) as clicknum from xinniu_iml.look_goods_detail_fact where pt='${batch_date}' group by goodsid,goods_cname ) t1
right join
(select goodsid,goodscname ,count(*) as shownum from xinniu_iml.show_goods_brand_fact where pt='${batch_date}' group by goodsid,goodscname ) t2
on t1.goodsid=t2.goodsid and t1.goods_cname=t2.goodscname;
" 1> ${export_file_path} 2> /dev/null
res=$?
if [ ${res} != 0 ];then
echo "export data: ${table_name}_data_${batch_date} error! "`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
exit 1
else
echo "export data: ${table_name}_data_${batch_date} success "`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
fi
# load the export file into MySQL
mysqlDBUrl="/bin/mysql -hcoolniushop -P3306 -uroot -phainiu -Dhainiu_dw"
${mysqlDBUrl} <<EOF
CREATE TABLE if not exists ${table_name} (
id int NOT NULL AUTO_INCREMENT,
batch_date varchar(30),
goodsid varchar(10),
goodscname varchar(30) ,
clicknum varchar(30) ,
shownum varchar(256) ,
clickrate varchar(30),
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
delete from ${table_name} where batch_date='${batch_date}';
LOAD DATA LOCAL INFILE "${export_file_path}" INTO TABLE ${table_name} FIELDS
TERMINATED BY '\t'
(batch_date,goodsid, goodscname, clicknum, shownum, clickrate);
EOF
res=$?
if [ ${res} != 0 ];then
echo "import table: ${table_name} error! "`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
exit 1
else
echo "import table: ${table_name} success "`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
fi
echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
ago7date=`date -d "${batch_date} -7day" +%Y%m%d`
rm -f /data/xinniu/coolniu/export/${table_name}_data_${ago7date}
4. Data Visualization
Connect to MySQL and create the hainiu_dw database
create database hainiu_dw;
4.1 The MySQL table looks like this:
4.2 Configure zeppelin
Open the zeppelin UI at: http://11.147.251.157:9898/
4.2.1 Edit the jdbc connection:
4.2.2 Upload the MySQL driver jar into zeppelin's lib directory (see the sketch below):
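A sketch of that step; the driver version and the zeppelin install path are assumptions, so substitute your own:
cp ./mysql-connector-java-5.1.49.jar /usr/local/zeppelin/lib/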
4.2.3 Configure the notebook
4.3 Write the SQL that computes the click-through rate:
select t.*,t.clicknum/t.shownum as rate from hainiu_dw.xinniu_shop_goods_clickrate t;