Using the same operator in two different streams in Flink
I want to use the same operator in two different streams. However, I get an error saying that the operator's UID is not unique.
import org.apache.flink.streaming.api.scala._

lazy val opt: DataStream[Foo] => DataStream[Buzz] = src => src.flatMap(new MyFlatMapFunc).uid("opt")
lazy val pipe1: DataStream[Foo] => DataStream[Buzz] = src => opt(src) // then keyBy and other logic
lazy val pipe2: DataStream[Foo] => DataStream[Buzz] = src => opt(src) // then some other logic
I get this exception:
Exception in thread "main" java.lang.IllegalArgumentException: Hash collision on user-specified ID "opt". Most likely cause is a non-unique ID. Please check that all IDs specified via uid(String) are unique.
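That's because the uid is set on an operator that is used twice in the pipeline: each call to opt creates a new flatMap operator, and both of them get the ID "opt". You have two options: you can union the two streams into one so that the operator is used only once, or you can change the logic slightly so that each use is assigned a different ID: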
lazy val opt: (DataStream[Foo], String) => DataStream[Buzz] = (src, id) => src.flatMap(new MyFlatMapFunc).uid(id)
lazy val pipe1: DataStream[Foo] => DataStream[Buzz] = src => opt(src, "firstOpt") // then keyBy and other logic
lazy val pipe2: DataStream[Foo] => DataStream[Buzz] = src => opt(src, "secondOpt") // then some other logic
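The union option is only described in words, so here is a minimal sketch of it using the same Scala DataStream API as the question. It assumes that Foo and Buzz are simple case classes carrying a hypothetical source tag and that MyFlatMapFunc maps each Foo to exactly one Buzz; all three are illustrative stand-ins, not the asker's real types. Because the two streams are merged before flatMap, the operator with uid "opt" appears only once in the job graph, and the two branches are separated afterward by the tag. This only works if downstream logic can tell the records apart, so the per-pipeline uid approach is the simpler choice when each stream needs its own processing.

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the question's types; `source` tags where a record came from.
case class Foo(source: String, value: Int)
case class Buzz(source: String, value: Int)

// Hypothetical stand-in for MyFlatMapFunc: emits one Buzz per Foo, keeping the tag.
class MyFlatMapFunc extends FlatMapFunction[Foo, Buzz] {
  override def flatMap(in: Foo, out: Collector[Buzz]): Unit =
    out.collect(Buzz(in.source, in.value * 2))
}

object UnionOnce {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream1: DataStream[Foo] = env.fromElements(Foo("first", 1), Foo("first", 2))
    val stream2: DataStream[Foo] = env.fromElements(Foo("second", 3))

    // Merge first, so the flatMap (and its uid "opt") is created exactly once.
    val shared: DataStream[Buzz] = stream1
      .union(stream2)
      .flatMap(new MyFlatMapFunc)
      .uid("opt")

    // Each branch picks out its own records by the tag (hypothetical routing scheme).
    val pipe1: DataStream[Buzz] = shared.filter(_.source == "first")
    val pipe2: DataStream[Buzz] = shared.filter(_.source == "second")

    pipe1.print()
    pipe2.print()
    env.execute("union-once")
  }
}
```

If the downstream branches also need to survive savepoint restores, they can be given their own distinct uid values in the same way.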