After my source-code walkthrough was torn apart by my teacher, I redid it carefully and pieced this together step by step with help from resources online. The places that use threads still leave me a bit lost; in particular, I don't know what the secret-key generation inside submitJobInternal() (the second-layer core method) is doing, and the details between submission and YARN entering the MapTask methods are still fuzzy to me. Pointers from anyone more experienced are welcome.
waitForCompletion()
  submit();
    // 1. Establish the connection
    connect();
      // Create the proxy used to submit the job to the cluster
      new Cluster(getConfiguration());
        // Decide between local mode and YARN mode
        initialize(jobTrackAddr, conf);
    // 2. Prepare the job's resource directory
    submitter.submitJobInternal(Job.this, cluster)
      // Create the staging path for submitting data to the cluster
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
      // Get a JobID and build the job path from it
      JobID jobId = submitClient.getNewJobID();
      // Create the resource directory and copy the relevant files into it
      copyAndConfigureFiles(job, submitJobDir);
        rUploader.uploadFiles(job, jobSubmitDir);
      // 3. Compute the input splits and put them in the resource directory
      writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
          input.getSplits(job);
      // 4. Generate job.xml and put it in the resource directory
      writeConf(conf, submitJobFile);
        conf.writeXml(out);
      // 5. Submit the job and return its status
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
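For context, everything in this chain is kicked off from an ordinary driver class. A minimal sketch (class names and paths here are made up):

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);    // hypothetical Mapper
    job.setReducerClass(WordCountReducer.class);  // hypothetical Reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // this call is where the whole chain above begins
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}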
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
// The entire submission process is concentrated in this method
submit();
}
if (verbose) {
// Monitor the job and print its status as it runs
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
// Re-check that the job is still in the DEFINE state
ensureState(JobState.DEFINE);
// Keep compatibility with the old API
setUseNewAPI();
// Secondary core method: fills in the Job's cluster field, i.e. establishes the connection
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
// Primary core method
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
connect(): the method that contacts the cluster at submission time; essentially it constructs a Cluster instance, cluster. Inside Cluster, a ClientProtocolProvider's create() method builds the ClientProtocol instance client that can talk to the cluster. client comes in two flavors: YARNRunner for YARN mode and LocalJobRunner for local mode. Inside YARNRunner, resMgrDelegate is the core object that communicates with the YARN cluster.
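In other words, which client you end up with is decided by mapreduce.framework.name. A sketch of the switch (assuming an otherwise default Configuration):

Configuration conf = new Configuration();
// "local" (the default) -> LocalClientProtocolProvider returns a LocalJobRunner
// "yarn"                -> YarnClientProtocolProvider returns a YARNRunner
conf.set(MRConfig.FRAMEWORK_NAME, "yarn");  // the key "mapreduce.framework.name"
Cluster cluster = new Cluster(conf);        // what connect() boils down to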
T0. Members of the Cluster class
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {
@InterfaceStability.Evolving
public static enum JobTrackerStatus {INITIALIZING, RUNNING};
private ClientProtocolProvider clientProtocolProvider; // produces the client protocol
private ClientProtocol client; // the client-side protocol instance
private UserGroupInformation ugi;
private Configuration conf;
private FileSystem fs = null;
private Path sysDir = null;
private Path stagingAreaDir = null; // where job resources are staged
private Path jobHistoryDir = null;
private static final Log LOG = LogFactory.getLog(Cluster.class);
private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
ServiceLoader.load(ClientProtocolProvider.class);
static {
ConfigUtil.loadResources();
}
...
}
T1. First-level code
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf); // entry point of the initialization
}
T2. Second-level code
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
// Each provider inspects the configuration: with YARN configured the YARN provider returns a protocol, otherwise the local provider does
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: ", e);
}
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
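The frameworkLoader being iterated is the standard JDK ServiceLoader SPI: each provider jar lists its implementation class under META-INF/services/, and ServiceLoader instantiates whatever it finds on the classpath. A quick sketch to see which providers are visible:

ServiceLoader<ClientProtocolProvider> loader =
    ServiceLoader.load(ClientProtocolProvider.class);
for (ClientProtocolProvider provider : loader) {
  // typically prints LocalClientProtocolProvider, plus YarnClientProtocolProvider
  // when the mapreduce-client-jobclient jar is on the classpath
  System.out.println(provider.getClass().getName());
}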
T3.1 Third level: local mode returns a LocalJobRunner
public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get("mapreduce.framework.name", "local");
if (!"local".equals(framework)) {
return null;
} else {
conf.setInt("mapreduce.job.maps", 1);
return new LocalJobRunner(conf);
}
}
T3.2 Third level: YARN mode returns a YARNRunner
public ClientProtocol create(Configuration conf) throws IOException {
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
return new YARNRunner(conf);
}
return null;
}
T4. Fourth level: the YARNRunner class
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
ClientCache clientCache) {
this.conf = conf;
try {
// Proxy for the ResourceManager, the core of communication with the YARN cluster
this.resMgrDelegate = resMgrDelegate;
this.clientCache = clientCache;
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
}
}
Member fields of JobSubmitter
class JobSubmitter {
protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
private static final int SHUFFLE_KEY_LENGTH = 64;
private FileSystem jtFs;
private ClientProtocol submitClient;
private String submitHostName;
private String submitHostAddress;
...
}
T0. The core method: submitJobInternal()
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// [1] Check the output specification (see T1)
checkSpecs(job);
Configuration conf = job.getConfiguration();
// Add the MR framework path to the distributed cache
addMRFrameworkToDistributedCache(conf);
// [2] cluster.getStagingAreaDir() yields the staging path for the job's resources (see T2)
// e.g. mapred\staging\admin768650134\.staging
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
// Record the submitting host's IP address and hostname in conf
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
// Generate the JobID
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Build the submit path: jobStagingArea plus /jobId
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// Obtain delegation tokens for the paths involved
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
// Fetch secrets and tokens and store them in the TokenCache
populateTokenCache(conf, job.getCredentials());
// I wasn't sure what this block is for at first; my current understanding is in the note after this method
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
// [3] Create the submit directory and copy the required files into it (see T3)
// e.g. \mapred\staging\admin768650134\.staging\job_local768650134_0001; when running locally this directory stays empty
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// [4] The key step: computing the input splits (see T4)
// After this call the split information is stored in the job's staging directory
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// [5] Write the configuration out, generating job.xml (see T5)
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// [6] The actual submission, through the client: YARNRunner or LocalJobRunner (see T6)
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
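A note on the key-generation block I was unsure about: as far as I can tell, it creates a per-job shuffle secret. During the shuffle, reducers fetch map output from the NodeManagers' ShuffleHandler over HTTP, and each fetch request is signed with an HMAC derived from the job's secret (see SecureShuffleUtils), which the server verifies, so only tasks belonging to the same job can read the map output; the juxtaposition with the CryptoUtils.isEncryptedSpillEnabled() check suggests the same secret also underpins encrypted intermediate spills, at least in this version. A standalone sketch of the mechanism (the URL is made up):

import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

public class ShuffleKeyDemo {
  public static void main(String[] args) throws Exception {
    // same parameters as JobSubmitter: HmacSHA1 with a 64-bit key
    KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
    keyGen.init(64);
    SecretKey shuffleKey = keyGen.generateKey();
    // sign a (made-up) fetch URL the way shuffle requests are authenticated
    Mac mac = Mac.getInstance("HmacSHA1");
    mac.init(new SecretKeySpec(shuffleKey.getEncoded(), "HmacSHA1"));
    byte[] hash = mac.doFinal("/mapOutput?job=job_x&reduce=0".getBytes("UTF-8"));
    // the server recomputes the HMAC with its copy of the key and compares
    System.out.println("HMAC bytes: " + hash.length);
  }
}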
T1. checkSpecs(job): validates the output specification (the input paths are checked later, during split computation)
private void checkSpecs(Job job) throws ClassNotFoundException,
InterruptedException, IOException {
JobConf jConf = (JobConf)job.getConfiguration();
// Check the output specification
if (jConf.getNumReduceTasks() == 0 ?
jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
ReflectionUtils.newInstance(job.getOutputFormatClass(),
job.getConfiguration());
output.checkOutputSpecs(job);
} else {
jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
}
}
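This is the check behind the familiar startup failure: for the new API, the OutputFormat's checkOutputSpecs() refuses to run when the output directory already exists. A sketch (path made up):

Job job = Job.getInstance(new Configuration());
FileOutputFormat.setOutputPath(job, new Path("/tmp/out"));
// if /tmp/out already exists, checkSpecs(job) fails immediately with
// FileAlreadyExistsException: Output directory /tmp/out already exists

The check itself lives in FileOutputFormat.checkOutputSpecs(), which the default TextOutputFormat inherits.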
T2. getStagingDir(cluster, conf): resolves the staging directory
public static Path getStagingDir(Cluster cluster, Configuration conf)
throws IOException,InterruptedException {
Path stagingArea = cluster.getStagingAreaDir();
FileSystem fs = stagingArea.getFileSystem(conf);
String realUser;
String currentUser;
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
if (fs.exists(stagingArea)) {
FileStatus fsStatus = fs.getFileStatus(stagingArea);
String owner = fsStatus.getOwner();
if (!(owner.equals(currentUser) || owner.equals(realUser))) {
throw new IOException("The ownership on the staging directory " +
stagingArea + " is not as expected. " +
"It is owned by " + owner + ". The directory must " +
"be owned by the submitter " + currentUser + " or " +
"by " + realUser);
}
if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {
LOG.info("Permissions on staging directory " + stagingArea + " are " +
"incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
"to correct value " + JOB_DIR_PERMISSION);
fs.setPermission(stagingArea, JOB_DIR_PERMISSION);
}
} else {
fs.mkdirs(stagingArea,
new FsPermission(JOB_DIR_PERMISSION));
}
return stagingArea;
}
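Where does stagingArea itself come from? If I read the code right: in YARN mode cluster.getStagingAreaDir() resolves yarn.app.mapreduce.am.staging-dir (default /tmp/hadoop-yarn/staging) and appends /<user>/.staging, while in local mode LocalJobRunner builds it from mapreduce.jobtracker.staging.root.dir plus the user name and a random number, which matches the mapred\staging\admin768650134\.staging example above. A sketch of the YARN-side resolution (assumed, not verbatim):

Configuration conf = new Configuration();
String root = conf.get("yarn.app.mapreduce.am.staging-dir",
    "/tmp/hadoop-yarn/staging");
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingArea = new Path(root + "/" + user + "/.staging");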
T3. copyAndConfigureFiles(job, submitJobDir)
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
JobResourceUploader rUploader = new JobResourceUploader(jtFs);
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running
// the job. This is necessary for backward compatibility as other systems
// might use the public API JobConf#setWorkingDirectory to reset the working
// directory.
job.getWorkingDirectory();
}
T3.1 uploadFiles: creates the resource directory and copies the jar and other files into it
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
Configuration conf = job.getConfiguration();
short replication =
(short) conf.getInt(Job.SUBMIT_REPLICATION,
Job.DEFAULT_SUBMIT_REPLICATION);
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
LOG.warn("Hadoop command-line option parsing not performed. "
+ "Implement the Tool interface and execute your application "
+ "with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
if (jtFs.exists(submitJobDir)) {
throw new IOException("Not submitting job. Job directory " + submitJobDir
+ " already exists!! This is unexpected.Please check what's there in"
+ " that directory");
}
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
// Create the submit directory itself
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
if (files != null) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
for (String tmpFile : fileArr) {
URI tmpURI = null;
try {
tmpURI = new URI(tmpFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
} catch (URISyntaxException ue) {
// should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
}
}
...
}
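The tmpfiles/tmpjars/tmparchives keys read at the top of this method are filled in by GenericOptionsParser from the -files/-libjars/-archives command-line options, which is exactly why it warns when the job was not launched through ToolRunner. The recommended setup, as a sketch (class name made up):

public class MyTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already contains tmpfiles/tmpjars parsed from the command line
    Job job = Job.getInstance(getConf());
    // ... configure mapper, reducer and paths as usual ...
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    // e.g. hadoop jar app.jar MyTool -files lookup.txt -libjars extra.jar in out
    System.exit(ToolRunner.run(new MyTool(), args));
  }
}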
T4. writeSplits(job, submitJobDir): computing the input splits (for the new API it dispatches to writeNewSplits())
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
// Obtain the InputFormat via reflection. For TextInputFormat the actual splitting is done by getSplits() of its parent class FileInputFormat; different InputFormat types split differently
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// Entry to the second-level method
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
T4.1 Second level: getSplits(), the core of split computation
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// List all files under the input paths
List<FileStatus> files = listStatus(job);
// Splits are generated file by file in this loop, so a single split never crosses file boundaries
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// Only split the file if it is splittable; some inputs cannot be split, e.g. files with non-splittable compression
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// SPLIT_SLOP defaults to 1.1: a trailing chunk of at most 1.1x splitSize becomes a single split rather than being cut again
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// Third-level entry: how a split is associated with block information
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// Create the split: file path, offset, length, the hosts holding the block and the hosts caching it
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// Whatever remains becomes its own split
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
// A non-splittable file becomes one split
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
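For reference, computeSplitSize() is Math.max(minSize, Math.min(maxSize, blockSize)), so with the defaults (minSize 1, maxSize Long.MAX_VALUE) a split is simply one block. The two knobs, as a sketch:

Job job = Job.getInstance(new Configuration());
// mapreduce.input.fileinputformat.split.minsize: raise it above the block size for fewer, larger splits
FileInputFormat.setMinInputSplitSize(job, 1);
// mapreduce.input.fileinputformat.split.maxsize: lower it below the block size for more, smaller splits
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
// e.g. blockSize = 128 MB, minSize = 1, maxSize = 256 MB
//   -> splitSize = max(1, min(256 MB, 128 MB)) = 128 MB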
T4.1.1 Third level: how splits are tied to blocks
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
// the block whose offset range brackets the split's starting offset is the match
return i;
}
}
// fall through: the offset lies beyond the last block
BlockLocation last = blkLocations[blkLocations.length - 1];
long fileLength = last.getOffset() + last.getLength() - 1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." + fileLength + ")");
}
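A worked example with made-up numbers: a 300 MB file stored as blocks of 128, 128 and 44 MB.

BlockLocation[] blks = {
    new BlockLocation(new String[]{"h1:9866"}, new String[]{"h1"}, 0L, 134217728L),
    new BlockLocation(new String[]{"h2:9866"}, new String[]{"h2"}, 134217728L, 134217728L),
    new BlockLocation(new String[]{"h3:9866"}, new String[]{"h3"}, 268435456L, 46137344L),
};
// a split starting at offset 150 MB (157286400) satisfies
// 134217728 <= offset < 268435456, so getBlockIndex(blks, 157286400L) == 1
// and makeSplit tags the split with host h2, which is what later lets the
// scheduler place the MapTask close to its data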
T5. Create job.xml and write the configuration into it
private void writeConf(Configuration conf, Path jobFile)
throws IOException {
// Write job file to JobTracker's fs
// Create the job.xml file
FSDataOutputStream out =
FileSystem.create(jtFs, jobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
// Write the configuration into job.xml
conf.writeXml(out);
} finally {
out.close();
}
}
T6. submitClient.submitJob(): the actual job submission
T6.1 Submission in local mode
public JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobid,
String jobSubmitDir, Credentials credentials) throws IOException {
// Constructing LocalJobRunner.Job is all it takes: Job extends Thread
// and its constructor ends with this.start()
LocalJobRunner.Job job = new LocalJobRunner.Job(JobID.downgrade(jobid), jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
}
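This little method is where the threading that puzzled me actually lives: because LocalJobRunner.Job starts itself in its constructor, the job begins running the moment it is constructed. Its run() method reads back the split metadata written during submission, wraps each split in a map runnable, and drives them through executor pools, which is how a local submission ends up inside MapTask.run(). A compressed paraphrase of that flow (not the verbatim source):

public void run() {
  // read back the split metadata that writeSplits() left in the staging dir
  TaskSplitMetaInfo[] splits = SplitMetaInfoReader.readSplitMetaInfo(
      jobId, localFs, conf, systemJobDir);
  // one runnable per split; each constructs a MapTask and calls its run()
  List<RunnableWithThrowable> mapRunnables =
      getMapTaskRunnables(splits, jobId, mapOutputFiles);
  runTasks(mapRunnables, createMapExecutor(), "map");
  // reduce tasks start only after all the maps have finished
  runTasks(getReduceTaskRunnables(jobId, mapOutputFiles),
      createReduceExecutor(), "reduce");
}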
T6.2 Submission in YARN mode
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoryToken(ts);
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
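And for the other fuzzy part I mentioned at the top, how YARN gets from here into MapTask: the chain as far as I can trace it (paraphrased, not verbatim):

// resMgrDelegate.submitApplication(appContext)   // client -> ResourceManager
//  -> the RM allocates a container and starts the ApplicationMaster: MRAppMaster.main()
//   -> MRAppMaster requests task containers and launches each with YarnChild as the entry class
//    -> YarnChild.main() fetches the serialized task over RPC and calls task.run(job, umbilical)
//     -> MapTask.run() -> runNewMapper() -> Mapper.run() -> map() once per input record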