How can I translate a Spark Client submitApplication to Yarn Rest API?
Currently I have a working piece of code that submits applications to Yarn using spark.deploy.yarn.Client. Aggregating all the arguments this Client needs is complex, but the submission itself is trivial:
ClientArguments cArgs = new ClientArguments(args.toArray(new String[0]));
client = new Client(cArgs, sparkConf);
applicationID = client.submitApplication();
Most of the code before that point goes into accumulating the sparkConf and args. Now I want to retire the Client and use REST only. YARN exposes a full REST API, including application submission; per the ResourceManager REST API documentation this is a simple JSON/XML POST:
POST http://<rm http address:port>/ws/v1/cluster/apps
Accept: application/json
Content-Type: application/json
{
"application-id":"application_1404203615263_0001",
"application-name":"test",
"am-container-spec":
{
"local-resources":
{
"entry":
[
{
"key":"AppMaster.jar",
"value":
{
"resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
"type":"FILE",
"visibility":"APPLICATION",
"size": 43004,
"timestamp": 1405452071209
}
}
]
},
"commands":
{
"command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
},
"environment":
{
"entry":
[
{
"key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
"value": "1405459400754"
},
{
"key": "CLASSPATH",
"value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLEN",
"value": "6"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
"value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
}
]
}
},
"unmanaged-AM":false,
"max-app-attempts":2,
"resource":
{
"memory":1024,
"vCores":1
},
"application-type":"YARN",
"keep-containers-across-application-attempts":false,
"log-aggregation-context":
{
"log-include-pattern":"file1",
"log-exclude-pattern":"file2",
"rolled-log-include-pattern":"file3",
"rolled-log-exclude-pattern":"file4",
"log-aggregation-policy-class-name":"org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
"log-aggregation-policy-parameters":""
},
"attempt-failures-validity-interval":3600000,
"reservation-id":"reservation_1454114874_1",
"am-black-listing-requests":
{
"am-black-listing-enabled":true,
"disable-failure-threshold":0.01
}
}
I have tried to translate my arguments into the JSON body of that POST request, but it does not seem straightforward. Does anyone know whether I can reverse-engineer the JSON payload to send over REST from an application already submitted by the Client? Or is there a mapping I can use to take the Client arguments and put them into the JSON?
After some digging I managed to submit the application through the REST API alone. The process is not well documented, so I am posting it here.
Note: if at any point you want to compare what you are building against what the Client actually sends, set a debug breakpoint and inspect the application submission context the Client uses: open the class org.apache.hadoop.yarn.client.api.impl.YarnClientImpl and go to the method submitApplication(ApplicationSubmissionContext appContext).
First of all, to replace spark.deploy.yarn.Client with a REST API request, the solution has to make sure every file mentioned in the configuration is available on HDFS. Later on it also has to create and upload one extra file, __spark_conf__.zip.
Step 1
Go over the files in the SparkConf (the second argument to Client): the files mentioned under the "AllJars" tag, the one in "mainJarPath", and the files listed in "FilesList".
For each file, check whether it already exists on HDFS and, if not, upload it from the local machine. For each file, fetch its FileStatus from HDFS (a minimal sketch follows the list below).
Aggregate a list of resources: a map of properties per file with these 6 attributes:
- size = getLen()
- timestamp = getModificationTime()
- type = FILE
- visibility = PUBLIC
Plus two more attributes: key and resource.
- Files from the allJars list: the key is __spark_libs__/{{filename}} and the resource is the file name.
- Files from FilesList: the key is the "localEntry" tag and the resource is the "hdfsPath" tag.
- The file from mainJarPath: the key is "__app__.jar" and the resource is the file name.
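A minimal sketch of this step, assuming the Hadoop FileSystem handle and the staging directory are already available (the helper name stageFile and its signature are mine, not part of Spark or Hadoop beyond the standard FileSystem calls):
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: upload a local file to the staging dir if it is missing on HDFS,
// then record the six attributes Step 1 needs as a simple map.
private Map<String,String> stageFile(FileSystem fs, Path stagingDir, String localFile, String key) throws IOException {
    Path hdfsPath = new Path(stagingDir, new Path(localFile).getName());
    if (!fs.exists(hdfsPath)) {
        fs.copyFromLocalFile(new Path(localFile), hdfsPath); // upload from the local machine
    }
    FileStatus status = fs.getFileStatus(hdfsPath);          // FileStatus from HDFS
    Map<String,String> resource = new HashMap<>();
    resource.put("key", key);                                // e.g. "__spark_libs__/" + hdfsPath.getName() for jars
    resource.put("resource", hdfsPath.toString());
    resource.put("size", String.valueOf(status.getLen()));
    resource.put("timestamp", String.valueOf(status.getModificationTime()));
    resource.put("type", "FILE");
    resource.put("visibility", "PUBLIC");
    return resource;
}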
Step 2
Create the __spark_conf__.zip file. You can create it directly on HDFS, in the staging path, which is usually {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip.
This archive contains two files and one empty directory: one file, __spark_hadoop_conf__.xml (which is just a renamed core-site.xml), and another file called __spark_conf__.properties, which is a slightly modified version of the sparkConf section of your configuration.
To create __spark_conf__.properties you need to read the JSON map at "sparkConf" -> "org$apache$spark$SparkConf$$settings" and convert every entry from the format "spark.safemine.addcontrol.driverMemory": "5120M" into spark.safemine.addcontrol.driverMemory=5120M, accumulating the lines into a String (the properties variable used in the code further down).
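A minimal sketch of that conversion, assuming the settings have already been parsed out of the JSON into a Map<String,String> called sparkConfSettings (the parsing itself is up to whichever JSON library you already use):
// Sketch only: turn the parsed "org$apache$spark$SparkConf$$settings" map into key=value lines.
StringBuilder props = new StringBuilder();
for (Map.Entry<String,String> setting : sparkConfSettings.entrySet()) {
    props.append(setting.getKey()).append("=").append(setting.getValue()).append("\n");
}
String properties = props.toString(); // the 6 spark.yarn.cache.* lines below are appended to this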
Then add 6 lines at the bottom of the file:
- spark.yarn.cache.confArchive={{the location in sparkStaging where you will upload __spark_conf__.zip}}
- spark.yarn.cache.visibilities={{all the visibilities of the files, comma separated - basically "PUBLIC,PUBLIC, ... ,PUBLIC"}}
- spark.yarn.cache.timestamps={{all the timestamps of the files, comma separated}}
- spark.yarn.cache.types={{all the types of the files, comma separated - basically "FILE,FILE, ... ,FILE"}}
- spark.yarn.cache.filenames={{all the file names and keys, written as resource#key and comma separated}}
- spark.yarn.cache.sizes={{all the sizes of the files, comma separated}}
Make sure the 5 aggregated lines are built in the same respective order, so that position i in each list describes the same file. I used this code:
String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
String filenames = "spark.yarn.cache.filenames=";
String sizes = "spark.yarn.cache.sizes=";
String timestamps = "spark.yarn.cache.timestamps=";
String types = "spark.yarn.cache.types=";
String visibilities = "spark.yarn.cache.visibilities=";
for (Map<String,String> localResource:localResources) {
filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
sizes+=localResource.get("size")+",";
timestamps+=localResource.get("timestamp")+",";
types+=localResource.get("type")+",";
visibilities+=localResource.get("visibility")+",";
}
properties+=confArchive+"\n";
properties+=filenames.substring(0,filenames.length()-1)+"\n";
properties+=sizes.substring(0,sizes.length()-1)+"\n";
properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
properties+=types.substring(0,types.length()-1)+"\n";
properties+=visibilities.substring(0,visibilities.length()-1)+"\n";
The __spark_hadoop_conf__.xml file is simply a renamed core-site.xml, and the folder created alongside the two files is named __hadoop_conf__ and left empty.
You can write the archive straight to HDFS like this:
private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
// hdfs holds the namenode URI prefix and getHdfs() returns the org.apache.hadoop.fs.FileSystem (both class members)
String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
Path hdfsPath = new Path(path);
ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
os.putNextEntry(new ZipEntry("__hadoop_conf__/"));          // the empty directory
os.putNextEntry(new ZipEntry("__spark_conf__.properties")); // the converted sparkConf settings
os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml")); // the core-site.xml content under its new name
os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
os.close();
}
Once the file is created, add it to the resource list with the following specs (a sketch follows the list):
- size = getLen()
- timestamp = getModificationTime()
- type = ARCHIVE
- visibility = PRIVATE
- key = __spark_conf__
- resource = the staging directory (usually {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip).
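For example, a sketch of adding that entry, reusing getHdfs() from the snippet above (confArchivePath and localResources are my own names for the staging path and the list built in Step 1):
// Sketch only: register the freshly written __spark_conf__.zip as an ARCHIVE resource.
FileStatus confStatus = getHdfs().getFileStatus(new Path(confArchivePath));
Map<String,String> confResource = new HashMap<>();
confResource.put("key", "__spark_conf__");
confResource.put("resource", confArchivePath);
confResource.put("size", String.valueOf(confStatus.getLen()));
confResource.put("timestamp", String.valueOf(confStatus.getModificationTime()));
confResource.put("type", "ARCHIVE");
confResource.put("visibility", "PRIVATE");
localResources.add(confResource);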
Go over the complete resource list and create an XML/JSON entry for each resource, filling the {{}} placeholders with the values we collected:
<entry>
<key>{{key}}</key>
<value>
<resource>{{resource}}</resource>
<size>{{size}}</size>
<timestamp>{{timestamp}}</timestamp>
<type>{{type}}</type>
<visibility>{{visibility}}</visibility>
</value>
</entry>
The accumulated string becomes your localResources XML segment, used in the request further down.
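A minimal sketch of producing that segment from the maps collected above (plain string concatenation, in line with the rest of this answer; any XML builder would do just as well):
// Sketch only: render every collected resource map as an <entry> element.
StringBuilder localResourcesXml = new StringBuilder();
for (Map<String,String> r : localResources) {
    localResourcesXml.append("<entry>")
        .append("<key>").append(r.get("key")).append("</key>")
        .append("<value>")
        .append("<resource>").append(r.get("resource")).append("</resource>")
        .append("<size>").append(r.get("size")).append("</size>")
        .append("<timestamp>").append(r.get("timestamp")).append("</timestamp>")
        .append("<type>").append(r.get("type")).append("</type>")
        .append("<visibility>").append(r.get("visibility")).append("</visibility>")
        .append("</value>")
        .append("</entry>");
}
String localResourcesSegment = localResourcesXml.toString(); // plugged into <local-resources> in Step 4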
Step 3
Generate the Java command. You will need a few elements from the SparkConf:
- driverMemory - from the property of the same name in the sparkConf
- extraJavaOptions - from the spark.driver.extraJavaOptions property
- mainClass - from the property of the same name in the sparkConf
- argstr - all the ClientArgs collected together, except --class.
The resulting command, with the elements filled in, is:
String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=<LOG_DIR> "
+ "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
+ "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr";
Step 4
Compose the request XML.
Note: my implementation required a node label on the AM container, which is why am-container-node-label-expression appears below; this will not apply in every case.
The mapping from the sparkConf to the REST request is as follows (shown as XML; a JSON implementation is supported as well):
<application-submission-context>
<application-id>"+applicationId+"</application-id>
<application-name>"+appName+"</application-name>
<queue>default</queue>
<priority>0</priority>
<am-container-spec>
<local-resources>+localResources+</local-resources>
<environment>
<entry>
<key>SPARK_YARN_STAGING_DIR</key>
<value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
</entry>
<entry>
<key>CLASSPATH</key>
<value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
</entry>
<entry>
<key>SPARK_USER</key>
<value>"+userName+"</value>
</entry>
</environment>
<commands>
<command>"+command+"</command>
</commands>
</am-container-spec>
<unmanaged-AM>false</unmanaged-AM>
<max-app-attempts>1</max-app-attempts>
<resource>
<memory>5632</memory>
<vCores>1</vCores>
</resource>
<application-type>SPARK</application-type>
<keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
<application-tags>
<tag>"+sparkYarnTag+"</tag>
</application-tags>
<am-container-node-label-expression>appMngr</am-container-node-label-expression>
<log-aggregation-context/>
<attempt-failures-validity-interval>1</attempt-failures-validity-interval>
<reservation-id/>
</application-submission-context>
Step 5:
Submit the application with a REST HTTP POST (the uri in the code below points at the ResourceManager's /ws/v1/cluster/apps endpoint shown at the top of the question):
private void submitApplication (String body, String userName) throws SMSparkManagerException {
HttpClient client = HttpClientBuilder.create().build();
HttpPost request = new HttpPost(uri+"?user.name="+userName);
try {
request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
HttpResponse response = client.execute(request);
if (response.getStatusLine().getStatusCode()!=202) {
throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
}
} catch (UnsupportedEncodingException e) {
logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (ClientProtocolException e) {
logger.error("The application Could not be submitted due to ClientProtocolException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (IOException e) {
logger.error("The application Could not be submitted due to IOException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
}
}
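One detail the steps above take for granted is the applicationId itself. The ResourceManager REST API can hand one out before you build the request: POST to /ws/v1/cluster/apps/new-application and read "application-id" from the JSON response. A minimal sketch (the method name is mine; it returns the raw JSON and leaves the parsing to whatever JSON library you already use):
// Sketch only: ask the ResourceManager for a fresh application id before composing the request.
// The response is JSON along the lines of {"application-id":"application_...","maximum-resource-capability":{...}}.
private String requestNewApplicationId(String rmBaseUri, String userName) throws IOException {
    HttpClient client = HttpClientBuilder.create().build();
    HttpPost request = new HttpPost(rmBaseUri + "/ws/v1/cluster/apps/new-application?user.name=" + userName);
    HttpResponse response = client.execute(request);
    return EntityUtils.toString(response.getEntity()); // extract "application-id" from this JSON
}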