如何使用新的 API 以编程方式获取 Hadoop 集群中的所有 运行 作业?
How to programmatically get all running jobs in a Hadoop cluster using the new API?
我有一个软件组件可以将 MR 作业提交到 Hadoop。我现在想在提交之前检查是否还有其他工作运行。我发现在新的 API 中有一个 Cluster
对象,可用于查询集群中的 运行 作业,获取它们的配置并从中提取相关信息。但是我在使用它时遇到问题。
只是做 new Cluster(conf)
其中 conf
是一个有效的 Configuration
可以用来访问这个集群(例如,向它提交作业)留下未配置的对象,并且getAllJobStatuses()
Cluster
的方法 returns null
.
从配置中提取 mapreduce.jobtracker.address
,从中构造一个 InetSocketAddress
并使用 Cluster
的另一个构造函数抛出 Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
.
使用旧的 api,做类似 new JobClient(conf).getAllJobs()
的事情会抛出 NPE。
我在这里错过了什么?我如何以编程方式获得 运行 职位?
我进一步调查,我解决了它。 Thomas Jungblut 是对的,这是因为迷你集群。我在 this blog post which turned out to work for MR jobs, but set up the mini cluster in a deprecated way with an incomplete configuration. The Hadoop Wiki has a page on how to develop unit tests 之后使用了 mini cluster,它也解释了如何正确设置 mini cluster。
基本上,我按照以下方式设置迷你集群:
// Create a YarnConfiguration for bootstrapping the minicluster
final YarnConfiguration bootConf = new YarnConfiguration();
// Base directory to store HDFS data in
final File hdfsBase = Files.createTempDirectory("temp-hdfs-").toFile();
bootConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBase.getAbsolutePath());
// Start Mini DFS cluster
final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(bootConf).build();
// Configure and start Mini MR YARN cluster
bootConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
bootConf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster("test-cluster", 1);
yarnCluster.init(bootConf);
yarnCluster.start();
// Get the "real" Configuration to use from now on
final Configuration conf = yarnCluster.getConfig();
// Get the filesystem
final FileSystem fs = new Path ("hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/").getFileSystem(conf);
现在,我可以使用 conf
和 fs
来提交作业和访问 HDFS,并且 new Cluster(conf)
和 cluster.getAllJobStatuses
可以正常工作。
当一切都完成后,关闭并清理,我调用:
yarnCluster.stop();
hdfsCluster.shutdown();
FileUtils.deleteDirectory(hdfsBase); // from Apache Commons IO
注意:JAVA_HOME
必须设置才能生效。在 Jenkins 上构建时,确保 JAVA_HOME
设置为默认值 JDK。或者,您可以明确声明要使用的 JDK,然后 Jenkins 将自动设置 JAVA_HOME
。
我这样试过,对我有用,但是是在提交作业之后
JobClient jc = new JobClient(job.getConfiguration());
for(JobStatus js: jc.getAllJobs())
{
if(js.getState().getValue() == State.RUNNING.getValue())
{
}
}
jc.close();
否则我们可以从作业 api 中获取集群,并且有一些方法可以 return 所有作业,作业状态
cluster.getAllJobStatuses();
我有一个软件组件可以将 MR 作业提交到 Hadoop。我现在想在提交之前检查是否还有其他工作运行。我发现在新的 API 中有一个 Cluster
对象,可用于查询集群中的 运行 作业,获取它们的配置并从中提取相关信息。但是我在使用它时遇到问题。
只是做 new Cluster(conf)
其中 conf
是一个有效的 Configuration
可以用来访问这个集群(例如,向它提交作业)留下未配置的对象,并且getAllJobStatuses()
Cluster
的方法 returns null
.
从配置中提取 mapreduce.jobtracker.address
,从中构造一个 InetSocketAddress
并使用 Cluster
的另一个构造函数抛出 Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
.
使用旧的 api,做类似 new JobClient(conf).getAllJobs()
的事情会抛出 NPE。
我在这里错过了什么?我如何以编程方式获得 运行 职位?
我进一步调查,我解决了它。 Thomas Jungblut 是对的,这是因为迷你集群。我在 this blog post which turned out to work for MR jobs, but set up the mini cluster in a deprecated way with an incomplete configuration. The Hadoop Wiki has a page on how to develop unit tests 之后使用了 mini cluster,它也解释了如何正确设置 mini cluster。
基本上,我按照以下方式设置迷你集群:
// Create a YarnConfiguration for bootstrapping the minicluster
final YarnConfiguration bootConf = new YarnConfiguration();
// Base directory to store HDFS data in
final File hdfsBase = Files.createTempDirectory("temp-hdfs-").toFile();
bootConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBase.getAbsolutePath());
// Start Mini DFS cluster
final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(bootConf).build();
// Configure and start Mini MR YARN cluster
bootConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
bootConf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster("test-cluster", 1);
yarnCluster.init(bootConf);
yarnCluster.start();
// Get the "real" Configuration to use from now on
final Configuration conf = yarnCluster.getConfig();
// Get the filesystem
final FileSystem fs = new Path ("hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/").getFileSystem(conf);
现在,我可以使用 conf
和 fs
来提交作业和访问 HDFS,并且 new Cluster(conf)
和 cluster.getAllJobStatuses
可以正常工作。
当一切都完成后,关闭并清理,我调用:
yarnCluster.stop();
hdfsCluster.shutdown();
FileUtils.deleteDirectory(hdfsBase); // from Apache Commons IO
注意:JAVA_HOME
必须设置才能生效。在 Jenkins 上构建时,确保 JAVA_HOME
设置为默认值 JDK。或者,您可以明确声明要使用的 JDK,然后 Jenkins 将自动设置 JAVA_HOME
。
我这样试过,对我有用,但是是在提交作业之后
JobClient jc = new JobClient(job.getConfiguration());
for(JobStatus js: jc.getAllJobs())
{
if(js.getState().getValue() == State.RUNNING.getValue())
{
}
}
jc.close();
否则我们可以从作业 api 中获取集群,并且有一些方法可以 return 所有作业,作业状态
cluster.getAllJobStatuses();