Two consecutive stream-stream inner joins produce wrong results: what does a KStream-KStream join really do internally?
Problem setup
I have a stream of nodes and a stream of edges that represent consecutive updates of a graph, and I want to build patterns composed of nodes and edges using several joins in cascade. Let's say I want to match a pattern like: (node1) --[edge1]--> (node2).
My idea is to join the stream of nodes with the stream of edges to compose a stream of sub-patterns of the type (node1) --[edge1]-->, then take the resulting stream and join it with the node stream again to compose the final pattern (node1) --[edge1]--> (node2). Filtering for particular kinds of nodes and edges is not important here.
Data model
So I have nodes, edges and patterns structured in Avro format:
{
  "namespace": "DataModel",
  "type": "record",
  "name": "Node",
  "doc": "Node schema, it contains a nodeID label and properties",
  "fields": [
    {
      "name": "nodeID",
      "type": "long"
    },
    {
      "name": "labels",
      "type": {
        "type": "array",
        "items": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}
{
  "namespace": "DataModel",
  "type": "record",
  "name": "Edge",
  "doc": "contains edgeID, a type, a list of properties, a starting node ID and an ending node ID ",
  "fields": [
    {
      "name": "edgeID",
      "type": "long"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "startID",
      "type": "long"
    },
    {
      "name": "endID",
      "type": "long"
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}
{
  "namespace": "DataModel",
  "type": "record",
  "name": "Pattern",
  "fields": [
    {
      "name": "first",
      "type": "long"
    },
    {
      "name": "nextJoinID",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "segments",
      "doc": "It's the ordered list of nodes and edges that compose this sub-pattern from the leftmost node to the rightmost edge or node",
      "type": {
        "type": "array",
        "items": [
          "DataModel.Node",
          "DataModel.Edge"
        ]
      }
    }
  ]
}
Then I have the following two ValueJoiners:
The first one is used for the inner join between the node stream and the edge stream.
The second one is used for the inner join between the sub-pattern stream resulting from the first join and the node stream.
public class NodeEdgeJoiner implements ValueJoiner<Node, Edge, Pattern> {
    @Override
    public Pattern apply(Node node, Edge edge) {
        Object[] segments = {node, edge};
        return Pattern.newBuilder()
                .setFirst(node.getNodeID())
                .setNextJoinID(edge.getEndID())
                .setSegments(Arrays.asList(segments))
                .setTimestamp(Math.min(node.getTimestamp(), edge.getTimestamp()))
                .build();
    }
}
public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {
    @Override
    public Pattern apply(Pattern pattern, Node node) {
        List<Object> segments = pattern.getSegments();
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(), pattern.getTimestamp()))
                .build();
    }
}
My purpose is to capture patterns like: (nodeId == 1)--[label == "related_to"]-->() where
- (nodeId == 1) is the node with id 1
- --[label == "related_to"]--> is a directed edge with label "related_to"
- () is a generic node.
The idea for joining the pieces together is to perform two consecutive joins using the previous ValueJoiners. Please focus on the first operation performed by both ValueJoiners: in order to build the pattern I simply append nodes and edges at the end of the list that is part of the Pattern's Avro schema.
The following is the generic loop that produces nodes and edges and publishes them to the corresponding topics. The key of each node record corresponds to its nodeID, and the key of each edge record is the nodeID of the edge's incoming node.
while (true) {
    try (final KafkaProducer<Long, Node> nodeKafkaProducer = new KafkaProducer<Long, Node>(props)) {
        final KafkaProducer<Long, Edge> edgeKafkaProducer = new KafkaProducer<Long, Edge>(props);
        nodeKafkaProducer.send(new ProducerRecord<Long, Node>(nodeTopic, (long) 1,
                buildNodeRecord(1, Collections.singletonList("aString"), "aString",
                        System.currentTimeMillis())));
        edgeKafkaProducer.send(new ProducerRecord<Long, Edge>(edgesTopic, (long) 1,
                buildEdgeRecord(1, 1, 4, "related_to", "aString",
                        System.currentTimeMillis())));
        Thread.sleep(9000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
where:
private Node buildNodeRecord(long nodeId, List<String> labelsToSet, String property, long timestamp) {
    Node record = new Node();
    record.setNodeID(nodeId);
    record.setLabels(labelsToSet);
    Map<String, String> propMap = new HashMap<String, String>();
    propMap.put("property", property);
    record.setProperties(propMap);
    record.setTimestamp(timestamp);
    return record;
}

private Edge buildEdgeRecord(long edgeId, long startID, long endID, String type, String property, long timestamp) {
    Edge record = new Edge();
    record.setEdgeID(edgeId);
    record.setStartID(startID);
    record.setEndID(endID);
    record.setType(type);
    Map<String, String> propMap = new HashMap<String, String>();
    propMap.put("property", property);
    record.setProperties(propMap);
    record.setTimestamp(timestamp);
    return record;
}
The following portion of code describes the pipeline:
//configuration of specific avro serde for pattern type
final SpecificAvroSerde<Pattern> patternSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
patternSpecificAvroSerde.configure(serdeConfig, false);

//the valueJoiners we need
final NodeEdgeJoiner nodeEdgeJoiner = new NodeEdgeJoiner();
final PatternNodeJoiner patternNodeJoiner = new PatternNodeJoiner();

//timestampExtractors
NodeTimestampExtractor nodeTimestampExtractor = new NodeTimestampExtractor();
SubPatternTimeStampExtractor subPatternTimeStampExtractor = new SubPatternTimeStampExtractor();
EdgeTimestampExtractor edgeTimestampExtractor = new EdgeTimestampExtractor();

//node source
final KStream<Long, Node> nodeKStream = builder.stream(envProps.getProperty("node.topic.name"),
        Consumed.with(nodeTimestampExtractor));

//filter on nodes topic
nodeKStream.filter((key, value) -> value.getNodeID() == 1).to(envProps.getProperty("firstnodes.topic.name"));
final KStream<Long, Node> firstFilteredNodes = builder.stream(envProps.getProperty("firstnodes.topic.name"),
        Consumed.with(nodeTimestampExtractor));

//edges keyed by incoming node
final KStream<Long, Edge> edgeKstream = builder.stream(envProps.getProperty("edge.topic.name"),
        Consumed.with(edgeTimestampExtractor));

//filter operation on edges for the first part of the pattern
final KStream<Long, Edge> firstEdgeFiltered = edgeKstream.filter((key, value) ->
        value.getType().equals("related_to"));

//first join
firstFilteredNodes.join(firstEdgeFiltered, nodeEdgeJoiner,
        JoinWindows.of(Duration.ofSeconds(10)))
        .map((key, value) -> new KeyValue<Long, Pattern>(value.getNextJoinID(), value))
        .to(envProps.getProperty("firstJoin.topic.name"));
final KStream<Long, Pattern> mappedFirstJoin = builder.stream(envProps.getProperty("firstJoin.topic.name"),
        Consumed.with(subPatternTimeStampExtractor));

//second join
KStream<Long, Pattern> secondJoin = mappedFirstJoin
        .join(nodeKStream, patternNodeJoiner, JoinWindows.of(Duration.ofSeconds(10)));
secondJoin.print(Printed.toSysOut()); // should print out final records
I am not going to show the timestamp extractors because I believe they are irrelevant to the point.
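(For reference, a minimal sketch of such an extractor, assuming it simply reads the record's embedded timestamp field, could look like this; the actual implementation may differ:)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

import DataModel.Node;

// Hypothetical sketch of a timestamp extractor that uses the Node's own "timestamp" field
// as the event time of the record.
public class NodeTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Node) {
            return ((Node) value).getTimestamp();
        }
        // Fall back to the partition time for unexpected payloads.
        return partitionTime;
    }
}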
The problem
So I would expect the output to be a stream of Pattern records where each pattern's list ("segments" in the Avro schema) has the same size: one node, one edge and another node. But that is not what happens. Instead I get this output:
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427777, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252427777}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427795, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}]}
[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252436822, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436822}, {"edgeID": 1, "type": "related_to", "properties": {"property": "aString"}, "startID": 1, "endID": 4, "timestamp": 1611252436837}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427795}, {"nodeID": 4, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252436847}]}
.
.
.
As you can see, the size of the ordered array of nodes and edges is different in each record. In particular I always see one node and one edge followed by many nodes. If I decrease the sleep milliseconds in the while(true){...} loop it gets worse, producing very long lists with many more nodes in them.
I can guarantee that the node-edge join behaves well in every case: it always produces correct results. The problem seems to affect the second join, but I don't understand how.
I tried to run some tests, without success.
The topology is the following:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [nodes])
--> KSTREAM-WINDOWED-0000000015, KSTREAM-FILTER-0000000001
Source: KSTREAM-SOURCE-0000000013 (topics: [firstJoin])
--> KSTREAM-WINDOWED-0000000014
Processor: KSTREAM-WINDOWED-0000000014 (stores: [KSTREAM-JOINTHIS-0000000016-store])
--> KSTREAM-JOINTHIS-0000000016
<-- KSTREAM-SOURCE-0000000013
Processor: KSTREAM-WINDOWED-0000000015 (stores: [KSTREAM-JOINOTHER-0000000017-store])
--> KSTREAM-JOINOTHER-0000000017
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-JOINOTHER-0000000017 (stores: [KSTREAM-JOINTHIS-0000000016-store])
--> KSTREAM-MERGE-0000000018
<-- KSTREAM-WINDOWED-0000000015
Processor: KSTREAM-JOINTHIS-0000000016 (stores: [KSTREAM-JOINOTHER-0000000017-store])
--> KSTREAM-MERGE-0000000018
<-- KSTREAM-WINDOWED-0000000014
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-SINK-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MERGE-0000000018 (stores: [])
--> KSTREAM-PRINTER-0000000019
<-- KSTREAM-JOINTHIS-0000000016, KSTREAM-JOINOTHER-0000000017
Processor: KSTREAM-PRINTER-0000000019 (stores: [])
--> none
<-- KSTREAM-MERGE-0000000018
Sink: KSTREAM-SINK-0000000002 (topic: firstFilter)
<-- KSTREAM-FILTER-0000000001
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000004 (topics: [edges])
--> KSTREAM-FILTER-0000000005
Processor: KSTREAM-FILTER-0000000005 (stores: [])
--> KSTREAM-WINDOWED-0000000007
<-- KSTREAM-SOURCE-0000000004
Source: KSTREAM-SOURCE-0000000003 (topics: [firstFilter])
--> KSTREAM-WINDOWED-0000000006
Processor: KSTREAM-WINDOWED-0000000006 (stores: [KSTREAM-JOINTHIS-0000000008-store])
--> KSTREAM-JOINTHIS-0000000008
<-- KSTREAM-SOURCE-0000000003
Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINOTHER-0000000009-store])
--> KSTREAM-JOINOTHER-0000000009
<-- KSTREAM-FILTER-0000000005
Processor: KSTREAM-JOINOTHER-0000000009 (stores: [KSTREAM-JOINTHIS-0000000008-store])
--> KSTREAM-MERGE-0000000010
<-- KSTREAM-WINDOWED-0000000007
Processor: KSTREAM-JOINTHIS-0000000008 (stores: [KSTREAM-JOINOTHER-0000000009-store])
--> KSTREAM-MERGE-0000000010
<-- KSTREAM-WINDOWED-0000000006
Processor: KSTREAM-MERGE-0000000010 (stores: [])
--> KSTREAM-MAP-0000000011
<-- KSTREAM-JOINTHIS-0000000008, KSTREAM-JOINOTHER-0000000009
Processor: KSTREAM-MAP-0000000011 (stores: [])
--> KSTREAM-SINK-0000000012
<-- KSTREAM-MERGE-0000000010
Sink: KSTREAM-SINK-0000000012 (topic: firstJoin)
<-- KSTREAM-MAP-0000000011
pom.xml
<groupId>KafkaJOINS</groupId>
<artifactId>KafkaJOINS</artifactId>
<version>1.0</version>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </pluginRepository>
</pluginRepositories>

<properties>
    <log4j.version>2.13.3</log4j.version>
    <avro.version>1.9.2</avro.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <confluent.version>6.0.0</confluent.version>
    <kafka.version>6.0.0-ccs</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
In your first ValueJoiner you create a new object:
Object[] segments = {node,edge};
In your second ValueJoiner, however, you take an existing list and add to it. You need to copy the list first, though:
// your code
List<Object> segments = pattern.getSegments();
segments.add(node); // this effectively modifies the input object;
// if this input object joins multiple times,
// you may introduce an undesired side effect
// instead you should do
List<Object> segments = new LinkedList<>(pattern.getSegments());
segments.add(node);
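Putting it together, the corrected PatternNodeJoiner could look like this sketch (unchanged from your code except for copying the segments list instead of mutating the input Pattern):

import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.streams.kstream.ValueJoiner;
import DataModel.Node;
import DataModel.Pattern;

public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {
    @Override
    public Pattern apply(Pattern pattern, Node node) {
        // Copy the segments first: the same Pattern instance can be handed to the joiner
        // several times when it matches several records inside the join window, so
        // mutating it would accumulate nodes across join results.
        List<Object> segments = new LinkedList<>(pattern.getSegments());
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(), pattern.getTimestamp()))
                .build();
    }
}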