How can the TeraSort benchmark tool produce compressed output?

Q&A · 菜鸟程序狗 · posted 2018-10-19 10:58:20 · last reply by 青牛 2018-10-20 19:13:47 · 3097 views

I want to modify Hadoop's TeraSort benchmark tool so that it writes its output in a compressed format. In the TeraGen class I added:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setOutputFormatClass(TeraOutputFormat.class);
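
For context, a minimal driver sketch of the same per-job settings (my own illustration, not the stock TeraSort code; CompressedTeraSortDriver is a made-up name). The point is only that TeraGen and TeraSort run as two separate jobs, so compression flags set on the TeraGen job do not carry over to the sorting job; whichever job writes the output being inspected needs its own calls:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedTeraSortDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "TeraSort (gzip output)");
    job.setJarByClass(CompressedTeraSortDriver.class);

    // args[1] assumed to be the output directory, as in the stock examples driver.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setOutputFormatClass(TeraOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Per-job compression switches. In Hadoop 2.x these correspond to the
    // configuration keys mapreduce.output.fileoutputformat.compress and
    // mapreduce.output.fileoutputformat.compress.codec, which are exactly what
    // getCompressOutput()/getOutputCompressorClass() in the modified
    // TeraOutputFormat read back at task time.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    // Input format, partitioner, mapper/reducer setup omitted; this sketch
    // only shows where the output-compression settings have to live.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}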

The TeraOutputFormat class has also been modified to support compression, following the approach used in TextOutputFormat:

public class TeraOutputFormat extends FileOutputFormat<Text, Text> {
  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
  private OutputCommitter committer = null;
  private static final byte[] newline = "\n".getBytes();

  /**
   * Set the requirement for a final sync before the stream is closed.
   */
  static void setFinalSync(JobContext job, boolean newValue) {
    job.getConfiguration().setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
  }

  /**
   * Does the user want a final sync at close?
   */
  public static boolean getFinalSync(JobContext job) {
    return job.getConfiguration().getBoolean(FINAL_SYNC_ATTRIBUTE, false);
  }

  static class TeraRecordWriter extends RecordWriter<Text, Text> {
    private boolean finalSync = false;
    private DataOutputStream out;
    // Note: stored here but not currently written between key and value in write().
    private final byte[] keyValueSeparator;

    public TeraRecordWriter(DataOutputStream out, JobContext job, String keyValueSeparator) {
      finalSync = getFinalSync(job);
      this.keyValueSeparator = keyValueSeparator.getBytes();
      this.out = out;
    }

    public synchronized void write(Text key, Text value) throws IOException {
      out.write(key.getBytes(), 0, key.getLength());
      out.write(value.getBytes(), 0, value.getLength());
      out.write(newline);
    }

    public void close(TaskAttemptContext context) throws IOException {
      if (finalSync) {
        if (out instanceof Syncable) {
          ((Syncable) out).hsync();
        }
      }
      out.close();
    }
  }

  @Override
  public void checkOutputSpecs(JobContext job) throws InvalidJobConfException, IOException {
    // Ensure that the output directory is set
    Path outDir = getOutputPath(job);
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }

    final Configuration jobConf = job.getConfiguration();

    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outDir }, jobConf);

    final FileSystem fs = outDir.getFileSystem(jobConf);

    if (fs.exists(outDir)) {
      // existing output dir is considered empty iff its only content is the
      // partition file.
      final FileStatus[] outDirKids = fs.listStatus(outDir);
      boolean empty = false;
      if (outDirKids != null && outDirKids.length == 1) {
        final FileStatus st = outDirKids[0];
        final String fname = st.getPath().getName();
        empty = !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname); // _partition.lst
      }
      if (TeraSort.getUseSimplePartitioner(job) || !empty) { // mapreduce.terasort.simplepartitioner
        throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
      }
    }
  }

  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get("mapreduce.output.teraoutputformat.separator", "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension(); // file-name suffix for the codec, e.g. ".gz"
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file);
      return new TeraRecordWriter(fileOut, job, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file);
      return new TeraRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),
          job, keyValueSeparator);
    }
  }

  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }
}
But in the end the files generated in HDFS are still not in .gz format. Why?
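
As a debugging aid, here is a hedged sketch of what I would check: the decision to compress is made in getRecordWriter() from getCompressOutput(job), so logging what that call returns at task time shows whether the flags ever reached the job whose output lands in HDFS (e.g. if they were only set on the TeraGen job). LoggingTeraOutputFormat below is a hypothetical throwaway subclass of the class above, not part of the original code:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LoggingTeraOutputFormat extends TeraOutputFormat {
  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException {
    // getCompressOutput/getOutputCompressorClass are inherited from FileOutputFormat,
    // the same accessors the modified TeraOutputFormat relies on.
    boolean isCompressed = getCompressOutput(job);
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
    // Shows up in the stdout log of each task attempt that writes output.
    System.out.println("TeraOutputFormat: isCompressed=" + isCompressed
        + ", codec=" + codecClass.getName());
    return super.getRecordWriter(job);
  }
}

It can be swapped in temporarily with job.setOutputFormatClass(LoggingTeraOutputFormat.class); if the task logs show isCompressed=false, the compression settings never reached that job's configuration, which would explain the missing .gz extension.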

Replies: 1
  • 青牛 — founder of 海汼部落, a post-80s programmer who previously worked at Kingsoft (金山) and enjoys tinkering with technology and building products
    2018-10-20 19:13:47

    Is the TeraSort output itself correct?
