运行 apache flink 中的源同步是否有可用的内置函数?
Is there any inbuilt function available to run the source synchronize in apache flink?
我们是 Apache Flink 和 Scala 的新手。这是我们的用例,就像我们最初使用两种类型的主题从 AMPS 服务器 (crankuptheamps) 播种数据一样。来源 #1 & #2 从主题 #1 & #2 中提取数据。
用例
我们的要求不过是最初来源#1 将在来源 2 开始之前传播(世界状况)数据。因为我们只是将源 #1 数据存储到地图状态中。然后我们只需要启动源 #2 母猪操作。最终我们需要 运行 一个一个的来源。那么是否有任何选项可用于 运行 来源一一。
def sourceConnect(environment: StreamExecutionEnvironment,topic: String, subscriptionType: SubscriptionType): DataStream[Map[String, String]] = {
val dataStream : DataStream[Map[String, String]] = environment.addSource(new RichSourceFunction[Map[String,String]]() {
var sourceClient: Client = null
override def open(parameters: Configuration): Unit = {
// .... Code Here
}
override def run(sourceContext: SourceFunction.SourceContext[Map[String, String]]): Unit = {
subscriptionType match {
case SubscriptionType.sow =>
//.... Code Here
}
}
override def getRuntimeContext: RuntimeContext = super.getRuntimeContext
override def cancel(): Unit = {
sourceClient.close()
}
override def close(): Unit = try cancel()
finally super.close()
})
dataStream }
private var environment: StreamExecutionEnvironment = null
// .... Code Here
val source1 = environment.addSource(....)
val source2 = environment.addSource(....)
val conn = source1.connect(source2)
conn.print()
environment.execute()
最终我们的用例简单到 运行 首先是 source1 和 source2,这意味着同步
Flink 在开始读取另一个流之前并没有特别好的方式来摄取一个流。这个一般话题通常被称为side inputs,还有一个FLIP(一个FLink Improvement Proposal)on this subject.
到目前为止,关于此主题的最佳资源是来自 Lyft 的 Gregory Fee 在 Flink Forward San Francisco 2018 上发表的关于 Bootstrapping State In Apache Flink 的演讲,该演讲探讨了几种可能的方法。哪一个可能最好取决于您的具体应用要求。
我之前破解了一个 UnionedSources SourceFunction
,它让我可以先发送来自一个来源的所有数据,然后再发送来自后续来源的数据。我用它来引导某些状态。也许这对您的用例有用。
--肯
我们是 Apache Flink 和 Scala 的新手。这是我们的用例,就像我们最初使用两种类型的主题从 AMPS 服务器 (crankuptheamps) 播种数据一样。来源 #1 & #2 从主题 #1 & #2 中提取数据。
用例
我们的要求不过是最初来源#1 将在来源 2 开始之前传播(世界状况)数据。因为我们只是将源 #1 数据存储到地图状态中。然后我们只需要启动源 #2 母猪操作。最终我们需要 运行 一个一个的来源。那么是否有任何选项可用于 运行 来源一一。
def sourceConnect(environment: StreamExecutionEnvironment,topic: String, subscriptionType: SubscriptionType): DataStream[Map[String, String]] = {
val dataStream : DataStream[Map[String, String]] = environment.addSource(new RichSourceFunction[Map[String,String]]() {
var sourceClient: Client = null
override def open(parameters: Configuration): Unit = {
// .... Code Here
}
override def run(sourceContext: SourceFunction.SourceContext[Map[String, String]]): Unit = {
subscriptionType match {
case SubscriptionType.sow =>
//.... Code Here
}
}
override def getRuntimeContext: RuntimeContext = super.getRuntimeContext
override def cancel(): Unit = {
sourceClient.close()
}
override def close(): Unit = try cancel()
finally super.close()
})
dataStream }
private var environment: StreamExecutionEnvironment = null
// .... Code Here
val source1 = environment.addSource(....)
val source2 = environment.addSource(....)
val conn = source1.connect(source2)
conn.print()
environment.execute()
最终我们的用例简单到 运行 首先是 source1 和 source2,这意味着同步
Flink 在开始读取另一个流之前并没有特别好的方式来摄取一个流。这个一般话题通常被称为side inputs,还有一个FLIP(一个FLink Improvement Proposal)on this subject.
到目前为止,关于此主题的最佳资源是来自 Lyft 的 Gregory Fee 在 Flink Forward San Francisco 2018 上发表的关于 Bootstrapping State In Apache Flink 的演讲,该演讲探讨了几种可能的方法。哪一个可能最好取决于您的具体应用要求。
我之前破解了一个 UnionedSources SourceFunction
,它让我可以先发送来自一个来源的所有数据,然后再发送来自后续来源的数据。我用它来引导某些状态。也许这对您的用例有用。
--肯