上次在做老苏的ETL项目的时候,为了方便,为了操作方便,老苏提供都是.gz文件和.tar.gz文件(至于这些怎么用xargs提取的,这里就不再详细说明) 大家都知道hdfs的命令并没有解压操作,如果我想在hdfs上解压400多个文件,该如何操作呢?此时我们就可以用一个非常牛逼的类叫做CompressionCodecFactory,里面提供的api可以帮我们实现
1.首先这个api只能对单个文件进行解压,并不能对一个文件夹下的所有文件操作,而且解压后的文件文件名和路径名也要给全才可以,因此我们需要先将这些文件追加到本地文件中:
lee2文件内容如下:
用sz命令导出lee2 文件到windows 中 再用notpad++ 操作一波:
去掉第一行数据,并将所有分隔符换成\t格式,
为了简化mr操作,我们把\也换成\t 分割,得到的文件是这样的
2.编写一个Mapreduce程序,将这个文件中的最后一个元素输出:
public class MR_LOG_CUT extends Configured implements Tool {
//-Dmapreduce.job.reduces=4
@Override
public int run(String[] args) throws Exception {
// 创建hadoop配置文件的加载对象
Configuration myConf = this.getConf();
Job job = Job.getInstance(myConf, "etl_cut");
job.setJarByClass(MR_LOG_CUT.class);
//数据片 -- 输入格式化 -- M --- R ---输出格式化
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
//reduce设置
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置输出格式化 文本
job.setOutputFormatClass(TextOutputFormat.class);
//设置I/O路径
Path in = new Path(args[0]);
Path out = new Path(args[1]);
//输出路径必须不存在
FileSystem fs = FileSystem.get(myConf);
if (fs.exists(out)) {
//true 表示 hadoop fs -rm -r -skipTrash
fs.delete(out, true);
System.out.println(job.getJobName() + "'s outputDir has been deleted ");
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
long start = System.currentTimeMillis();
//运行代码
boolean con = job.waitForCompletion(true);
long end = System.currentTimeMillis();
System.out.println(con ? "JOB_STATUS : SUCCESS" : "JOB_STATUS : FILED");
//运行时间
System.out.println("JOB_COST : " + (end - start) / 1000 + " SECONDS");
return 0;
}
private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Text outKey = new Text(); //创建 key 对象 text类型
private NullWritable outval = NullWritable.get();
private String[] strs = null; //存储切割的字符串数组
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
strs = value.toString().split("\t"); //不需要key值,key值这是行号,无用
//最后一个元素就是我们需要的
outKey.set(strs[strs.length - 1]);
context.write(outKey, outval);
}
}
//reduce的输入是map的输出,输出根据实际情况定
private static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
//聚合输出,因为没重复的key所以不会聚合咯
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new MR_LOG_CUT(), args));
}
}
输出后的文件是这样的,然后我们新建一个解压类,放在resource目录下进行加压操作:
为什么要这么做呢,因为该方法只能一个文件一个文件地解压,所以我们需要一个一个遍历文件全路径名来读取每一个.gz/.tar.tar.gz文件,解压完成后我们还需要根据文件名来拼每一个文件的输出路径,所以先写一个根据文件名生成输出路径的工具类如图所示:
- 进行解压逻辑操作,主要思路是:先依次读取我们的压缩文件名文件 part-r-00000 ,再创建url 根据CompressionCodecFactory 来判断文件的压缩格式,进行解压,设置我们的解压后文件名,用IOUtils完成解压后输出,具体代码如下:
import com.lee.utils.FileOutputUtil;
import jline.internal.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
public class LeeFileDecompressor {
//实现批量解压数据
public static void main(String[] args) {
LeeFileDecompressor fd = new LeeFileDecompressor();
// 将文件加载到内存
InputStream in = null;
String filePath;
BufferedReader reader = null;
try {
//加载资源
in = LeeFileDecompressor.class.getClassLoader().getResourceAsStream("part-r-00000");
// 因为是字符流 所以用 BufferedReader 进行解析
reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
String line;
while ((line = reader.readLine()) != null) {
// 获取到数据
if (!"".equals(line.trim())) {
//构建文件路径,args[0]是输出路径名,需带 ‘/’
filePath = (args[0] + line).trim();
fd.UnzipOne(filePath);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != reader) {
reader.close();
}
if (null != in) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void UnzipOne(String fileDir) {
InputStream in = null;
OutputStream out = null;
String outputUri ;
FileSystem fs;
try {
Configuration conf = new Configuration();
// 输入路径,args[0] 给出
String uri = fileDir;
System.out.println(uri);
// 得到输入文件系统
fs = FileSystem.get(URI.create(uri), conf);
// 得到输入路径的Path格式
Path inputPath = new Path(uri);
// System.out.println("输入路径是:" + inputPath);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// 检测输入路径是否存在压缩类型
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("未发现压缩格式哦: " + uri);
System.exit(1);
}
//注意,这里写的getOutputFile() 是获得在input的同级目录下新建一个output目录存放解压文件
if (uri.endsWith(".tar.gz")) {
//如果是 .tar.gz结尾 则直接去掉
outputUri =
CompressionCodecFactory.removeSuffix(FileOutputUtil.getOutputFile(uri), ".tar.gz");
} else{
outputUri =
//否则就是.gz结尾的直接去除即可
CompressionCodecFactory.removeSuffix(FileOutputUtil.getOutputFile(uri), ".gz");
}
// 创建输入流
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
//输出到input同级目录下的output文件中
IOUtils.copyBytes(in, out, conf);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
- 指明mainClass, 打jar包,直接运行,需要给出文件的输出路径,需要带上\:
查看运行的控制台输出:
去hdfs查看我们的输出文件,.tar.gz和 .gz 结尾的文件全部去掉了后缀,
至此,大功告成!
总结: 因为解压过程不需要跑mapreduce,所以速度比较快,如有疑问,请联系我