Kafka 流:groupByKey 并在流中发生错误时减少不触发操作恰好一次
Kafka streams: groupByKey and reduce not triggering action exactly once when error occurs in stream
我有一个简单的 Kafka 流场景,我正在执行一个 groupyByKey
然后 reduce
然后一个动作。源主题中可能存在重复事件,因此 groupyByKey
和 reduce
该操作可能会出错,在这种情况下,我需要流应用程序重新处理该事件。在下面的例子中,我总是抛出一个错误来证明这一点。
动作只发生一次且至少发生一次非常重要。
我发现的问题是,当流应用程序重新处理事件时,会调用 reduce 函数,因此 returns null
不会调用操作。
由于源主题 TOPIC_NAME
只产生了一个事件,我希望 reduce 没有任何值并跳到 mapValues
.
val topologyBuilder = StreamsBuilder()
topologyBuilder.stream(
TOPIC_NAME,
Consumed.with(Serdes.String(), EventSerde())
)
.groupByKey(Grouped.with(Serdes.String(), EventSerde()))
.reduce { current, _ ->
println("reduce hit")
null
}
.mapValues { _, v ->
println(Id: "${v.correlationId}")
throw Exception("simulate error")
}
为了导致这个问题,我 运行 了两次流应用程序。这是输出:
第一个运行
Id: 90e6aefb-8763-4861-8d82-1304a6b5654e
11:10:52.320 [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e] All stream threads have died. The instance will be in error state and should be closed.
第二个运行
reduce hit
如您所见,.mapValues
不会在第二个 运行 上被调用,即使它在第一个 运行 上出错导致流应用程序再次重新处理相同的事件.
是否可以让 Streams 应用程序以减少的步骤重新处理事件,以前所未有的方式处理事件? - 还是有更好的方法来解决我的问题?
我缺少流应用程序的 属性 设置。
props["processing.guarantee"]= "exactly_once"
通过设置此项,它将保证从拾取事件时创建的任何状态都将在抛出异常和流应用程序崩溃的情况下回滚。
问题是 streams 应用程序会再次拾取事件以重新处理,但 reducer 步骤的状态一直存在。通过启用 exactly_once
设置,它确保减速器状态也被回滚。
它现在成功地重新处理了事件,就好像它以前从未见过它一样
我有一个简单的 Kafka 流场景,我正在执行一个 groupyByKey
然后 reduce
然后一个动作。源主题中可能存在重复事件,因此 groupyByKey
和 reduce
该操作可能会出错,在这种情况下,我需要流应用程序重新处理该事件。在下面的例子中,我总是抛出一个错误来证明这一点。
动作只发生一次且至少发生一次非常重要。
我发现的问题是,当流应用程序重新处理事件时,会调用 reduce 函数,因此 returns null
不会调用操作。
由于源主题 TOPIC_NAME
只产生了一个事件,我希望 reduce 没有任何值并跳到 mapValues
.
val topologyBuilder = StreamsBuilder()
topologyBuilder.stream(
TOPIC_NAME,
Consumed.with(Serdes.String(), EventSerde())
)
.groupByKey(Grouped.with(Serdes.String(), EventSerde()))
.reduce { current, _ ->
println("reduce hit")
null
}
.mapValues { _, v ->
println(Id: "${v.correlationId}")
throw Exception("simulate error")
}
为了导致这个问题,我 运行 了两次流应用程序。这是输出:
第一个运行
Id: 90e6aefb-8763-4861-8d82-1304a6b5654e
11:10:52.320 [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e] All stream threads have died. The instance will be in error state and should be closed.
第二个运行
reduce hit
如您所见,.mapValues
不会在第二个 运行 上被调用,即使它在第一个 运行 上出错导致流应用程序再次重新处理相同的事件.
是否可以让 Streams 应用程序以减少的步骤重新处理事件,以前所未有的方式处理事件? - 还是有更好的方法来解决我的问题?
我缺少流应用程序的 属性 设置。
props["processing.guarantee"]= "exactly_once"
通过设置此项,它将保证从拾取事件时创建的任何状态都将在抛出异常和流应用程序崩溃的情况下回滚。
问题是 streams 应用程序会再次拾取事件以重新处理,但 reducer 步骤的状态一直存在。通过启用 exactly_once
设置,它确保减速器状态也被回滚。
它现在成功地重新处理了事件,就好像它以前从未见过它一样