Apache Flink - 作业中无法识别自定义 java 选项
Apache Flink - custom java options are not recognized inside job
我已将以下行添加到 flink-conf.yaml:
env.java.opts: "-Ddy.props.path=/PATH/TO/PROPS/FILE"
当启动 jobmanager(jobmanager.sh 启动集群)时,我在日志中看到确实识别了 jvm 选项
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
但是当我运行一个flink作业(flink 运行 -d PROG.JAR),System.getProperty("dy.props.path")returns null (打印系统属性时,我看到它确实不存在。)
真正的问题是 - 如何设置 flink-job 代码中可用的系统属性?
这个问题跟运行Flink的时间架构有很大关系[1]。
我了解到您运行在独立集群中完成您的工作。请记住 JobManager
和 TaskManager
s 运行 在不同的 jvm 实例中。您必须考虑每个代码块将在何处执行。
例如,map
或 filter
等转换中的代码在 TaskManager
上执行。
您的条目 class 的 main
方法中的代码在命令行工具 flink
中执行,当然没有系统 属性 设置,因为它会生成一个临时(-d)jvm 仅用于作业提交。
如果您通过 WebUI
提交作业,您的 main
方法中的代码将在 JobManager
上执行,因此 属性 将被设置。
总的来说,我宁愿不鼓励通过系统属性传递程序参数,因为这是一种不好的做法。
下面有一个简单的例子:
我开始了:
- 一个
JobManager
和 env.java.opts:"-Ddy.props.path=jobmanager"
- 一个
TaskManager
和 env.java.opts:"-Ddy.props.path=taskmanager"
我的工作代码如下:
object Main {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(1 to 4)
val prop = System.getProperty("dy.props.path")
stream.map(_ => System.getProperty("dy.props.path") + " mainArg: " + prop).print()
env.execute("stream")
}
}
当我通过flink
工具提交代码时输出如下:
taskmanager mainArg: null
taskmanager mainArg: null
taskmanager mainArg: null
taskmanager mainArg: null
当通过 WebUI
提交时,我得到:
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager
我已将以下行添加到 flink-conf.yaml:
env.java.opts: "-Ddy.props.path=/PATH/TO/PROPS/FILE"
当启动 jobmanager(jobmanager.sh 启动集群)时,我在日志中看到确实识别了 jvm 选项
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
但是当我运行一个flink作业(flink 运行 -d PROG.JAR),System.getProperty("dy.props.path")returns null (打印系统属性时,我看到它确实不存在。)
真正的问题是 - 如何设置 flink-job 代码中可用的系统属性?
这个问题跟运行Flink的时间架构有很大关系[1]。
我了解到您运行在独立集群中完成您的工作。请记住 JobManager
和 TaskManager
s 运行 在不同的 jvm 实例中。您必须考虑每个代码块将在何处执行。
例如,map
或 filter
等转换中的代码在 TaskManager
上执行。
您的条目 class 的 main
方法中的代码在命令行工具 flink
中执行,当然没有系统 属性 设置,因为它会生成一个临时(-d)jvm 仅用于作业提交。
如果您通过 WebUI
提交作业,您的 main
方法中的代码将在 JobManager
上执行,因此 属性 将被设置。
总的来说,我宁愿不鼓励通过系统属性传递程序参数,因为这是一种不好的做法。
下面有一个简单的例子:
我开始了:
- 一个
JobManager
和env.java.opts:"-Ddy.props.path=jobmanager"
- 一个
TaskManager
和env.java.opts:"-Ddy.props.path=taskmanager"
我的工作代码如下:
object Main {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(1 to 4)
val prop = System.getProperty("dy.props.path")
stream.map(_ => System.getProperty("dy.props.path") + " mainArg: " + prop).print()
env.execute("stream")
}
}
当我通过flink
工具提交代码时输出如下:
taskmanager mainArg: null
taskmanager mainArg: null
taskmanager mainArg: null
taskmanager mainArg: null
当通过 WebUI
提交时,我得到:
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager
taskmanager mainArg: jobmanager