Flink: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream
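I get the following exception when running my small Flink program. The application has two data streams that come from the same mock source, and it uses broadcast state. I wrote it to do some performance testing, but it throws this exception: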
Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)
My code:
val testStream: DataStream[Tuple2[String, String]] = env
  .addSource(new MockKafkaSource)
  .filter(x => !x._1.equals("x"))
  .map(x => x)
  .uid("test stream 1")

val testStream2: DataStream[Tuple2[String, String]] = env
  .addSource(new MockKafkaSource)
  .map(x => x)
  .keyBy(x => x._1)
  .uid("test stream 2")

lazy val testStateDescriptor =
  new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])

val testBroadcastStream = testStream.broadcast(testStateDescriptor)

val broadcastOutStream: DataStream[Tuple2[String, String]] =
  testStream2
    .connect(testBroadcastStream)
    .process(new StateProcess)

broadcastOutStream.print()
The exception occurs on this line:
val testBroadcastStream = testStream.broadcast(testStateDescriptor)
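My problem was that I was calling the uid method on the keyed stream of testStream2. I had to move uid so that it comes right after the map, and only then key the stream.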
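For reference, here is a minimal sketch of what that reordering might look like. It assumes the Flink Scala DataStream API and a Flink version that still ships it. MockKafkaSource and StateProcess are not shown in the post, so the sketch substitutes env.fromElements for the source and a hypothetical pass-through StateProcess; only the position of uid relative to keyBy reflects the fix described above.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object UidBeforeKeyBy {
  type Event = (String, String)
  type Fn = KeyedBroadcastProcessFunction[String, Event, Event, Event]

  // Same descriptor shape as in the question.
  val testStateDescriptor =
    new MapStateDescriptor[String, Event](
      "testState", classOf[String], classOf[Tuple2[String, String]])

  // Hypothetical stand-in for the post's StateProcess (its real body is not shown).
  class StateProcess extends Fn {
    // Keyed side: forward each element unchanged.
    override def processElement(
        value: Event, ctx: Fn#ReadOnlyContext, out: Collector[Event]): Unit =
      out.collect(value)

    // Broadcast side: remember the latest element per key in broadcast state.
    override def processBroadcastElement(
        value: Event, ctx: Fn#Context, out: Collector[Event]): Unit =
      ctx.getBroadcastState(testStateDescriptor).put(value._1, value)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: fromElements replaces the post's MockKafkaSource.
    val testStream: DataStream[Event] = env
      .fromElements(("a", "1"), ("x", "2"))
      .filter(x => !x._1.equals("x"))
      .map(x => x)
      .uid("test stream 1")

    // uid is set on the map operator first; keyBy comes afterwards.
    val testStream2 = env
      .fromElements(("a", "3"), ("b", "4"))
      .map(x => x)
      .uid("test stream 2")
      .keyBy(x => x._1)

    val testBroadcastStream = testStream.broadcast(testStateDescriptor)

    testStream2
      .connect(testBroadcastStream)
      .process(new StateProcess)
      .print()

    env.execute("uid before keyBy")
  }
}
```

Note that testStream2 is left without an explicit DataStream type annotation here, so it keeps its keyed type when it is connected to the broadcast stream.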