Flink 调用外部脚本
Calling external script from Flink
对于你们中的一些人来说,这似乎是一个非常复杂的问题。我想使用 Apache Flink 对来自 SocketStream 的数据应用一些算法。但是,这些算法是我 运行 使用 Scala 的 sys.process
包的外部可执行文件。这是我希望 Flink 做的事情:
从 SocketStream 获取单独的行:
val text = env.socketTextStream(hostName, port)
val lines = text.flatMap { _.toLowerCase.split("\n") filter { _.nonEmpty } }
使用这些行作为命令行参数调用我的可执行算法。有点像这样:
var op = "./Somefile.py "+lines!
打印我从可执行文件中获得的输出。
op.print()
显然,这不是我想做的事情的正确方法,因为 op
与 lines
不同,它不是数据接收器,因此不会打印任何内容。有什么方法可以实现吗?
如果将所有参数放入单个字符串值中,则可以从 MapFunction
.
调用外部可执行文件
这看起来像:
val args: DataStream[String] = env.socketTextStream(hostName, port)
// assume each text line has all elements
val out: DataStream[String] = args.map(new ExternalCaller())
// print result
out.print()
与
class ExternalCaller extends MapFunction[String, String] {
override def map(args: String): String = {
// call external executable with args here and return output
}
}
对于你们中的一些人来说,这似乎是一个非常复杂的问题。我想使用 Apache Flink 对来自 SocketStream 的数据应用一些算法。但是,这些算法是我 运行 使用 Scala 的 sys.process
包的外部可执行文件。这是我希望 Flink 做的事情:
从 SocketStream 获取单独的行:
val text = env.socketTextStream(hostName, port) val lines = text.flatMap { _.toLowerCase.split("\n") filter { _.nonEmpty } }
使用这些行作为命令行参数调用我的可执行算法。有点像这样:
var op = "./Somefile.py "+lines!
打印我从可执行文件中获得的输出。
op.print()
显然,这不是我想做的事情的正确方法,因为 op
与 lines
不同,它不是数据接收器,因此不会打印任何内容。有什么方法可以实现吗?
如果将所有参数放入单个字符串值中,则可以从 MapFunction
.
这看起来像:
val args: DataStream[String] = env.socketTextStream(hostName, port)
// assume each text line has all elements
val out: DataStream[String] = args.map(new ExternalCaller())
// print result
out.print()
与
class ExternalCaller extends MapFunction[String, String] {
override def map(args: String): String = {
// call external executable with args here and return output
}
}