配置文件
cdh yarn界面下载客户端配置文件
下载后放在代码的conf目录中,下载krb5.conf与keytab文件放入conf文件夹中,如下:
代码结构
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the coolniu MapReduce ETL job (Hadoop client libraries from the Cloudera CDH 6.3.2 line). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hainiu</groupId>
  <artifactId>coolniu</artifactId>
  <version>1.0</version>
  <name>coolniu</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Hadoop 3.x requires Java 8 at minimum, and recent JDKs reject source level 7,
         so the compiler level is aligned with the Hadoop dependencies below. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <repositories>
    <!-- The "-cdh6.3.2" artifacts are resolved from the Cloudera repository. -->
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.0.0-cdh6.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0-cdh6.3.2</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
MapReduce代码
- com.hainiu.mr.CoolNiuETL
本地调试使用
package com.hainiu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Main class of the CoolNiu ETL job, intended for local debugging.
 *
 * <p>Usage: {@code CoolNiuETL <inputPath> <outputPath>}. The output path is
 * deleted recursively if it already exists before the job is submitted.
 * The process exits with 0 when the job succeeds, 1 when the job fails or an
 * exception is raised, and 2 when the arguments are missing.
 */
public class CoolNiuETL {
    public static void main(String[] args) {
        if (args == null || args.length < 2) {
            System.err.println("Usage: CoolNiuETL <inputPath> <outputPath>");
            System.exit(2);
            return;
        }

        boolean succeeded = false;
        try {
            Configuration conf = new Configuration();
            conf.set("inputPath", args[0]);
            conf.set("outputPath", args[1]);

            Job job = Job.getInstance(conf);
            job.setJobName("CoolNiuETL");
            // A single call is enough: the job jar is the one containing this class.
            job.setJarByClass(CoolNiuETL.class);
            job.setMapperClass(CoolNiuMapper.class);
            job.setReducerClass(CoolNiuReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, args[0]);

            // FileOutputFormat refuses to write into an existing directory, so clear it first.
            Path outputPath = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            succeeded = job.waitForCompletion(true);
        } catch (Exception e) {
            // Report the failure and fall through to a non-zero exit code.
            e.printStackTrace();
        }
        System.exit(succeeded ? 0 : 1);
    }
}
- com.hainiu.mr.CoolNiuETLwithKB
kerberos认证、集群客户端提交、跨平台提交代码
package com.hainiu.mr;
import com.hainiu.utils.ConfigurationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
/**
 * Main class of the CoolNiu ETL job with Kerberos authentication, usable for
 * submission from a cluster client or from another platform.
 *
 * <p>Usage: {@code CoolNiuETLwithKB <inputPath> <outputPath>}. The files
 * {@code krb5.conf} and {@code hdfs.keytab} are read from the {@code conf}
 * directory under the current working directory. The process exits with 0
 * when the job succeeds, 1 when the job fails or an exception is raised, and
 * 2 when the arguments are missing.
 */
public class CoolNiuETLwithKB {
    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";

    public static void main(String[] args) {
        if (args == null || args.length < 2) {
            System.err.println("Usage: CoolNiuETLwithKB <inputPath> <outputPath>");
            System.exit(2);
            return;
        }

        boolean succeeded = false;
        try {
            String krb5conf = confPath + File.separator + "krb5.conf";
            String keytab = confPath + File.separator + "hdfs.keytab";
            System.setProperty("java.security.krb5.conf", krb5conf);
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            // Kerberos debug mode
            System.setProperty("sun.security.krb5.debug", "true");

            // For cross-platform submission, build the configuration with
            // ConfigurationUtil.getConfiguration(confPath) instead.
            // The call below is the variant used for local submission.
            Configuration conf = ConfigurationUtil.getConfigurationLocal(confPath);
            conf.set("inputPath", args[0]);
            conf.set("outputPath", args[1]);

            // Log in to Kerberos with the keytab.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("hdfs@HAINIU.COM", keytab);
            UserGroupInformation.getCurrentUser().reloginFromKeytab();

            Job job = InitMapReduceJob.initJob(conf);
            if (job == null) {
                throw new IllegalStateException("InitMapReduceJob.initJob returned no job");
            }
            job.setJarByClass(CoolNiuETLwithKB.class);
            job.setJobName("CoolNiuETL");

            // Submit the job and wait for it to finish.
            succeeded = job.waitForCompletion(true);
        } catch (Exception e) {
            // Report the failure and fall through to a non-zero exit code.
            e.printStackTrace();
        }
        System.exit(succeeded ? 0 : 1);
    }
}
- com.hainiu.mr.CoolNiuMapper
mapper类
package com.hainiu.mr;
import com.hainiu.pojo.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that classifies access-log lines by the prefix of their request field.
 *
 * <p>Each input line is split on {@code \u0001}; lines that do not yield exactly
 * 12 fields are dropped. Field 3 is compared against the request prefixes
 * declared below, and for every matching prefix one pair is emitted whose key is
 * the action name and whose value is the record produced by the matching
 * {@code com.hainiu.pojo} class.
 */
public class CoolNiuMapper extends Mapper<LongWritable, Text, Text, Text> {
    final static String VIEW_HOMEPAGE = "GET /index.jsp";
    final static String VIEW_HOMEPAGE_ACTION = "VIEW_HOMEPAGE";
    final static String LOGOUT = "GET /logout.action";
    final static String LOGOUT_ACTION = "LOGOUT";
    final static String LOGIN = "GET /login.jsp";
    final static String LOGIN_ACTION = "LOGIN";
    final static String SEARCH_GOODS_BY_TYPE = "GET /search.action?search.goodsTypeId";
    final static String SEARCH_GOODS_ACTION = "SEARCH_GOODS_BY_TYPE";
    final static String LOOK_GOODS_DETAIL = "GET /lookDetail.action";
    final static String LOOK_GOODS_DETAIL_ACTION = "LOOK_GOODS_DETAIL";
    final static String GO_SHOP_CAR = "GET /purchase/shop_car";
    final static String GO_SHOP_CAR_ACTION = "GO_SHOP_CAR";
    final static String GO_ORDER_PAGE = "POST /member/toOrderPage";
    final static String GO_ORDER_PAGE_ACTION = "GO_ORDER_PAGE";
    final static String GO_PAY_PAGE = "GET /member/toPayOnline";
    final static String GO_PAY_PAGE_ACTION = "GO_PAY_PAGE";
    final static String PAY_SUCCESS = "GET /bank/pay_success";
    final static String PAY_SUCCESS_ACTION = "PAY_SUCCESS";
    final static String ADD_COMMENT = "POST /member/addComment.action";
    final static String ADD_COMMENT_ACTION = "ADD_COMMENT";

    // Reused across map() calls to avoid allocating a Text per record.
    private Text keyOut = new Text();
    private Text valueOut = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String record = value.toString();
        final String[] fields = record.split("\u0001");
        if (fields.length != 12) {
            return;
        }
        final String request = fields[3];

        // Home page view
        if (request.startsWith(VIEW_HOMEPAGE)) {
            emit(context, VIEW_HOMEPAGE_ACTION, new ViewHomePage().toStr(record));
        }
        // Logout
        if (request.startsWith(LOGOUT)) {
            emit(context, LOGOUT_ACTION, new Logout().toStr(record));
        }
        // Login
        if (request.startsWith(LOGIN)) {
            emit(context, LOGIN_ACTION, new Login().toStr(record));
        }
        // Search goods by type
        if (request.startsWith(SEARCH_GOODS_BY_TYPE)) {
            emit(context, SEARCH_GOODS_ACTION, new SearchGoodsByType().toStr(record));
        }
        // Goods detail view
        if (request.startsWith(LOOK_GOODS_DETAIL)) {
            emit(context, LOOK_GOODS_DETAIL_ACTION, new LookGoodsDetail().toStr(record));
        }
        // Go to shopping cart
        if (request.startsWith(GO_SHOP_CAR)) {
            emit(context, GO_SHOP_CAR_ACTION, new GoShopCar().toStr(record));
        }
        // Go to order page
        if (request.startsWith(GO_ORDER_PAGE)) {
            emit(context, GO_ORDER_PAGE_ACTION, new GoOrderPage().toStr(record));
        }
        // Go to payment page
        if (request.startsWith(GO_PAY_PAGE)) {
            emit(context, GO_PAY_PAGE_ACTION, new GoPayPage().toStr(record));
        }
        // Payment succeeded
        if (request.startsWith(PAY_SUCCESS)) {
            emit(context, PAY_SUCCESS_ACTION, new PaySuccessPage().toStr(record));
        }
        // Add comment
        if (request.startsWith(ADD_COMMENT)) {
            emit(context, ADD_COMMENT_ACTION, new AddCommentPage().toStr(record));
        }
    }

    /** Writes one (action, formatted record) pair through the reusable Text holders. */
    private void emit(Context context, String action, String formatted) throws IOException, InterruptedException {
        keyOut.set(action);
        valueOut.set(formatted);
        context.write(keyOut, valueOut);
    }
}
- com.hainiu.mr.CoolNiuReducer
reducer类
package com.hainiu.mr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.File;
import java.io.IOException;
/**
 * Reducer that writes every value of a key through {@link MultipleOutputs},
 * using {@code <key>/<key>} as the base output path so that each key's records
 * are written under a sub-path named after that key.
 */
public class CoolNiuReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        final String action = key.toString();
        // Hadoop paths always use '/', independent of the platform the task runs on;
        // File.separator would yield '\' when debugging locally on Windows.
        final String baseOutputPath = action + "/" + action;
        for (Text value : values) {
            mos.write(NullWritable.get(), value, baseOutputPath);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Closing flushes the writers opened by MultipleOutputs.
        mos.close();
    }
}
- com.hainiu.mr.InitMapReduceJob
Job配置
package com.hainiu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Factory for the fully configured "coolniu" MapReduce job.
 */
public class InitMapReduceJob {
    /**
     * Builds the job from the given configuration.
     *
     * <p>The configuration must carry the keys {@code inputPath} and
     * {@code outputPath}. An existing output path is deleted recursively.
     *
     * @param conf configuration holding {@code inputPath} and {@code outputPath}
     * @return the configured job, never {@code null}
     * @throws IllegalArgumentException if either path key is missing
     * @throws IllegalStateException if the job cannot be created or configured;
     *         the original failure is preserved as the cause
     */
    public static Job initJob(Configuration conf) {
        final String inputPath = conf.get("inputPath");
        final String outputPath = conf.get("outputPath");
        if (inputPath == null || outputPath == null) {
            throw new IllegalArgumentException(
                    "Configuration keys 'inputPath' and 'outputPath' must both be set");
        }

        try {
            // Enable cross-platform job submission.
            conf.setBoolean("mapreduce.app-submission.cross-platform", true);
            Job job = Job.getInstance(conf);
            job.setJobName("coolniu");
            // Jar that contains the job's classes.
            job.setJarByClass(InitMapReduceJob.class);
            job.setMapperClass(CoolNiuMapper.class);
            job.setReducerClass(CoolNiuReducer.class);
            // Mapper output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Reducer output types.
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // Location of the raw input data.
            FileInputFormat.setInputPaths(job, inputPath);

            // Remove a previous result so the job can write its output.
            Path output = new Path(outputPath);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            return job;
        } catch (Exception e) {
            // Fail fast: handing back a null or half-configured job would only
            // move the failure to submission time and hide its cause.
            throw new IllegalStateException("Failed to initialise the coolniu MapReduce job", e);
        }
    }
}
实体类
- com.hainiu.pojo.AddCommentPage
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats an "add comment" access-log line as a tab-separated record.
 */
public class AddCommentPage {
    private String ip;
    private String commentTime;
    private String commentUrl;
    private String orderId;
    private String goodsId;
    private String commentContent;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 6, 7, 8, 10 and 11,
     * extracts order id, goods id and comment content from field 6, and joins
     * the result with tabs in the order: ip, formatted time, goods id, order id,
     * comment content, refer, browse, user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        commentTime = fields[2];
        commentUrl = fields[6];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        orderId = commentUrl.split("comment\\.orderId=|&comment\\.commentBank")[1];
        goodsId = commentUrl.split("=|&")[1];
        // The comment content is whatever follows the last '=' kept by split().
        final String[] byEquals = commentUrl.split("=");
        commentContent = byEquals[byEquals.length - 1];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(commentTime) + tab
                + goodsId + tab
                + orderId + tab
                + commentContent + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.GoOrderPage
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "go to order page" access-log line as a tab-separated record.
 */
public class GoOrderPage {
    private String ip;
    private String goOrderPageTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11 and
     * joins them with tabs in the order: ip, formatted time, refer, browse,
     * user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        goOrderPageTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(goOrderPageTime) + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.GoPayPage
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "go to payment page" access-log line as a tab-separated record.
 */
public class GoPayPage {
    private String ip;
    private String goPayPageTime;
    private String action;
    private String orderId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 3, 7, 8, 10 and 11,
     * takes the order id as the third token of field 3 when split on '=' or
     * space, and joins the result with tabs in the order: ip, formatted time,
     * order id, refer, browse, user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        goPayPageTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String[] actionTokens = action.split("=| ");
        orderId = actionTokens[2];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(goPayPageTime) + tab
                + orderId + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.GoShopCar
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "go to shopping cart" access-log line as a tab-separated record.
 */
public class GoShopCar {
    private String ip;
    private String goShopCarTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11 and
     * joins them with tabs in the order: ip, formatted time, refer, browse,
     * user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        goShopCarTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(goShopCarTime) + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.Login
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a login access-log line as a tab-separated record.
 */
public class Login {
    private String ip;
    private String loginTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11 and
     * joins them with tabs in the order: ip, formatted time, refer, browse,
     * user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        loginTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(loginTime) + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.Logout
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a logout access-log line as a tab-separated record.
 */
public class Logout {
    private String ip;
    private String logoutTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11 and
     * joins them with tabs in the order: ip, formatted time, refer, browse,
     * user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        logoutTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(logoutTime) + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.LookGoodsDetail
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "goods detail" access-log line as a tab-separated record.
 */
public class LookGoodsDetail {
    private String ip;
    private String lookTime;
    private String action;
    private String goodsId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 3, 7, 8, 10 and 11,
     * takes the goods id as the third token of field 3 when split on '=' or
     * space, and joins the result with tabs in the order: ip, formatted time,
     * goods id, refer, browse, user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        lookTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String[] actionTokens = action.split("=| ");
        goodsId = actionTokens[2];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(lookTime) + tab
                + goodsId + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.PaySuccessPage
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "payment succeeded" access-log line as a tab-separated record.
 */
public class PaySuccessPage {
    private String ip;
    private String paySuccessTime;
    private String orderId;
    private String totalPay;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11,
     * extracts order id and total from field 7 (the refer) by splitting on
     * {@code orderStr=}, {@code &total=} and {@code &key=}, and joins the result
     * with tabs in the order: ip, formatted time, order id, total, refer,
     * browse, user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        paySuccessTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // One split serves both values: token 1 is the order id, token 2 the total.
        final String[] referTokens = refer.split("orderStr=|&total=|&key=");
        orderId = referTokens[1];
        totalPay = referTokens[2];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(paySuccessTime) + tab
                + orderId + tab
                + totalPay + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.SearchGoodsByType
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "search goods by type" access-log line as a tab-separated record.
 */
public class SearchGoodsByType {
    private String ip;
    private String searchTime;
    private String action;
    private String goodsTypeId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 3, 7, 8, 10 and 11,
     * takes the goods type id as the second token of field 3 when split on '='
     * or '&amp;', and joins the result with tabs in the order: ip, formatted
     * time, goods type id, refer, browse, user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        searchTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String[] actionTokens = action.split("=|&");
        goodsTypeId = actionTokens[1];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(searchTime) + tab
                + goodsTypeId + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
- com.hainiu.pojo.ViewHomePage
package com.hainiu.pojo;
import com.hainiu.utils.DateFormat;
/**
 * Formats a "view home page" access-log line as a tab-separated record.
 */
public class ViewHomePage {
    private String ip;
    private String viewTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits the line on {@code \u0001}, reads fields 0, 2, 7, 8, 10 and 11 and
     * joins them with tabs in the order: ip, formatted time, refer, browse,
     * user id, user name.
     *
     * @param line raw log line
     * @return the tab-separated record
     */
    public String toStr(String line) {
        final String[] fields = line.split("\u0001");
        ip = fields[0];
        viewTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        final String tab = "\t";
        return ip + tab
                + DateFormat.dateFormat(viewTime) + tab
                + refer + tab
                + browse + tab
                + userId + tab
                + userName;
    }
}
工具类
- com.hainiu.utils.ConfigurationUtil
package com.hainiu.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.File;
/**
 * Builds the Hadoop {@link Configuration} objects used to submit the job.
 */
public class ConfigurationUtil {
    /**
     * Creates a YARN configuration that additionally loads {@code core-site.xml},
     * {@code hdfs-site.xml}, {@code mapred-site.xml} and {@code yarn-site.xml}
     * from the given directory.
     *
     * @param confPath directory containing the client configuration files
     * @return the configuration with the site files and the HDFS settings applied
     */
    public static Configuration getConfiguration(String confPath) {
        final Configuration configuration = new YarnConfiguration();
        final String[] siteFiles = {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"};
        for (String siteFile : siteFiles) {
            configuration.addResource(new Path(confPath + File.separator + siteFile));
        }
        return applyHdfsSettings(configuration);
    }

    /**
     * Creates a YARN configuration without loading any file from {@code confPath};
     * the parameter is accepted but not used.
     *
     * @param confPath unused
     * @return the configuration with the HDFS settings applied
     */
    public static Configuration getConfigurationLocal(String confPath) {
        return applyHdfsSettings(new YarnConfiguration());
    }

    /** Applies the three HDFS-related settings shared by both factory methods. */
    private static Configuration applyHdfsSettings(Configuration configuration) {
        configuration.setBoolean("dfs.support.append", true);
        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
        return configuration;
    }
}
- com.hainiu.utils.DateFormat
package com.hainiu.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
/**
 * Converts access-log timestamps such as {@code 24/Jun/2021:03:56:02 +0800}
 * into standard date strings.
 *
 * <p>The {@code +0800} suffix is matched literally; the wall-clock time of the
 * input is kept unchanged in the output.
 */
public class DateFormat {
    // 'y' is the calendar year. The former 'Y' is the week year, which makes
    // SimpleDateFormat ignore the parsed day and month.
    private static final String LOG_PATTERN = "dd/MMM/yyyy:HH:mm:ss +0800";

    /**
     * Formats a log timestamp (24/Jun/2021:03:56:02 +0800) as yyyy-MM-dd HH:mm:ss.
     *
     * @param dateStr timestamp in access-log format
     * @return the timestamp as a yyyy-MM-dd HH:mm:ss string
     * @throws IllegalArgumentException if {@code dateStr} is null or cannot be parsed
     */
    public static String dateFormat(String dateStr) {
        return reformat(dateStr, "yyyy-MM-dd HH:mm:ss");
    }

    /**
     * Formats a log timestamp (24/Jun/2021:03:56:02 +0800) as yyyy-MM-dd.
     *
     * @param dateStr timestamp in access-log format
     * @return the date part as a yyyy-MM-dd string
     * @throws IllegalArgumentException if {@code dateStr} is null or cannot be parsed
     */
    public static String getDate(String dateStr) {
        return reformat(dateStr, "yyyy-MM-dd");
    }

    /** Parses the log timestamp and renders it with the given target pattern. */
    private static String reformat(String dateStr, String targetPattern) {
        if (dateStr == null) {
            throw new IllegalArgumentException("dateStr must not be null");
        }
        final Date parsed;
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
            parsed = new SimpleDateFormat(LOG_PATTERN, Locale.US).parse(dateStr);
        } catch (ParseException e) {
            // Previously the failure was printed and then surfaced as a
            // NullPointerException from format(null); report the real cause instead.
            throw new IllegalArgumentException("Unparseable access-log timestamp: " + dateStr, e);
        }
        return new SimpleDateFormat(targetPattern, Locale.US).format(parsed);
    }
}
日志样例
链接: https://pan.baidu.com/s/1eo5kt0UrbY2ciPGTvfoeaQ 密码: 7331
打包运行
集群运行
hadoop jar coolniu.jar com.hainiu.mr.CoolNiuETL /input/access_shop.log /output/
可以将输出目录替换为跑批日期,代码已经实现按不同模块多目录输出。