Error while converting HFiles to ORC files by reading an HBase snapshot.
Error message: Can not create a Path from a null string
I have already checked the snapshot-name parameter, the ORC output path on HDFS, and the temporary restore path.
The config files core-site.xml and hdfs-site.xml were copied over directly.
The snapshot-to-ORC class (SnapshotHfile2Orc) and the job-chain class (SnapshotHfile2OrcJob) are my own code.
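For reference, that exact message is the IllegalArgumentException thrown by org.apache.hadoop.fs.Path when it is handed a null string, so somewhere a conf.get(...) is returning null before a Path gets built. A one-line reproduction:

public class PathNullDemo {
    public static void main(String[] args) {
        // Throws IllegalArgumentException: "Can not create a Path from a null string"
        new org.apache.hadoop.fs.Path((String) null);
    }
}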
Detailed screenshots follow.
[Screenshot: the snapshot name on HDFS]
Local run arguments: SnapshotHfile2OrcJob -Dhbase.snapshot.name= hfile_snapshot1 (snapshot name) -Dtask.id=0930_jd -Dreadsanpshot.restore.path=/user/jixiaodong/hbase/restore (temporary restore path on HDFS) -Dtask.base.dir=/user/jixiaodong/hbase/output (ORC output on HDFS) -Dhbase.zookeeper.quorum=nn1.hadoop:2181,nn2.hadoop:2181,s1.hadoop:2181 -Dzookeeper.znode.parent=/hbase1
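One thing worth ruling out first: if the space in -Dhbase.snapshot.name= hfile_snapshot1 was really typed that way (and is not just a transcription artifact), GenericOptionsParser sets hbase.snapshot.name to the empty string and treats hfile_snapshot1 as a leftover positional argument. A hypothetical sanity check at the top of run(), with the property keys copied from the command line above:

// Hypothetical check: dump the -D properties the job actually sees; a null
// (or empty) value here is what later turns into "new Path(null)".
Configuration conf = getConf();
for (String key : new String[] {"hbase.snapshot.name", "task.id",
        "readsanpshot.restore.path", "task.base.dir"}) {
    System.out.println(key + " = " + conf.get(key));
}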
My own two classes are as follows.

The SnapshotHfile2Orc class:

package com.hainiu.mr;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hainiu.mr.base.BaseMR;
import com.hainiu.util.Constants;
import com.hainiu.util.OrcFormat;
import com.hainiu.util.OrcUtil;
import com.hainiu.util.Utils;
/**
 * Reads the snapshot and converts it to ORC-format files.
 *
 * @author 纪晓东
 * @Date 2020-09-29
 */
public class SnapshotHfile2Orc extends BaseMR {

    public static class SnapshotHfile2OrcMapper extends Mapper<ImmutableBytesWritable, Result, NullWritable, Writable> {

        // helper object used below to write out ORC-format records
        OrcUtil orcUtil = new OrcUtil();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            orcUtil.setOrcTypeWriteSchema(OrcFormat.SCHEMA_WRITE_AORC);
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            NullWritable nw = NullWritable.get();
            Writable w = null;

            // the key holds the rowkey (aid + date); the value holds the
            // column-family data for that rowkey
            String rowkey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
            // NOTE: String.split("") splits into single characters, so arr[0]
            // is only the first character of the rowkey; split on the actual
            // delimiter between aid and the date instead
            String[] arr = rowkey.split("");
            String aid = arr[0];

            // struct<aid:string,pkgname:string,uptime:bigint,type:int,country:string,gpcategory:string>
            String typestr = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("type")));
            String pkgname = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("pkgname")));
            String uptimestr = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("uptime")));
            String country = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("country")));
            String gpcategory = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("gpcategory")));

            // debug output
            System.out.println("aid " + aid);
            System.out.println("pkgname " + pkgname);
            System.out.println("uptimestr " + uptimestr);
            System.out.println("typestr " + typestr);
            System.out.println("country " + country);
            System.out.println("gpcategory " + gpcategory);
            System.out.println("------------------------");

            // null-check, then convert type and uptime
            long uptime = -1L;
            int type = -1;
            if (Utils.isNotEmpty(uptimestr)) {
                uptime = Long.parseLong(uptimestr);
            }
            if (Utils.isNotEmpty(typestr)) {
                type = Integer.parseInt(typestr);
            }

            // write one ORC record: aid, pkgname, uptime, type, country, gpcategory
            orcUtil.addAttr(aid, pkgname, uptime, type, country, gpcategory);
            w = orcUtil.serialize();
            System.out.println("test");
            System.out.println("nw=" + nw);
            System.out.println("Nullwritable=" + NullWritable.get());
            context.write(nw, w);
        }
    }
    @Override
    public Job getJob(Configuration conf) throws IOException {
        // get a job with the shared configuration
        Job job = Job.getInstance(conf, this.getJobNameWithTaskId());
        job.setJarByClass(SnapshotHfile2Orc.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(OrcNewOutputFormat.class);

        // scan range
        Scan scan = new Scan();
        scan.setStartRow(toBytes("8d303"));
        scan.setStopRow(toBytes("8d305z"));
        // TableInputFormat.SCAN = "hbase.mapreduce.scan"
        job.getConfiguration().set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

        // NOTE: org.apache.hadoop.fs.Path throws exactly "Can not create a
        // Path from a null string" when conf.get(...) returns null, so check
        // that the key Constants.HBASE_TABLE_SNAPSHOT_PATH_ATTTR resolves to
        // matches the -D property name on the command line
        // (readsanpshot.restore.path)
        // Path restoreDir = new Path(conf.get(Constants.HBASE_TABLE_SNAPSHOT_RESTORE_PATH_ATTR));
        Path restoreDir = new Path(conf.get(Constants.HBASE_TABLE_SNAPSHOT_PATH_ATTTR));
        String snapshotName = conf.get(Constants.HBASE_SNAPSOT_NAME_ATTR);

        FileOutputFormat.setOutputPath(job, getJobOutputPath(getJobNameWithTaskId()));

        // TableSnapshotInputFormat restores the snapshot into restoreDir (a
        // scratch dir on the same HDFS) and reads the HFiles directly,
        // bypassing the region servers
        TableSnapshotInputFormat.setInput(job, snapshotName, restoreDir);
        return job;
    }

    @Override
    public String getJobName() {
        // a concrete name; getJobNameWithTaskId() and the output path build on it
        return "snapshotHfile2Orc";
    }
}
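com.hainiu.util.OrcUtil is not shown above; for readers without it, here is a minimal sketch of what such a helper typically wraps (Hive's OrcSerde plus an ObjectInspector built from the struct schema). The class and method names mirror the calls above, but the real OrcUtil may differ:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Writable;

// Sketch only: assumes OrcUtil wraps Hive's OrcSerde
public class OrcUtilSketch {
    private final OrcSerde serde = new OrcSerde();
    private ObjectInspector inspector;
    private final List<Object> row = new ArrayList<Object>();

    // e.g. "struct<aid:string,pkgname:string,uptime:bigint,type:int,country:string,gpcategory:string>"
    public void setOrcTypeWriteSchema(String schema) {
        inspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
                TypeInfoUtils.getTypeInfoFromTypeString(schema));
    }

    public void addAttr(Object... fields) {
        row.clear();
        for (Object f : fields) {
            row.add(f);
        }
    }

    // returns the Writable that OrcNewOutputFormat's RecordWriter accepts
    public Writable serialize() {
        return serde.serialize(row, inspector);
    }
}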
The SnapshotHfile2OrcJob class (the job chain):
package com.hainiu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hainiu.util.JobRunResult;
import com.hainiu.util.JobRunUtil;
/**
 * Job-chain class for reading the snapshot and converting the HFiles to ORC files.
 *
 * @author 纪肖东
 * @Date 2020-09-30
 */
public class SnapshotHfile2OrcJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // get the configuration object
        Configuration conf = getConf();

        // create the job-chain object and the job object
        JobControl jobC = new JobControl("SnapshotHfile2Orc");
        SnapshotHfile2Orc orc = new SnapshotHfile2Orc();
        // hand over the configuration
        orc.setConf(conf);
        // get the controlled-job object and add it to the chain
        ControlledJob orcJob = orc.getControlledJob();
        jobC.addJob(orcJob);

        // set job dependencies (job A depends on job B)
        // A.addDependingJob(B);

        // run the job chain and collect the result
        JobRunResult result = JobRunUtil.run(jobC);
        result.print(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SnapshotHfile2OrcJob(), args));
    }
}
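Likewise, com.hainiu.util.JobRunUtil is not shown. A JobControl is a Runnable that has to be driven on its own thread until allFinished() is true, which is presumably all JobRunUtil.run does before collecting results. A minimal sketch, assuming that behavior:

import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Sketch only: drives a JobControl to completion the way JobRunUtil.run
// presumably does; the real helper also gathers per-job results
public class JobControlRunner {
    public static void run(JobControl jobC) throws InterruptedException {
        Thread t = new Thread(jobC);
        t.setDaemon(true);
        t.start();
        while (!jobC.allFinished()) {
            Thread.sleep(1000);  // poll until every controlled job has finished
        }
        jobC.stop();
        System.out.println("failed jobs: " + jobC.getFailedJobList());
    }
}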