Kerberos 环境下 MapReduce 跨平台提交

教程 犀牛 ⋅ 于 2021-06-28 20:17:07 ⋅ 1947 阅读

配置文件

在 CDH 的 YARN 服务界面下载客户端配置文件

file

下载后将客户端配置文件放到项目的 conf 目录中,再把 krb5.conf 与 keytab 文件也放入该 conf 目录,如下:

file

代码结构

file

pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hainiu</groupId>
  <artifactId>coolniu</artifactId>
  <version>1.0</version>

  <name>coolniu</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Hadoop 3.x (CDH 6.x) requires Java 8 or newer, so compile for 1.8 instead of 1.7. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.0.0-cdh6.3.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0-cdh6.3.2</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

MapReduce代码

  • com.hainiu.mr.CoolNiuETL

本地调试使用

package com.hainiu.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * ETL entry point for local debugging (no Kerberos login is performed here).
 *
 * <p>Usage: {@code CoolNiuETL <inputPath> <outputPath>}. An existing output
 * path is deleted recursively before the job is submitted. The process exits
 * with 0 when the job succeeds, 1 when it fails or an exception is raised, and
 * 2 when the arguments are missing.
 */
public class CoolNiuETL {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: CoolNiuETL <inputPath> <outputPath>");
            System.exit(2);
        }

        boolean succeeded = false;
        try {
            Configuration conf = new Configuration();
            conf.set("inputPath", args[0]);
            conf.set("outputPath", args[1]);

            Job job = Job.getInstance(conf);
            job.setJobName("CoolNiuETL");
            // A single setJarByClass call is enough; a second call would only
            // overwrite the first one.
            job.setJarByClass(CoolNiuETL.class);

            job.setMapperClass(com.hainiu.mr.CoolNiuMapper.class);
            job.setReducerClass(com.hainiu.mr.CoolNiuReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, args[0]);

            // The job refuses to start when the output path already exists,
            // so remove any previous result first.
            Path outputPath = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            succeeded = job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Exit outside the try block so that an exception also yields a
        // non-zero exit code instead of silently returning from main.
        System.exit(succeeded ? 0 : 1);
    }
}
  • com.hainiu.mr.CoolNiuETLwithKB

kerberos认证、集群客户端提交、跨平台提交代码

package com.hainiu.mr;

import com.hainiu.utils.ConfigurationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.File;

/**
 * ETL entry point that logs in to Kerberos from a keytab and then submits the job
 * (cluster-client or cross-platform submission).
 *
 * <p>Usage: {@code CoolNiuETLwithKB <inputPath> <outputPath>}. {@code krb5.conf} and
 * {@code hdfs.keytab} are read from {@link #confPath}. The process exits with 0 when the
 * job succeeds, 1 when it fails or an exception is raised, and 2 when arguments are missing.
 */
public class CoolNiuETLwithKB {
    /** Directory holding krb5.conf and the keytab: {@code <user.dir>/conf}. */
    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: CoolNiuETLwithKB <inputPath> <outputPath>");
            System.exit(2);
        }

        boolean succeeded = false;
        try {
            String krb5conf = confPath + File.separator + "krb5.conf";
            String keytab = confPath + File.separator + "hdfs.keytab";

            System.setProperty("java.security.krb5.conf", krb5conf);
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            // Kerberos debug output
            System.setProperty("sun.security.krb5.debug", "true");

            // For cross-platform submission, use this configuration instead:
//            Configuration conf = ConfigurationUtil.getConfiguration(confPath);
            // For local submission, use the configuration below:
            Configuration conf = ConfigurationUtil.getConfigurationLocal(confPath);
            conf.set("inputPath", args[0]);
            conf.set("outputPath", args[1]);

            // Log in with the Kerberos principal from the keytab.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("hdfs@HAINIU.COM", keytab);
            UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
            currentUser.reloginFromKeytab();

            Job job = InitMapReduceJob.initJob(conf);
            if (job == null) {
                // Fail with a clear message rather than a NullPointerException below.
                throw new IllegalStateException("InitMapReduceJob.initJob returned null");
            }
            job.setJarByClass(CoolNiuETLwithKB.class);
            job.setJobName("CoolNiuETL");

            // Submit the job and wait for it to finish.
            succeeded = job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Exit outside the try block so that an exception also yields a
        // non-zero exit code instead of silently returning from main.
        System.exit(succeeded ? 0 : 1);
    }
}
  • com.hainiu.mr.CoolNiuMapper

mapper类

package com.hainiu.mr;

import com.hainiu.pojo.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Classifies each access-log record by the request stored in column 3 and emits
 * the action name as key and the reformatted record as value.
 *
 * <p>Records are split on the U+0001 control character; records that do not have
 * exactly 12 columns, or whose request matches none of the known prefixes, are skipped.
 */
public class CoolNiuMapper extends Mapper<LongWritable, Text,Text,Text> {

    final static String VIEW_HOMEPAGE = "GET /index.jsp";
    final static String VIEW_HOMEPAGE_ACTION = "VIEW_HOMEPAGE";
    final static String LOGOUT = "GET /logout.action";
    final static String LOGOUT_ACTION = "LOGOUT";
    final static String LOGIN = "GET /login.jsp";
    final static String LOGIN_ACTION = "LOGIN";
    final static String SEARCH_GOODS_BY_TYPE = "GET /search.action?search.goodsTypeId";
    final static String SEARCH_GOODS_ACTION = "SEARCH_GOODS_BY_TYPE";
    final static String LOOK_GOODS_DETAIL = "GET /lookDetail.action";
    final static String LOOK_GOODS_DETAIL_ACTION = "LOOK_GOODS_DETAIL";
    final static String GO_SHOP_CAR = "GET /purchase/shop_car";
    final static String GO_SHOP_CAR_ACTION = "GO_SHOP_CAR";
    final static String GO_ORDER_PAGE = "POST /member/toOrderPage";
    final static String GO_ORDER_PAGE_ACTION = "GO_ORDER_PAGE";
    final static String GO_PAY_PAGE = "GET /member/toPayOnline";
    final static String GO_PAY_PAGE_ACTION = "GO_PAY_PAGE";
    final static String PAY_SUCCESS = "GET /bank/pay_success";
    final static String PAY_SUCCESS_ACTION = "PAY_SUCCESS";
    final static String ADD_COMMENT = "POST /member/addComment.action";
    final static String ADD_COMMENT_ACTION = "ADD_COMMENT";

    // Reused for every output record.
    private Text keyOut = new Text();
    private Text valueOut = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String record = value.toString();
        if (record.split("\u0001").length != 12) {
            return;
        }
        String request = record.split("\u0001")[3];

        // No prefix is a prefix of another, so at most one branch can match.
        if (request.startsWith(VIEW_HOMEPAGE)) {
            // home page view
            emit(context, VIEW_HOMEPAGE_ACTION, new ViewHomePage().toStr(record));
        } else if (request.startsWith(LOGOUT)) {
            // logout
            emit(context, LOGOUT_ACTION, new Logout().toStr(record));
        } else if (request.startsWith(LOGIN)) {
            // login
            emit(context, LOGIN_ACTION, new Login().toStr(record));
        } else if (request.startsWith(SEARCH_GOODS_BY_TYPE)) {
            // search goods by type
            emit(context, SEARCH_GOODS_ACTION, new SearchGoodsByType().toStr(record));
        } else if (request.startsWith(LOOK_GOODS_DETAIL)) {
            // goods detail view
            emit(context, LOOK_GOODS_DETAIL_ACTION, new LookGoodsDetail().toStr(record));
        } else if (request.startsWith(GO_SHOP_CAR)) {
            // go to shopping cart
            emit(context, GO_SHOP_CAR_ACTION, new GoShopCar().toStr(record));
        } else if (request.startsWith(GO_ORDER_PAGE)) {
            // go to order page
            emit(context, GO_ORDER_PAGE_ACTION, new GoOrderPage().toStr(record));
        } else if (request.startsWith(GO_PAY_PAGE)) {
            // go to payment page
            emit(context, GO_PAY_PAGE_ACTION, new GoPayPage().toStr(record));
        } else if (request.startsWith(PAY_SUCCESS)) {
            // payment succeeded
            emit(context, PAY_SUCCESS_ACTION, new PaySuccessPage().toStr(record));
        } else if (request.startsWith(ADD_COMMENT)) {
            // add comment
            emit(context, ADD_COMMENT_ACTION, new AddCommentPage().toStr(record));
        }
    }

    /** Writes one (action, formatted record) pair using the reusable Text holders. */
    private void emit(Context context, String action, String formatted) throws IOException, InterruptedException {
        keyOut.set(action);
        valueOut.set(formatted);
        context.write(keyOut, valueOut);
    }
}
  • com.hainiu.mr.CoolNiuReducer

reducer类

package com.hainiu.mr;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.File;
import java.io.IOException;

/**
 * Writes every value of a key through {@link MultipleOutputs} using the base
 * output path {@code <key>/<key>}, so that each action type goes to its own
 * sub-directory of the job output.
 */
public class CoolNiuReducer extends Reducer<Text,Text, NullWritable,Text> {
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Hadoop paths always use '/', whatever the OS of the JVM running this
        // task; File.separator would produce '\' on Windows and break the
        // per-action sub-directory layout.
        String baseOutputPath = key.toString() + "/" + key.toString();

        for (Text value : values) {
            mos.write(NullWritable.get(), value, baseOutputPath);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed explicitly to flush its writers.
        mos.close();
    }
}
  • com.hainiu.mr.InitMapReduceJob

Job配置

package com.hainiu.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Builds the fully configured ETL {@link Job} from a {@link Configuration}.
 */
public class InitMapReduceJob {

    /**
     * Creates the job, wires mapper/reducer and key/value types, and sets the
     * input and output paths read from the {@code inputPath} and
     * {@code outputPath} configuration keys. An existing output path is deleted
     * recursively. Also enables {@code mapreduce.app-submission.cross-platform}
     * on the supplied configuration.
     *
     * @param conf configuration carrying {@code inputPath} and {@code outputPath}
     * @return the configured job, never {@code null}
     * @throws IllegalStateException if the job cannot be created or the paths
     *         cannot be prepared (the I/O failure is kept as the cause)
     */
    public static Job initJob(Configuration conf) {
        try {
            // enable cross-platform job submission
            conf.setBoolean("mapreduce.app-submission.cross-platform", true);
            Job job = Job.getInstance(conf);
            job.setJobName("coolniu");

            // jar that contains the job's classes
            job.setJarByClass(InitMapReduceJob.class);
            job.setMapperClass(com.hainiu.mr.CoolNiuMapper.class);
            job.setReducerClass(com.hainiu.mr.CoolNiuReducer.class);

            // mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer (final) output types
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // location of the raw input data
            FileInputFormat.setInputPaths(job, conf.get("inputPath"));

            // The job refuses to start when the output path exists; remove it
            // with the explicit recursive overload (the one-argument delete is
            // deprecated).
            Path outputPath = new Path(conf.get("outputPath"));
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            return job;
        } catch (java.io.IOException e) {
            // Surface the failure instead of printing it and handing the
            // caller a null job that fails later with a NullPointerException.
            throw new IllegalStateException("Failed to initialise the MapReduce job", e);
        }
    }
}

实体类

  • com.hainiu.pojo.AddCommentPage
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats an "add comment" access-log record as a tab-separated line.
 */
public class AddCommentPage {

    private String ip;
    private String commentTime;
    private String commentUrl;
    private String orderId;
    private String goodsId;
    private String commentContent;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character, extracts order id,
     * goods id and comment content from column 6, and joins the result with tabs.
     *
     * @param line raw log record; columns 0, 2, 6, 7, 8, 10 and 11 are read
     * @return ip, formatted time, goodsId, orderId, commentContent, refer, browse,
     *         userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        commentTime = fields[2];
        commentUrl = fields[6];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // order id sits between "comment.orderId=" and "&comment.commentBank"
        orderId = commentUrl.split("comment\\.orderId=|&comment\\.commentBank")[1];
        // goods id is the value of the first parameter
        goodsId = commentUrl.split("=|&")[1];
        // comment content is whatever follows the last '='
        String[] byEquals = commentUrl.split("=");
        commentContent = byEquals[byEquals.length - 1];

        String[] output = {
            ip, DateFormat.dateFormat(commentTime), goodsId, orderId, commentContent,
            refer, browse, userId, userName
        };
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.GoOrderPage
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "go to order page" access-log record as a tab-separated line.
 */
public class GoOrderPage {
    private String ip;
    private String goOrderPageTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character and joins the selected
     * columns with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, refer, browse, userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        goOrderPageTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        String[] output = {ip, DateFormat.dateFormat(goOrderPageTime), refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.GoPayPage
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "go to payment page" access-log record as a tab-separated line.
 */
public class GoPayPage {
    private String ip;
    private String goPayPageTime;
    private String action;
    private String orderId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character, takes the order id
     * from the request in column 3, and joins the result with tabs.
     *
     * @param line raw log record; columns 0, 2, 3, 7, 8, 10 and 11 are read
     * @return ip, formatted time, orderId, refer, browse, userId and userName
     *         separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        goPayPageTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // third token of the request when split on '=' or a space
        String[] requestTokens = action.split("=| ");
        orderId = requestTokens[2];

        String[] output = {ip, DateFormat.dateFormat(goPayPageTime), orderId, refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.GoShopCar
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "go to shopping cart" access-log record as a tab-separated line.
 */
public class GoShopCar {
    private String ip;
    private String goShopCarTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character and joins the selected
     * columns with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, refer, browse, userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        goShopCarTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        String[] output = {ip, DateFormat.dateFormat(goShopCarTime), refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.Login
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a login access-log record as a tab-separated line.
 */
public class Login {
    private String ip;
    private String loginTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character and joins the selected
     * columns with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, refer, browse, userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        loginTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        String[] output = {ip, DateFormat.dateFormat(loginTime), refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.Logout
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a logout access-log record as a tab-separated line.
 */
public class Logout {
    private String ip;
    private String logoutTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character and joins the selected
     * columns with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, refer, browse, userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        logoutTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        String[] output = {ip, DateFormat.dateFormat(logoutTime), refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.LookGoodsDetail
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "look at goods detail" access-log record as a tab-separated line.
 */
public class LookGoodsDetail {
    private String ip;
    private String lookTime;
    private String action;
    private String goodsId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character, takes the goods id
     * from the request in column 3, and joins the result with tabs.
     *
     * @param line raw log record; columns 0, 2, 3, 7, 8, 10 and 11 are read
     * @return ip, formatted time, goodsId, refer, browse, userId and userName
     *         separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        lookTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // third token of the request when split on '=' or a space
        String[] requestTokens = action.split("=| ");
        goodsId = requestTokens[2];

        String[] output = {ip, DateFormat.dateFormat(lookTime), goodsId, refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.PaySuccessPage
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "payment succeeded" access-log record as a tab-separated line.
 */
public class PaySuccessPage {
    private String ip;
    private String paySuccessTime;
    private String orderId;
    private String totalPay;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character, extracts order id
     * and total from column 7, and joins the result with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, orderId, totalPay, refer, browse, userId and
     *         userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        paySuccessTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // pieces around the "orderStr=", "&total=" and "&key=" markers
        String[] referParts = refer.split("orderStr=|&total=|&key=");
        orderId = referParts[1];
        totalPay = referParts[2];

        String[] output = {
            ip, DateFormat.dateFormat(paySuccessTime), orderId, totalPay,
            refer, browse, userId, userName
        };
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.SearchGoodsByType
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "search goods by type" access-log record as a tab-separated line.
 */
public class SearchGoodsByType {
    private String ip;
    private String searchTime;
    private String action;
    private String goodsTypeId;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character, takes the goods type
     * id from the request in column 3, and joins the result with tabs.
     *
     * @param line raw log record; columns 0, 2, 3, 7, 8, 10 and 11 are read
     * @return ip, formatted time, goodsTypeId, refer, browse, userId and userName
     *         separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        searchTime = fields[2];
        action = fields[3];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        // second token of the request when split on '=' or '&'
        String[] requestTokens = action.split("=|&");
        goodsTypeId = requestTokens[1];

        String[] output = {ip, DateFormat.dateFormat(searchTime), goodsTypeId, refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}
  • com.hainiu.pojo.ViewHomePage
package com.hainiu.pojo;

import com.hainiu.utils.DateFormat;

/**
 * Formats a "view home page" access-log record as a tab-separated line.
 */
public class ViewHomePage {
    private String ip;
    private String viewTime;
    private String refer;
    private String browse;
    private String userId;
    private String userName;

    /**
     * Splits {@code line} on the U+0001 control character and joins the selected
     * columns with tabs.
     *
     * @param line raw log record; columns 0, 2, 7, 8, 10 and 11 are read
     * @return ip, formatted time, refer, browse, userId and userName separated by tabs
     */
    public String toStr(String line){
        String[] fields = line.split("\u0001");
        ip = fields[0];
        viewTime = fields[2];
        refer = fields[7];
        browse = fields[8];
        userId = fields[10];
        userName = fields[11];

        String[] output = {ip, DateFormat.dateFormat(viewTime), refer, browse, userId, userName};
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < output.length; i++) {
            if (i > 0) {
                joined.append("\t");
            }
            joined.append(output[i]);
        }
        return joined.toString();
    }
}

工具类

  • com.hainiu.utils.ConfigurationUtil
package com.hainiu.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.io.File;

/**
 * Factory methods for the Hadoop {@link Configuration} used by the job drivers.
 */
public class ConfigurationUtil {

    /**
     * Builds a configuration that explicitly loads core-site.xml, hdfs-site.xml,
     * mapred-site.xml and yarn-site.xml from {@code confPath}, then applies the
     * shared HDFS settings.
     *
     * @param confPath directory containing the four site files
     * @return the populated configuration
     */
    public static Configuration getConfiguration(String confPath) {
        Configuration configuration = new YarnConfiguration();
        String[] siteFiles = {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"};
        for (String siteFile : siteFiles) {
            configuration.addResource(new Path(confPath + File.separator + siteFile));
        }
        return applyHdfsSettings(configuration);
    }

    /**
     * Builds a configuration from whatever {@code new YarnConfiguration()} loads
     * by default, then applies the shared HDFS settings.
     *
     * @param confPath not used by this variant
     * @return the populated configuration
     */
    public static Configuration getConfigurationLocal(String confPath) {
        return applyHdfsSettings(new YarnConfiguration());
    }

    /** Applies the HDFS settings shared by both factory methods and returns the same instance. */
    private static Configuration applyHdfsSettings(Configuration configuration) {
        configuration.setBoolean("dfs.support.append", true);
        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
        return configuration;
    }
}
  • com.hainiu.utils.DateFormat
package com.hainiu.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Converts access-log timestamps such as {@code 24/Jun/2021:03:56:02 +0800}
 * into plain date / date-time strings.
 *
 * <p>The {@code +0800} suffix is matched literally; the wall-clock time in the
 * input is reproduced unchanged in the output.
 */
public class DateFormat {
    // 'yyyy' is the calendar year. The previous 'YYYY' is the week-based year:
    // it makes SimpleDateFormat ignore day and month when parsing and prints
    // the wrong year around New Year when formatting.
    private static final String ACCESS_LOG_PATTERN = "dd/MMM/yyyy:HH:mm:ss +0800";

    /**
     * Reformats an access-log timestamp as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @param dateStr timestamp such as {@code 24/Jun/2021:03:56:02 +0800}
     * @return the timestamp in {@code yyyy-MM-dd HH:mm:ss} form
     * @throws IllegalArgumentException if {@code dateStr} is null or does not
     *         match the access-log pattern
     */
    public static String dateFormat(String dateStr) {
        return reformat(dateStr, "yyyy-MM-dd HH:mm:ss");
    }

    /**
     * Reformats an access-log timestamp as {@code yyyy-MM-dd}.
     *
     * @param dateStr timestamp such as {@code 24/Jun/2021:03:56:02 +0800}
     * @return the date part in {@code yyyy-MM-dd} form
     * @throws IllegalArgumentException if {@code dateStr} is null or does not
     *         match the access-log pattern
     */
    public static String getDate(String dateStr) {
        return reformat(dateStr, "yyyy-MM-dd");
    }

    /** Parses {@code dateStr} with the access-log pattern and formats it with {@code outputPattern}. */
    private static String reformat(String dateStr, String outputPattern) {
        if (dateStr == null) {
            throw new IllegalArgumentException("dateStr must not be null");
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat parser = new SimpleDateFormat(ACCESS_LOG_PATTERN, Locale.US);
        Date parsed;
        try {
            parsed = parser.parse(dateStr);
        } catch (ParseException e) {
            // Fail with the offending value instead of printing the error and
            // then hitting a NullPointerException while formatting null.
            throw new IllegalArgumentException("Unparseable access-log timestamp: " + dateStr, e);
        }
        // Locale.US keeps the numeric output independent of the JVM's default locale.
        return new SimpleDateFormat(outputPattern, Locale.US).format(parsed);
    }
}

日志样例

链接: https://pan.baidu.com/s/1eo5kt0UrbY2ciPGTvfoeaQ 密码: 7331

打包运行

集群运行

hadoop jar coolniu.jar com.hainiu.mr.CoolNiuETLwithKB /input/access_shop.log /output/

file

可以将输出目录替换为跑批日期,代码已经实现按不同模块多目录输出。

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-犀牛,http://hainiubl.com/topics/75733
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter