As shown in the figure above, the two datasets are distinguished by the fact that their files have different columns.
But what if the two tables happen to have identical columns? How do we tell them apart?
The answer: configure multiple input directories.
@羽翔 Here are two ways to set up multi-directory input for your reference.
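Both methods depend on a custom Writable, WordWritable, that carries the word, its numeric value, and a type tag telling the reducer which dataset a record came from. The original post does not include this class, so what follows is only a minimal sketch; the fields and accessors (setType/getType, setN/getN, setWord/getWord) are inferred from how the mappers and reducer below use them:
package com.hainiu.mapreducer.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* Value type carrying the word, its count, and a dataset tag.
* This is a sketch inferred from usage, not the original class.
*/
public class WordWritable implements Writable {
private String type = "";//"1" = min dataset, "2" = max dataset
private long n;//the numeric column
private Text word = new Text();
public void write(DataOutput out) throws IOException {
out.writeUTF(type);
out.writeLong(n);
word.write(out);
}
public void readFields(DataInput in) throws IOException {
type = in.readUTF();
n = in.readLong();
word.readFields(in);
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Long getN() {
return n;
}
public void setN(Long n) {
this.n = n;
}
public Text getWord() {
return word;
}
public void setWord(Text word) {
this.word.set(word);
}
}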
Method 1:
/**
* InnerJoin.java
* com.hainiu.mapreducer.mr
* Copyright (c) 2017, Hainiu. All rights reserved.
* @author 青牛
*/
package com.hainiu.mapreducer.mr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* MapReduce implementation of a SQL-style inner join
*
* @author 青牛
* @Date 2017-08-28
*/
public class InnerJoin extends Configured implements Tool {
public static final String SIGN1 = "\t";
public static final String SIGN2 = "\001";
public static class InnerJoinMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
private Text outKey = new Text();
private WordWritable outValue = new WordWritable();
/**
* The directory name of this map task's input file tells us which dataset
* a record belongs to, so we can classify the data accordingly.
* Inside a map task, the context object gives access to the input split
* and hence to the task's input path.
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String path = inputSplit.getPath().toString();
if (path.contains("minout")) {
outValue.setType("1");
} else if (path.contains("maxout")) {
outValue.setType("2");
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] strs = str.split(SIGN1);
if (strs.length != 2) {
return;
}
String word = strs[0];
Long num = Long.parseLong(strs[1]);
outKey.set(word);
outValue.setN(num);
outValue.setWord(outKey);
context.write(outKey, outValue);
}
}
public static class InnerJoinReducer extends Reducer<Text, WordWritable, Text, Text>{
private Text outValue = new Text();
private List<Long> firstList = new ArrayList<Long>();
private List<Long> secondList = new ArrayList<Long>();
@Override
protected void reduce(Text key, Iterable<WordWritable> value,Context context) throws IOException, InterruptedException {
//Be sure to clear the cached lists, otherwise values from earlier keys would be appended to every later key's output
firstList.clear();
secondList.clear();
for(WordWritable wordWritable:value){
if(wordWritable.getType().equals("1")){
firstList.add(wordWritable.getN());
}else{
secondList.add(wordWritable.getN());
}
}
//Emit the joined rows: the cross product of the max-side and min-side values
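//For example, if the maxout side buffered [10] and the minout side buffered
//[1, 2] for key "hello", this emits "hello\t10\t1" and "hello\t10\t2",
//which matches SQL inner-join semantics for that key.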
for(Long max:secondList){
for(Long min:firstList){
outValue.set(max + SIGN1 + min);
context.write(key, outValue);
}
}
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
//Create the job with a name and the task configuration
Job job = Job.getInstance(conf, "innerjoin");
//Set the class used to locate the jar
job.setJarByClass(InnerJoin.class);
//Set the Mapper class
job.setMapperClass(InnerJoinMapper.class);
//Set the Reducer class
job.setReducerClass(InnerJoinReducer.class);
//Set the map output value type
job.setMapOutputValueClass(WordWritable.class);
//Set the job's output key type
job.setOutputKeyClass(Text.class);
//Set the job's output value type
job.setOutputValueClass(Text.class);
//Set the input paths; multiple directories can be passed as a comma-separated list,
//e.g. /tmp/mulitipleoutmaxmin/maxout,/tmp/mulitipleoutmaxmin/minout
FileInputFormat.addInputPaths(job, args[0]);
//Set the output path, which is a single directory
Path outputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputDir);
//Delete the output directory if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);
System.out.println("output dir deleted");
}
//Wait for the job to finish
int i = job.waitForCompletion(true) ? 0 : 1;
return i;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new InnerJoin(), args));
}
}
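One caveat with Method 1: the type tag is derived by substring-matching the input path, so if a path contains neither "minout" nor "maxout", the tag is never set and those records would be silently mis-bucketed in the reducer (or trigger a NullPointerException, depending on how WordWritable initializes the field). A defensive variant of InnerJoinMapper's setup (my sketch, not part of the original) fails fast instead:
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String path = inputSplit.getPath().toString();
if (path.contains("minout")) {
outValue.setType("1");
} else if (path.contains("maxout")) {
outValue.setType("2");
} else {
//fail fast on an unexpected input directory instead of
//producing wrongly tagged records
throw new IOException("unexpected input path: " + path);
}
}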
Method 2:
/**
* InnerJoin2.java
* com.hainiu.mapreducer.mr
* Copyright (c) 2017, Hainiu. All rights reserved.
* @author 青牛
*/
package com.hainiu.mapreducer.mr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* MapReduce implementation of a SQL-style inner join, using MultipleInputs
*
* @author 青牛
* @Date 2017-08-28
*/
public class InnerJoin2 extends Configured implements Tool {
public static final String SIGN1 = "\t";
public static final String SIGN2 = "\001";
public static class InnerJoinMaxMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
private Text outKey = new Text();
private WordWritable outValue = new WordWritable();
/**
* Each mapper class is bound to one input directory, so it can set
* its data type tag directly
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
outValue.setType("2");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] strs = str.split(SIGN1);
if (strs.length != 2) {
return;
}
String word = strs[0];
Long num = Long.parseLong(strs[1]);
outKey.set(word);
outValue.setN(num);
outValue.setWord(outKey);
context.write(outKey, outValue);
}
}
public static class InnerJoinMinMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
private Text outKey = new Text();
private WordWritable outValue = new WordWritable();
/**
* Each mapper class is bound to one input directory, so it can set
* its data type tag directly
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
outValue.setType("1");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] strs = str.split(SIGN1);
if (strs.length != 2) {
return;
}
String word = strs[0];
Long num = Long.parseLong(strs[1]);
outKey.set(word);
outValue.setN(num);
outValue.setWord(outKey);
context.write(outKey, outValue);
}
}
public static class InnerJoinReducer extends Reducer<Text, WordWritable, Text, Text> {
private Text outValue = new Text();
private List<Long> firstList = new ArrayList<Long>();
private List<Long> secondList = new ArrayList<Long>();
@Override
protected void reduce(Text key, Iterable<WordWritable> value, Context context)
throws IOException, InterruptedException {
//Be sure to clear the cached lists, otherwise values from earlier keys would be appended to every later key's output
firstList.clear();
secondList.clear();
for (WordWritable wordWritable : value) {
if (wordWritable.getType().equals("1")) {
firstList.add(wordWritable.getN());
} else {
secondList.add(wordWritable.getN());
}
}
//Emit the joined rows: the cross product of the max-side and min-side values
for (Long max : secondList) {
for (Long min : firstList) {
outValue.set(max + SIGN1 + min);
context.write(key, outValue);
}
}
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
//Create the job with a name and the task configuration
Job job = Job.getInstance(conf, "innerjoin2");
//Set the class used to locate the jar
job.setJarByClass(InnerJoin2.class);
//Set the Reducer class
job.setReducerClass(InnerJoinReducer.class);
//Set the map output value type
job.setMapOutputValueClass(WordWritable.class);
//Set the job's output key type
job.setOutputKeyClass(Text.class);
//Set the job's output value type
job.setOutputValueClass(Text.class);
//Set the input paths, binding each path to its own mapper class
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, InnerJoinMaxMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, InnerJoinMinMapper.class);
//Set the output path, which is a single directory
Path outputDir = new Path(args[2]);
FileOutputFormat.setOutputPath(job, outputDir);
//Delete the output directory if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);
System.out.println("output dir deleted");
}
//Wait for the job to finish
int i = job.waitForCompletion(true) ? 0 : 1;
return i;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new InnerJoin2(), args));
}
}
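The two mappers in Method 2 are identical except for the tag they set in setup. If you prefer, the duplication can be factored out with a small abstract base class; this is my refactoring sketch, not part of the original post, and it would replace the two mapper classes inside InnerJoin2 (so the imports above and the MultipleInputs wiring in run stay the same):
public static abstract class TaggedMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
private Text outKey = new Text();
private WordWritable outValue = new WordWritable();
//subclasses return the tag that marks their input directory's dataset
protected abstract String tag();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
outValue.setType(tag());
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split(SIGN1);
if (strs.length != 2) {
return;
}
outKey.set(strs[0]);
outValue.setN(Long.parseLong(strs[1]));
outValue.setWord(outKey);
context.write(outKey, outValue);
}
}
public static class InnerJoinMaxMapper extends TaggedMapper {
protected String tag() { return "2"; }
}
public static class InnerJoinMinMapper extends TaggedMapper {
protected String tag() { return "1"; }
}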