Kafka-stream:如何通过从值列表中选择键来重新键入流
Kafka-stream : How to re-key a stream by selecting key from a list of values
我有一个对象 A :
public class A {
String id;
List<String> otherIds;
SomeOtherObject object;
}
我有一个 kafka 流,它看起来像:
KStream<Integer, A> inputStream
我需要重新键入此 inputStream 流,现在应该是:
KStream<String, A> newStream
其中 newStream 的关键部分是 otherId from A.otherIds.
举个例子
Let's say, A is like : { id:1, otherIds:[ "ab","bc","ca"],OtherObject: obj1}.
And inputStream if like <1,A>,
Then the newStream should have:
<"ab",A>
<"bc", A>
<"ca",A>
大致来说,要了解我正在尝试的是:
KStream<String, A> newStream =
inputStream
.map((key,val) ->
val.getOtherIds().stream().forEach(e->
KeyValue.pair(e,val))
);
有没有办法做到这一点(通过从值列表中选择键重新键入)?
如要将单条记录拆分为多条记录,应使用flatMap()
而不是map()
。 map()
是 1:1 操作,而 flatMap()
是 1:n。
对于 return 多个 KeyValue
对,您的 flatMap()
可以 return
一个 List<KevValue>
例如(任何其他 Collection<KeyValue>
类型都可以,也是)。
根据@Matthias的建议,我是这样做的(有效):
KStream<String, A> newStream =
inputStream
.flatMap((key,A)->{
return A.getOtherIdsList().stream().map( id ->{
return KeyValue.pair(id, A);
})
.collect(Collectors.toSet());
});
谢谢
我有一个对象 A :
public class A {
String id;
List<String> otherIds;
SomeOtherObject object;
}
我有一个 kafka 流,它看起来像:
KStream<Integer, A> inputStream
我需要重新键入此 inputStream 流,现在应该是:
KStream<String, A> newStream
其中 newStream 的关键部分是 otherId from A.otherIds.
举个例子
Let's say, A is like : { id:1, otherIds:[ "ab","bc","ca"],OtherObject: obj1}.
And inputStream if like <1,A>,
Then the newStream should have:
<"ab",A>
<"bc", A>
<"ca",A>
大致来说,要了解我正在尝试的是:
KStream<String, A> newStream =
inputStream
.map((key,val) ->
val.getOtherIds().stream().forEach(e->
KeyValue.pair(e,val))
);
有没有办法做到这一点(通过从值列表中选择键重新键入)?
如要将单条记录拆分为多条记录,应使用flatMap()
而不是map()
。 map()
是 1:1 操作,而 flatMap()
是 1:n。
对于 return 多个 KeyValue
对,您的 flatMap()
可以 return
一个 List<KevValue>
例如(任何其他 Collection<KeyValue>
类型都可以,也是)。
根据@Matthias的建议,我是这样做的(有效):
KStream<String, A> newStream =
inputStream
.flatMap((key,A)->{
return A.getOtherIdsList().stream().map( id ->{
return KeyValue.pair(id, A);
})
.collect(Collectors.toSet());
});
谢谢