yarn distributedshell client 代码分析

在网上找了distributedshell client 代码分析,分享给大家
程序流程:
1. 通过ClientRMProtocol协议向ApplicationsManager(ASM)取得ApplicationId
2. 初始化ApplicationSubmissionContext,包括application id,application name,application priority queue,另外还有一个ContainerLaunchContext,用于描述如何启动ApplicationMaster
3. ContainerLaunchContext包含了启动ApplicationMaster的所有资源,包括二进制文件(jar),配置文件,执行命令,环境变量等等;凡是文件类型的资源,都会先上传到hdfs,ContainerLaunchContext里面仅仅指定了这个文件资源的hdfs URL
4. 当job提交到ResourceManager后,Client通过ApplicationReport来向ResourceManager取得当前job的进展;

启动Client的命令:
./bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar -shell_command ls -shell_script ./ignore.sh -num_containers 10 -container_memory 350 -master_memory 350 -priority 10
其中-jar用来表明运行ApplicationMaster的程序
-shell_command,-shell_script用来在每一个Container上执行的真正程序,这里为ls ./ignore.sh
-num_container用来启动多少个container来执行shell命令
-container_memoery来表明每个container需要的内存资源
-master_memoery用来表明ApplicationMaster需要的内存资源
-priority用来表明优先级
需要理解的是,ApplicationMaster是由ResourceManager里面的ASM模块来调度到某一个Container并启动它的;即ApplicationMaster是Yarn框架负责调度的;但每一个Container的调度,则需要ApplicationMaster自己负责了;ApplicationMaster里面的具体细节,留到另一篇博客里面介绍

函数分析:
1. public boolean init(String[] args) throws ParseException
该函数的主要作用是分析命令行,根据分析的结果初始化Client类的成员变量;shell_args和shell_env的使用方法重点分析

2. run里面调用了start函数,该函数的主要功能是用来建立ClientRMProtocol的RPC实体rmClient;其中的rmAddress是在YarnConfiguration里面定好的

3. 获取Application Id,
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();

4. 构建AppMaster的运行环境
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(appId);
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

// 这里的FileSystem指hdfs,conf里面包含了hdfs信息    
FileSystem fs = FileSystem.get(conf);
// 本地的jar文件路径
Path src = new Path(appMasterJar);
// 本地jar文件上传到hdfs上的路径
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";    
Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
// 将本地jar文件上传到hdfs
fs.copyFromLocalFile(false, true, src, dst);
    
// 设置这个jar的resource内容
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
// 资源类型
amJarRsrc.setType(LocalResourceType.FILE);
// 资源权限
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);    
// 资源路径
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
// 大小和时间戳用来验证资源的正确性    
amJarRsrc.setTimestamp(destStatus.getModificationTime());
amJarRsrc.setSize(destStatus.getLen());

Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put("AppMaster.jar",  amJarRsrc);
// 这个资源作为master执行资源,主要是提交给AMS的;其它需要上传得file,应该会跟jar文件一起解在同一目录
amContainer.setLocalResources(localResources);

// 因为script脚本是master在执行的时候需要的资源,因此这些资源只需要上传到hdfs即可,而不用加入到localResources里面;localResouces里面只要干干净净提交给AMS的资源即可
// 脚本上传到hdfs里面后,采用环境变量来记录脚本的位置;AMS和Container联合在启动application master之前会先布置好环境变量再执行它
Map<String, String> env = new HashMap<String, String>();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

// 需要确定application master的jar包执行需要的classpath,并将其加入到env
env.put("CLASSPATH", classPathEnv.toString());

// 设置环境变量,到目前为止,启动application master的执行文件,环境变量都已经ready,剩下是确定执行命令
amContainer.setEnvironment(env);
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add("${JAVA_HOME}" + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
//设置需要执行的application master的class
vargs.add(appMasterMainClass);
// 程序执行的其它参数,这些参数都会作为application master的args
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
vargs.add("--shell_command " + shellCommand + "");
vargs.add("--shell_args " + shellArgs + "");
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

amContainer.setCommands(commands);
// 确定application master本身需要的执行资源
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);
appContext.setAMContainerSpec(amContainer);
// 提交application master
super.submitApplication(appContext);

0 个评论

要回复文章请先登录注册