1. Git repository: https://gitee.com/alibabaMapengfei/bigdata.git
2. The code uses a custom thread pool; creating Executors ad hoc can leak thread-pool resources
If you need the markdown notes, leave a comment and I will send them to you
MapReduce (video 18)
1. Local Hadoop environment
#1. set the JDK path
in etc/hadoop/hadoop-env.cmd
#2. configure the Hadoop environment variables
under the system environment variables add:
%HADOOP_HOME%\bin
%HADOOP_HOME%\sbin
HADOOP_HOME = path to the unpacked local Hadoop distribution
Hadoop data types

| Java type | Hadoop Writable type |
| --- | --- |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| map | MapWritable |
| array | ArrayWritable |
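The snippet below is a minimal standalone sketch (my own illustration, not one of the jobs in these notes) showing how these wrapper types are created and read back; Writables are mutable containers, which is why the jobs below allocate them once per task and call set() repeatedly.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        // wrap plain Java values in their Writable counterparts
        Text word = new Text("hadoop");           // String -> Text
        IntWritable count = new IntWritable(3);   // int    -> IntWritable
        LongWritable offset = new LongWritable(); // long   -> LongWritable
        offset.set(128L);                         // Writables are mutable and reusable
        // and read the Java values back out
        System.out.println(word.toString() + "\t" + count.get() + "\t" + offset.get());
    }
}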
First WordCount program
package org.prac.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @Author ALIENWARE
* @Date 2021/2/22 20:35
* @Version 1.0
*/
public class MapReduce01 extends Configured implements Tool {
//field separator for the input data
private static final String SPRLIT_STR = "\t";
//map phase
//InputFormat gives (byte offset of the line, line text): 0 "a a b b c" --> (a,1) (a,1) (b,1) (b,1) (c,1)
private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{
// variables reused by the map function
private Text outkey = new Text(); //key emitted from map to reduce
private LongWritable outvalue = new LongWritable(1); //value emitted from map to reduce
private String[] strs = null; //holds the fields of the current line
//override the map method
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException
{
//avoid allocating objects inside map(): it is called once per input line, so reuse the fields above
//1. split the line; value holds the text that was read
strs = value.toString().split(SPRLIT_STR);
//2. iterate over the tokens and emit (word, 1)
for (String s : strs) {
outkey.set(s);
outvalue.set(1);
context.write(outkey, outvalue);
}
}
}
//reduce phase
/**
* map output: a 1 a 1 b 1 b 1 c 1
*             a 1 a 1 b 1 b 1 c 1
* between map and reduce the framework groups values by key (shuffle):
* identical keys arrive as key --> list of values
* reduce then accumulates (sums) each list
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
//variables reused by the reduce function
private LongWritable outval = new LongWritable();
private Long sum = 0L;//running total of the values sent by map
/**
* @param values the list of values for one key, grouped by the shuffle
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void reduce(Text outkey, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//1. reset the accumulator
sum = 0L;
//2. add up the values
for (LongWritable value : values) {
sum += value.get();
}
//3. store the total
outval.set(sum);
//4. emit (word, total)
context.write(outkey, outval);
}
}
//job configuration and submission
@Override
public int run(String[] args)
{
try {
//get the Configuration that ToolRunner has already loaded
Configuration conf = this.getConf();
//create this job
Job job = Job.getInstance(conf);
//job setup always has the same three parts
//1. class configuration: the driver class is the one that has the main method
job.setJarByClass(MapReduce01.class);
//input format class
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);//set the mapper
//if the map output types equal the reduce output types, one pair of settings is enough
//job.setMapOutputKeyClass(Text.class);//map output key type
//job.setMapOutputValueClass(LongWritable.class);//map output value type
job.setReducerClass(MyReducer.class);//set the reducer
job.setOutputKeyClass(Text.class);//reduce output key type
job.setOutputValueClass(LongWritable.class);//reduce output value type
job.setOutputFormatClass(TextOutputFormat.class);//set the output format
//2. path configuration
//input path
//FileInputFormat.addInputPath(job,new Path(args[0]));
//the output path must not already exist
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(out)){
fs.delete(out,true );
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job,out);
// 3. run the job
long start = System.currentTimeMillis();
boolean cons = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "job状态" + (cons? "SUCCESS!":"FAILE!");
System.out.println(msg);
System.out.println(Math.abs(end - start) / 1000 + " seconds!");
}catch (Exception e){
e.printStackTrace();
}
return 0;
}
//entry point
/**
* how the job is launched:
* 1. ToolRunner.run obtains the Tool's Configuration via tool.getConf()
* 2. extending Configured supplies that Configuration object and loads the Hadoop config files
* 3. ToolRunner.run then drives the MapReduce job and parses the generic options (-D and friends)
*/
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MapReduce01(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//when running from IDEA:
//Edit Configurations -> add the program arguments (input path, output path), e.g.
//E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT
MapReduce optimization
Using counters
Counters are used here to track data quality: how many input lines are well-formed and how many are malformed.
package org.prac.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* MapReduce optimization, part 1
* @desc using counters to count the records that match the expected format
* @Author ALIENWARE
* @Date 2021/2/23 20:25
* @Version 1.0
*/
public class MyWordCount02 extends Configured implements Tool {
private static final String SPRLIT_STR = "\t";//field separator
protected static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
//variables reused by the map function
private Text outkey = new Text();
private LongWritable outval = new LongWritable(1);
private String[] strs = null; //fields of the current line
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException
{
try{
//split the line into fields
strs = value.toString().split(SPRLIT_STR);
// count every input line with a counter
context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
// check whether the line is well-formed
if(null != strs && strs.length == 3){
// well-formed line
context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
outkey.set(strs[2].trim());
// emit
context.write(outkey, outval);
}else{
// malformed line
context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
// variables reused by the reduce function
private LongWritable outval = new LongWritable();
private long sum = 0L;
@Override
protected void reduce(Text outkey, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// reset the accumulator
sum = 0;
// iterate and accumulate
for (LongWritable l : values) {
sum+=l.get();
}
// store the total
outval.set(sum);
// emit
context.write(outkey, outval);
}
}
@Override
public int run(String[] args)
{
try {
//获取已经加载好的配置的conf
Configuration conf = this.getConf();
//编写本次job
Job job = Job.getInstance(conf);
//job开始进行 固定三部配置
//1. 类的配置 主执行类设置,谁有main方法就设置谁
job.setJarByClass(MyWordCount02.class);
//设置数据的输入格式化类
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);//设置map
//if the map output types equal the reduce output types, one pair of settings is enough
//job.setMapOutputKeyClass(Text.class);//map output key type
//job.setMapOutputValueClass(LongWritable.class);//map output value type
job.setReducerClass(MyReducer.class);//set the reducer
job.setOutputKeyClass(Text.class);//reduce output key type
job.setOutputValueClass(LongWritable.class);//reduce output value type
job.setOutputFormatClass(TextOutputFormat.class);//set the output format
//2. 路径设置
//输入路径
//FileInputFormat.addInputPath(job,new Path(args[0]));
//保证输出路径必须没有
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 3.执行配置
long start = System.currentTimeMillis();
//
boolean cons = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "job状态" + (cons ? "SUCCESS!" : "FAILE!");
// without log4j, print the counters manually
if (cons) {
//each map/reduce task keeps its own counters --> after the job finishes, the ApplicationMaster aggregates them
Counters counters = job.getCounters();
System.out.println(job.getJobName() + "'s counters count : " + counters.countCounters());
for (CounterGroup counter : counters) {
System.out.println("\t"+ counter.getDisplayName());
for (Counter counter1 : counter) {
System.out.println("\t\t"+counter1.getDisplayName() + "=" + counter1.getValue());
}
}
}
System.out.println(msg);
System.out.println(Math.abs(end - start) / 1000 + " seconds!");
}catch (Exception e){
e.printStackTrace();
}
return 0;
}
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MyWordCount02(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
Combiner implementation
package org.prac.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @Desc combiner implementation
* @Author ALIENWARE
* @Date 2021/2/22 20:35
* @Version 1.0
*/
/**
* Note: because a combiner pre-aggregates map output per partition, it only suits
* operations like max, min, sum and count.
* It must not be used for averages: avg(avg(1,2), avg(3)) = 2.25, while avg(1,2,3) = 2.
* Note: the combiner aggregates on the map side.
* 1. combiner input  = map output
* 2. combiner output = reduce input
* 3. so the combiner's input and output types must both equal the map output types
*/
public class MapCombiner03 extends Configured implements Tool {
//定义数据分隔符
private static final String SPRLIT_STR = "\t";
//map
//inputformat -->(一行数据的起始 和一行数据的值) 0 aabbc--> a 1 a 1 b 1 b 1 c 1
private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{
// 定义map函数需要用到的变量
private Text outkey = new Text(); //map给reduce的输出key
private LongWritable outvalue = new LongWritable(1); //map给reduce得value
private String[] strs = null; //定义数据
//手动实现map方法
/**
*
* @param key
* @param value 读取文件的值(输入的value值)
* @param context
*/
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException
{
//map中不要定义变量 父类中会一行一行执行,消耗资源
//1.拆分数据 value -->读取到的数据
strs = value.toString().split(SPRLIT_STR);
//2.遍历数据
//outkey -->map给reduce的key值
for (String s : strs) {
outkey.set(s);
outvalue.set(1);//设置读到单词的个数计数
context.write(outkey, outvalue);
}
}
}
//combiner
public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
//定义reduce需要用到的环境变量
private LongWritable outval = new LongWritable();
private Long sum = 0L;//map传过来的累加的数据
@Override
public void reduce(Text outkey, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//1. reset the accumulator
sum = 0L;
//2. add up the values
for (LongWritable value : values) {
sum += value.get();
}
//3. store the partial total
outval.set(sum);
//4. emit the locally aggregated pair (without this the combiner would swallow the map output)
context.write(outkey, outval);
}
}
//reduce阶段
/**
* a 1 a 1 b 1 b 1 c 1
* a 1 a 1 b 1 b 1 c 1
* map --> reduce 聚合
* 相同的key value -- >list
* reduce聚合后 累加操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
//定义reduce需要用到的环境变量
private LongWritable outval = new LongWritable();
private Long sum = 0L;//map传过来的累加的数据
/**
* @param values map传过来reduce聚合后的list
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void reduce(Text outkey, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//1. 初始化累加的变量
sum = 0L;
//2. 进行值的累加
for (LongWritable value : values) {
sum += value.get();
}
//3.保存累加值
outval.set(sum);
//4. 输出
context.write(outkey, outval);
}
}
//mapreduce主要执行的任务
@Override
public int run(String[] args)
{
try {
//获取已经加载好的配置的conf
Configuration conf = this.getConf();
//编写本次job
Job job = Job.getInstance(conf);
//job开始进行 固定三部配置
//1. 类的配置 主执行类设置,谁有main方法就设置谁
job.setJarByClass(MapCombiner03.class);
//设置数据的输入格式化类
job.setInputFormatClass(TextInputFormat.class);
job.setCombinerClass(MyCombiner.class);
job.setMapperClass(MyMapper.class);//设置map
//if the map output types equal the reduce output types, one pair of settings is enough
//job.setMapOutputKeyClass(Text.class);//map output key type
//job.setMapOutputValueClass(LongWritable.class);//map output value type
job.setReducerClass(MyReducer.class);//set the reducer
job.setOutputKeyClass(Text.class);//reduce output key type
job.setOutputValueClass(LongWritable.class);//reduce output value type
job.setOutputFormatClass(TextOutputFormat.class);//set the output format
//2. 路径设置
//输入路径
//FileInputFormat.addInputPath(job,new Path(args[0]));
//保证输出路径必须没有
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(out)){
fs.delete(out,true );
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job,out);
// 3.执行配置
long start = System.currentTimeMillis();
//
boolean cons = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "job状态" + (cons? "SUCCESS!":"FAILE!");
System.out.println(msg);
System.out.println(Math.abs(end-start)/1000+"秒!");
}catch (Exception e){
e.printStackTrace();
}
return 0;
}
//运行mapreduce
/**
* mapreduce运行流程
* 1.ToolRunner.run 获取tool.getConf() tool接口的configretion
* 2.extends Configured 获取Configuration 对象,加载hadoop配置文件
* 3.ToolRunner.run接管mapreduce执行,进行参数设置
*/
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MapCombiner03(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//when running from IDEA:
//Edit Configurations -> add the program arguments (input path, output path), e.g.
//E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT
Partitioner application (four partitions)
package org.prac.mapreduce;
import entity.StudentWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* Partitioner application: the partitioner returns one of four partition ids
* for four reduce tasks; map output is routed by the partitioner and then
* processed by the matching reducer.
*/
public class MapPartitioner04 extends Configured implements Tool {
//定义数据分隔符
private static final String SPLIT_STR1 = "\t";
//map
//inputformat -->(一行数据的起始 和一行数据的值) 0 aabbc--> a 1 a 1 b 1 b 1 c 1
private static class MyMapper extends Mapper<LongWritable, Text, StudentWritable, NullWritable>{
// 定义map需要用到的环境变量
private StudentWritable outkey = new StudentWritable();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, StudentWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// split the line into fields
strs = value.toString().split(SPLIT_STR1);
// counter demo: count every input line
context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
// sample line: 1 盖谦 2001-01-14 11480630 1 东城区第1中学 1 东城区 540
// check whether the line is well-formed
if(null != strs && strs.length == 9){
// well-formed line
context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
outkey.setExamNo(strs[4].trim());
outkey.setStudentName(strs[1].trim());
outkey.setBirthday(strs[2].trim());
outkey.setSchool(strs[5].trim());
outkey.setAreaName(strs[7].trim());
outkey.setScore(Integer.parseInt(strs[8].trim()));
context.write(outkey, NullWritable.get());
}else{
// malformed line
context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
}
}
}
//partitioner
private static class MyPartitioner extends Partitioner<StudentWritable, NullWritable> {
// getPartition returns the reduce (partition) id for each record
@Override
public int getPartition(StudentWritable key, NullWritable value, int numPartitions) {
// first tier: score 550 and above
if(key.getScore() >= 550){
return 0;
}
if(key.getScore() >=450 && key.getScore() < 550){
return 1;
}
if(key.getScore() >= 250 && key.getScore() < 450){
return 2;
}
return 3;
}
}
//mapreduce主要执行的任务
@Override
public int run(String[] args)
{
try {
//获取已经加载好的配置的conf
Configuration conf = this.getConf();
//编写本次job
Job job = Job.getInstance(conf);
//job开始进行 固定三部配置
//1. 类的配置 主执行类设置,谁有main方法就设置谁
job.setJarByClass(MapPartitioner04.class);
//设置数据的输入格式化类
job.setInputFormatClass(TextInputFormat.class);
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(4);
job.setMapperClass(MyMapper.class);//set the mapper
job.setOutputKeyClass(StudentWritable.class);//output key type
job.setOutputValueClass(NullWritable.class);//output value type
job.setOutputFormatClass(TextOutputFormat.class);//set the output format
//2. 路径设置
//输入路径
//FileInputFormat.addInputPath(job,new Path(args[0]));
//保证输出路径必须没有
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(out)){
fs.delete(out,true );
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job,out);
// 3.执行配置
long start = System.currentTimeMillis();
//
boolean cons = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "job状态" + (cons? "SUCCESS!":"FAILE!");
System.out.println(msg);
System.out.println(Math.abs(end-start)/1000+"秒!");
}catch (Exception e){
e.printStackTrace();
}
return 0;
}
//运行mapreduce
/**
* mapreduce运行流程
* 1.ToolRunner.run 获取tool.getConf() tool接口的configretion
* 2.extends Configured 获取Configuration 对象,加载hadoop配置文件
* 3.ToolRunner.run接管mapreduce执行,进行参数设置
*/
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MapPartitioner04(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Enabling compression for the reduce output (three ways to set a parameter)
// -Dmapreduce.output.fileoutputformat.compress=true
// -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec <input path/file> <output path>
/*There are three ways to set a parameter:
1. pass it with -D            -- applies only to this job run (medium scope)
2. conf.set(key, val)         -- applies to this specific MapReduce program (narrowest scope)
3. the XML configuration files
They are loaded in this order:
the XML files are loaded first,
-D overrides a property of the same name from the XML files,
conf.set overrides a property of the same name passed with -D.*/
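A small driver-side sketch (my own illustration) of the second approach, conf.set(), applied to the same compression properties as the -D example above; the FileOutputFormat helpers shown at the end set the equivalent keys on the job's configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedOutputJobSketch {
    public static Job buildJob() throws Exception {
        Configuration conf = new Configuration();
        // way 2: conf.set(...) -- narrowest scope, overrides the same keys set via -D or the XML files
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", GzipCodec.class.getName());
        Job job = Job.getInstance(conf, "gzip-output");
        // equivalent helper calls on the Job itself
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}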
MapReduce deduplication (distinct)
package org.prac.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* MapReduce deduplication: put the values to deduplicate into the map output key and emit
* NullWritable as the value; the shuffle groups identical keys, so reduce sees each value once
*/
public class MapReduceDistinct05 extends Configured implements Tool {
//定义数据分隔符
private static final String SPRLIT_STR = "\t";
private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
// 定义map需要用到的环境变量
private Text outkey = new Text();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// map 进行数的拆分
strs = value.toString().split(SPRLIT_STR);
// 遍历
for (String s : strs) {
outkey.set(s);
context.write(outkey, NullWritable.get());
}
}
}
//reduce阶段
public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable>
{
//定义reduce需要用到的环境变量
private LongWritable outval = new LongWritable();
private Long sum = 0L;//map传过来的累加的数据
@Override
public void reduce(Text outkey, Iterable<NullWritable> values,
Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
context.write(outkey, NullWritable.get());
}
}
//mapreduce主要执行的任务
@Override
public int run(String[] args)
{
try {
//获取已经加载好的配置的conf
Configuration conf = this.getConf();
//编写本次job
Job job = Job.getInstance(conf,"distinct");
//job开始进行 固定三部配置
//1. 类的配置 主执行类设置,谁有main方法就设置谁
job.setJarByClass(MapReduceDistinct05.class);
//设置数据的输入格式化类
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);//set the mapper
job.setReducerClass(MyReducer.class);//set the reducer
job.setOutputKeyClass(Text.class);//output key type
job.setOutputValueClass(NullWritable.class);//output value type
job.setOutputFormatClass(TextOutputFormat.class);//set the output format
//2. 路径设置
//输入路径
//FileInputFormat.addInputPath(job,new Path(args[0]));
//保证输出路径必须没有
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(out)){
fs.delete(out,true );
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job,out);
// 3.执行配置
long start = System.currentTimeMillis();
//
boolean cons = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "job状态" + (cons? "SUCCESS!":"FAILE!");
System.out.println(msg);
System.out.println(Math.abs(end-start)/1000+"秒!");
}catch (Exception e){
e.printStackTrace();
}
return 0;
}
//运行mapreduce
/**
* mapreduce运行流程
* 1.ToolRunner.run 获取tool.getConf() tool接口的configretion
* 2.extends Configured 获取Configuration 对象,加载hadoop配置文件
* 3.ToolRunner.run接管mapreduce执行,进行参数设置
*/
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MapReduceDistinct05(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//when running from IDEA:
//Edit Configurations -> add the program arguments (input path, output path), e.g.
//E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT
MapReduce max/min (using a combiner to find the maximum and minimum)
/**
1. map reads the data, sets the map key to the constant "x", and puts the record in the value
2. the combiner finds the local max and min
3. reduce emits the global max and min
*/
package org.prac.mapreduce;
import java.io.IOException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* MapReduce max/min implementation
*/
public class MapReduceManMin06 extends Configured implements Tool {
private static final String SPLIT_STR1 = "\t";
private static final String SPLIT_STR2 = "\001";
private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
// variables reused by map; the constant key "x" sends every record to the same reducer
private Text outkey = new Text("x");
private Text outval = new Text();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// map 进行数的拆分
strs = value.toString().split(SPLIT_STR1);
// 2018-01-01 001616528 236701 强力VC银翘片 6.0 82.8 69.0
context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
if (null != strs && strs.length == 7) {
context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
// 3 和 6
outval.set(strs[3].trim() + SPLIT_STR2 + strs[6].trim());
context.write(outkey, outval);
} else {
context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
}
}
}
private static class MyCombiner extends Reducer<Text, Text, Text, Text> {
private Text outval = new Text();
// track the running max and min while scanning
private String current_key = "";
private Double current_val = 0D;
private String max_key = "";
private Double max_val = 0D;
private String min_key = "";
private Double min_val = 0D;
@Override
protected void reduce(Text outkey, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// 遍历数据
for (Text t : values) {
current_key = t.toString().split(SPLIT_STR2)[0].trim();
current_val = Double.parseDouble(t.toString().split(SPLIT_STR2)[1].trim());
// 冒泡
if (current_val >= max_val) {
max_val = current_val;
max_key = current_key;
}
if (current_val <= min_val || min_val == 0) {
min_val = current_val;
min_key = current_key;
}
}
outval.set(max_key + SPLIT_STR2 + max_val);
context.write(outkey, outval);
outval.set(min_key + SPLIT_STR2 + min_val);
context.write(outkey, outval);
}
}
private static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
private Text outkey = new Text();
private DoubleWritable outval = new DoubleWritable();
// track the running max and min while scanning
private String current_key = "";
private Double current_val = 0D;
private String max_key = "";
private Double max_val = 0D;
private String min_key = "";
private Double min_val = 0D;
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
// 遍历数据
for (Text t : values) {
current_key = t.toString().split(SPLIT_STR2)[0].trim();
current_val = Double.parseDouble(t.toString().split(SPLIT_STR2)[1].trim());
// 冒泡
if (current_val >= max_val) {
max_val = current_val;
max_key = current_key;
}
if (current_val <= min_val || min_val == 0) {
min_val = current_val;
min_key = current_key;
}
}
outkey.set(max_key);
outval.set(max_val);
context.write(outkey, outval);
//each context.write emits one output record
outkey.set(min_key);
outval.set(min_val);
context.write(outkey, outval);
}
}
@Override
public int run(String[] args) throws Exception {
// create this job
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "maxmin");
// configure the job
// step 1: classes
job.setJarByClass(MapReduceManMin06.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(MyCombiner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
// step 2: paths
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 第三步 设置执行
long start = System.currentTimeMillis();
boolean con = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
System.out.println(msg);
return 0;
}
public static void main(String[] args) {
try {
System.exit(ToolRunner.run(new MapReduceManMin06(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//when running from IDEA:
//Edit Configurations -> add the program arguments (input path, output path), e.g.
//E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT
Writing reduce output to multiple directories
package org.prac.mapreduce;
import java.io.IOException;
import entity.DrugWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* max/min implemented with a custom Writable (serialized key),
* with the reduce output written to multiple directories
*/
public class MyMRDemo7 extends Configured implements Tool {
private static final String SPLIT_STR1 = "\t";
private static class MyMapper extends Mapper<LongWritable, Text, DrugWritable, NullWritable> {
// 定义map需要用到的环境变量
private DrugWritable outkey = new DrugWritable();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// map 进行数的拆分
strs = value.toString().split(SPLIT_STR1);
// 2018-01-01 001616528 236701 强力VC银翘片 6.0 82.8 69.0
context.getCounter("Line Quality Statistics", "Total Line Count").increment(1);
if (null != strs && strs.length == 7) {
context.getCounter("Line Quality Statistics", "Nice Line Count").increment(1);
// 3 和 6
outkey.setName(strs[3].trim());
outkey.setPay(Double.parseDouble(strs[6].trim()));
context.write(outkey, NullWritable.get());
} else {
context.getCounter("Line Quality Statistics", "Bad Line Count").increment(1);
}
}
}
//combiner
private static class MyCombiner extends Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable> {
private DrugWritable outkey = new DrugWritable();
// track the running max and min
private String current_key = "";
private Double current_val = 0D;
private String max_key = "";
private Double max_val = 0D;
private String min_key = "";
private Double min_val = 0D;
@Override
protected void reduce(DrugWritable key, Iterable<NullWritable> values, Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
current_key = key.getName();
current_val = key.getPay();
if(current_val >= max_val){
max_key = current_key;
max_val = current_val;
}
if(current_val <= min_val || min_val == 0){
min_val = current_val;
min_key = current_key;
}
}
@Override
protected void cleanup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
outkey.setName(max_key);
outkey.setPay(max_val);
// emit the local max once, when the task finishes
context.write(outkey, NullWritable.get());
outkey.setName(min_key);
outkey.setPay(min_val);
// emit the local min once, when the task finishes
context.write(outkey, NullWritable.get());
}
}
//reduce
private static class MyReducer extends Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable> {
private DrugWritable outkey = new DrugWritable();
//helper for multi-directory output
private MultipleOutputs<DrugWritable,NullWritable> outputs = null;
@Override
protected void setup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// multi-directory output, step 1: create the MultipleOutputs instance
outputs = new MultipleOutputs<>(context);
}
// track the running max and min
private String current_key = "";
private Double current_val = 0D;
private String max_key = "";
private Double max_val = 0D;
private String min_key = "";
private Double min_val = 0D;
@Override
protected void reduce(DrugWritable key, Iterable<NullWritable> values, Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
current_key = key.getName();
current_val = key.getPay();
if(current_val >= max_val){
max_key = current_key;
max_val = current_val;
}
if(current_val <= min_val || min_val == 0){
min_val = current_val;
min_key = current_key;
}
}
//Using cleanup() to emit only a filtered/sorted subset of the results
/**
* Background:
* map() handles one line at a time and reduce() one key group at a time, and reduce
* normally writes a result for every group. Sometimes only part of the result is wanted,
* e.g. the top three words in a WordCount job; cleanup(), which runs once when the task
* finishes, is the place to produce that kind of output.
*/
@Override
protected void cleanup(Reducer<DrugWritable, NullWritable, DrugWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
outkey.setName(max_key);
outkey.setPay(max_val);
//multi-directory output, step 2: write to a named sub-directory
outputs.write(outkey, NullWritable.get(), "max/maxval");
//also write to the regular job output
context.write(outkey, NullWritable.get());
outkey.setName(min_key);
outkey.setPay(min_val);
//also write to the regular job output
context.write(outkey, NullWritable.get());
// multi-directory output for the min
outputs.write(outkey, NullWritable.get(), "min/minval");
// close the MultipleOutputs stream when done, otherwise the job errors out or data is lost
if(null != outputs){
outputs.close();
}
}
}
@Override
public int run(String[] args) throws Exception {
// create this job
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "maxmin-multiout");
// configure the job
// step 1: classes
job.setJarByClass(MyMRDemo7.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
//3. use a combiner to pre-aggregate the map output before it reaches reduce
//do not register the reducer as the combiner here, otherwise multiple output files would be produced
job.setCombinerClass(MyCombiner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(DrugWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 第二步 设置路径
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
// hadoop fs -rm -r -skipTrash --> 递归删除
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 第三步 设置执行
boolean con = job.waitForCompletion(true);
String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
System.out.println(msg);
return 0;
}
public static void main(String[] args) {
try {
System.exit(ToolRunner.run(new MyMRDemo7(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
Inner join
Using multiple input paths, two mappers read the two data sets and tag each record; the reducer splits the tagged values on the delimiter and emits the joined rows.
package org.prac.mapreduce;
import java.io.IOException;
import entity.AreaWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Inner join
* With MultipleInputs, two mappers read the two data sets and tag each record;
* the reducer splits the tagged values on the delimiter
* and emits the joined result
*/
public class MyMR7 extends Configured implements Tool {
private static final String SPLIT_STR1 = "\t";
private static final String SPLIT_STR2 = "\001";
// mapper for the area data
private static class MyMapper1 extends Mapper<LongWritable, Text, IntWritable, Text> {
// 创建map需要用到的变量
private IntWritable outkey = new IntWritable();
private Text outval = new Text();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
// split the line
strs = value.toString().split(SPLIT_STR1); // e.g. [1, 北京]
// defensive check
if(null != strs && strs.length == 2){
outkey.set(Integer.parseInt(strs[0].trim()));
outval.set("a" + SPLIT_STR2 + strs[1].trim());
// 输出
context.write(outkey, outval);
}
}
}
private static class MyMapper2 extends Mapper<LongWritable, Text, IntWritable, Text> {
// 创建map需要用到的变量
private IntWritable outkey = new IntWritable();
private Text outval = new Text();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
// split the line
strs = value.toString().split(SPLIT_STR1); // e.g. [1, 2010, 1900]
// defensive check
if(null != strs && strs.length == 3){
outkey.set(Integer.parseInt(strs[0].trim()));
outval.set("b" + SPLIT_STR2 + strs[1].trim() + SPLIT_STR1 + strs[2].trim());
// 输出
context.write(outkey, outval);
}
}
}
private static class MyReducer extends Reducer<IntWritable, Text, AreaWritable, NullWritable> {
// 创建reduce需要用到的变量
private AreaWritable outkey = new AreaWritable();
private String tmp = "";
private String aname = "";
private String[] strs = null;
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, AreaWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// reset the buffer
tmp = "";
// concatenate all tagged values for this key
for (Text t : values) {
tmp+=t.toString() + ",";
}
// 1. keep only keys that have both an "a" (area) record and a "b" (figures) record
if(tmp.indexOf("a") > -1 && tmp.indexOf("b") > -1){
// extract the area name
aname = tmp.substring(tmp.indexOf("a"));
aname = aname.substring(2,aname.indexOf(","));
strs = tmp.substring(tmp.indexOf("b")).split(",");
// loop over the "b" records
for (String s : strs) {
if(s.startsWith("b")){ // b\0012010\t1900
// year and employment count
outkey.setAid(key.get());
outkey.setAname(aname);
outkey.setYear(Integer.parseInt(s.split(SPLIT_STR2)[1].trim().split(SPLIT_STR1)[0].trim()));
outkey.setCount(Long.parseLong(s.split(SPLIT_STR2)[1].trim().split(SPLIT_STR1)[1].trim()));
context.write(outkey, NullWritable.get());
}
}
}
}
}
@Override
public int run(String[] args) throws Exception {
// create this job
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, "innerjoin");
// configure the job
// step 1: classes
job.setJarByClass(MyMR7.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(AreaWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 第二步 设置路径
Path out = new Path(args[args.length-1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
// hadoop fs -rm -r -skipTrash --> 递归删除
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
// multiple input paths, each with its own mapper
// handles the area data
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MyMapper1.class);
// handles the employment figures
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MyMapper2.class);
FileOutputFormat.setOutputPath(job, out);
// 第三步 设置执行
long start = System.currentTimeMillis();
boolean con = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
System.out.println(msg);
System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");
return 0;
}
public static void main(String[] args) {
try {
System.exit(ToolRunner.run(new MyMR7(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
MapReduce data skew
//an inner join between two large tables, or a large table and a small table, can skew the load
//solution: handle the join on the map side
HDFS distributed cache flow: small table --> HDFS distributed cache --> shipped to every map task that computes a data block --> setup() loads the small table into memory --> the large-table records are matched against it --> the data is joined and cleaned entirely on the map side
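A minimal driver-side sketch of that distributed-cache setup (my own illustration; the full example below instead passes the file with -Dmapreduce.job.cache.files, which ToolRunner turns into the same cache entry; the path used here is hypothetical):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileDriverSketch {
    public static Job buildJob(String smallTableUri) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "semijoin");
        // ship the small table to every map task; it shows up in the task's working
        // directory and is read by file name inside Mapper.setup()
        job.addCacheFile(new URI(smallTableUri)); // e.g. "hdfs:///input/area/file1" (hypothetical path)
        job.setNumReduceTasks(0);                 // map-only join: no reduce phase, no skew
        return job;
    }
}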
Semi join between a large table and a small table
package org.prac.mapreduce;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import entity.AreaWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Semi join between a large table and a small table
* 1. start the IDE/program as administrator
* 2. argument syntax:
* -Dmapreduce.job.cache.files=<location of the small table> <job input path> <job output path>
* 3. the mapper's setup() loads the small table from the distributed cache into an in-memory map
* 4. map() then joins against it and writes the output
*/
public class MyMRDemo8 extends Configured implements Tool {
private static class MyMapper extends Mapper<LongWritable, Text, AreaWritable, NullWritable> {
// the small table comes from the HDFS distributed cache --> it is copied to the mapper's local working directory --> open it by file name
// the parsed small-table data (file1) is kept in this in-memory map
private static Map<Integer,String> areaMap = new HashMap<Integer,String>(16);
@Override
protected void setup(Mapper<LongWritable, Text, AreaWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// locate the cached small table
URI uri = context.getCacheFiles()[0];
// extract the file name
String fileName = uri.getPath().toString().substring(uri.getPath().toString().lastIndexOf("/")+1);
// parse file1: each line is "<area id>\t<area name>"
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(fileName),"UTF-8"));
String line = null;
while((line = reader.readLine()) != null){
areaMap.put(Integer.parseInt(line.split("\t")[0].trim()),line.split("\t")[1].trim());
}
// close the reader
if(null != reader){
reader.close();
}
}
private String[] strs = null;
private String aname = null;
private AreaWritable outkey = new AreaWritable();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, AreaWritable, NullWritable>.Context context)
throws IOException, InterruptedException {
// split the line
strs = value.toString().split("\t");
// defensive check
if(null != strs && strs.length == 3){
aname = areaMap.get(Integer.parseInt(strs[0].trim()));
if(null != aname && !"".equals(aname.trim())){
// area id matched: emit the joined record
outkey.setAid(Integer.parseInt(strs[0].trim()));
outkey.setAname(aname);
outkey.setYear(Integer.parseInt(strs[1].trim()));
outkey.setCount(Long.parseLong(strs[2].trim()));
context.write(outkey, NullWritable.get());
}
}
}
}
@Override
public int run(String[] args) throws Exception {
// get the loaded configuration
Configuration conf = this.getConf();
// create the job
Job job = Job.getInstance(conf, "semijoin");
// class configuration
job.setJarByClass(MyMRDemo8.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(AreaWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 第二步 设置路径
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
// hadoop fs -rm -r -skipTrash --> 递归删除
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 第三步 设置执行
long start = System.currentTimeMillis();
boolean con = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
System.out.println(msg);
System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");
return 0;
}
public static void main(String[] args) {
try {
System.exit(ToolRunner.run(new MyMRDemo8(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
MapReduce sorting with a single reducer -- ascending order (simple key type)
package org.prac.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import util.MyIntComparator;
/**
* MapReduce sorting query with a single reducer
*/
public class MyMRDemo9 extends Configured implements Tool {
private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
private IntWritable outkey = new IntWritable();
private Text outval = new Text();
private String[] strs = null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
strs = value.toString().split("\t");
outkey.set(Integer.parseInt(strs[0].trim()));
outval.set(strs[1].trim());
context.write(outkey, outval);
}
}
private static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text>{
private Text outval = new Text();
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
outval.set(values.iterator().next().toString());
context.write(key,outval);
}
}
@Override
public int run(String[] args) throws Exception {
// get the configuration
Configuration conf = this.getConf();
// create the job
Job job = Job.getInstance(conf, "desc sort");
// class configuration
job.setJarByClass(MyMRDemo9.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 第二步 设置路径
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
// hadoop fs -rm -r -skipTrash --> 递归删除
fs.delete(out, true);
System.out.println(job.getJobName() + "'s output dir is deleted!");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 第三步 设置执行
long start = System.currentTimeMillis();
boolean con = job.waitForCompletion(true);
long end = System.currentTimeMillis();
String msg = "JOB_STATUS : " + (con ? "OK!" : "FAIL!");
System.out.println(msg);
System.out.println("JOB_COST : " + ((end - start) / 1000) + " SECONDS!");
return 0;
}
public static void main(String[] args) {
try {
System.exit(ToolRunner.run(new MyMRDemo9(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
MapReduce sorting with a single reducer -- descending order (simple key type)
package util;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* custom comparator for the key, used when a non-default (descending) sort order is required
* @author Su
*/
public class MyIntComparator extends WritableComparator {
// 1. tell the framework which key type needs to be compared
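The notes stop at this point, so the rest of the comparator is not shown. Below is a minimal sketch of how such a class is commonly completed, assuming the key is IntWritable as in the ascending example above: the constructor registers the key type, and compare() inverts the natural order.

package util;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyIntComparator extends WritableComparator {

    public MyIntComparator() {
        // 1. register the key type to compare and let the framework create instances of it
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public int compare(WritableComparable a, WritableComparable b) {
        // 2. invert the natural (ascending) order to sort keys in descending order
        return -1 * a.compareTo(b);
    }
}

In the driver this comparator would then be registered with job.setSortComparatorClass(MyIntComparator.class) before waitForCompletion().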