想把Hadoop的测试工具terasort改成可压缩的格式,
TeraGen类添加了
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setOutputFormatClass(TeraOutputFormat.class);
TeraOutputFormat类也已经参照TextOutputFormat类改成了可支持压缩
public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
private OutputCommitter committer = null;
private static final byte[] newline = "\n".getBytes();
/**
-
Set the requirement for a final sync before the stream is closed.
*/
static void setFinalSync(JobContext job, boolean newValue) {
job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
}/**
-
Does the user want a final sync at close?
*/
public static boolean getFinalSync(JobContext job) {
return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
}static class TeraRecordWriter extends RecordWriter<Text,Text> {
private boolean finalSync = false;
private DataOutputStream out;private final byte[] keyValueSeparator; public TeraRecordWriter(DataOutputStream out,JobContext job,String keyValueSeparator) { finalSync = getFinalSync(job); this.keyValueSeparator = keyValueSeparator.getBytes(); this.out = out; } public synchronized void write(Text key,Text value) throws IOException { out.write(key.getBytes(), 0, key.getLength()); out.write(value.getBytes(), 0, value.getLength()); out.write(newline); } public void close(TaskAttemptContext context) throws IOException { if (finalSync) { if (out instanceof Syncable) { ((Syncable)out).hsync(); } } out.close(); }
}
@Override
public void checkOutputSpecs(JobContext job) throws InvalidJobConfException, IOException {
// Ensure that the output directory is set
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}final Configuration jobConf = job.getConfiguration(); // get delegation token for outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { outDir }, jobConf); final FileSystem fs = outDir.getFileSystem(jobConf); if (fs.exists(outDir)) { // existing output dir is considered empty iff its only content is the // partition file. // final FileStatus[] outDirKids = fs.listStatus(outDir); boolean empty = false; if (outDirKids != null && outDirKids.length == 1) { final FileStatus st = outDirKids[0]; final String fname = st.getPath().getName(); empty =!st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);//_partition.lst } if (TeraSort.getUseSimplePartitioner(job) || !empty) {//mapreduce.terasort.simplepartitioner throw new FileAlreadyExistsException("Output directory " + outDir+ " already exists"); } }
}
public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job) throws IOException {
Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get("mapreduce.output.teraoutputformat.separator", "\t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension();//获取后缀名 } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(job.getConfiguration()); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file); return new TeraRecordWriter(fileOut, job,keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file); return new TeraRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),job,keyValueSeparator); }
}
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}}
但是最终 HDFS 中仍然没有生成 .gz 格式的输出文件。请问问题出在哪里?（例如：是代码本身的问题，还是作业运行时加载了旧的 examples jar 中未修改的类？）