org.apache.kafka.common.errors.RecordTooLargeException
I am doing some Kafka Streams aggregation and writing the aggregated records to a topic, and I am running into the errors below. I am using a custom JSON Serde for the aggregation helper class. The fix I found on a few blogs was to increase max.request.size.
Although I increased max.request.size from its default to 401391899, the size of the serialized aggregation message keeps growing on every subsequent write to the topic.
After the stream has been running for about 10 minutes, the errors below show up. I am not sure whether the problem is in my Serde or whether I should change some configuration setting other than max.request.size to fix this (a sketch of how the setting is passed to the Streams producer follows the stack trace).
Messages written to the topic:
{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }
org.apache.kafka.common.errors.RecordTooLargeException: The message is 2292506 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory.read(ReflectiveTypeAdapterFactory.java:93)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
at com.google.gson.Gson.fromJson(Gson.java:795)
at com.google.gson.Gson.fromJson(Gson.java:761)
at com.google.gson.Gson.fromJson(Gson.java:710)
at com.google.gson.Gson.fromJson(Gson.java:682)
at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access0(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process
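For reference, a minimal sketch of how a producer-level max.request.size can be passed through the Kafka Streams configuration (the application id and bootstrap servers are placeholders, not my actual setup; 401391899 is the value mentioned above):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProducerConfigSketch {
    // Builds a Streams config that forwards max.request.size to the
    // internally created producer via the producer prefix.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-app");           // placeholder application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        // 401391899 is the value from the question; note that the broker-side
        // message.max.bytes and topic-level max.message.bytes also cap record size.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 401391899);
        return props;
    }
}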
This is a shot in the dark since you did not share your code, but I assume you are assembling the records in your window into one ever-growing record that you maintain as the aggregation result.
Because the state is backed by a Kafka topic for fault tolerance, Streams writes records to that topic (one record per key, with the value being the state belonging to that key). As your state (per key) grows over time, these "state records" grow over time as well and eventually exceed the maximum size limit.
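A minimal sketch of the contrast, assuming the aggregate currently accumulates raw records (the class and field names below are hypothetical, not taken from the question's code):

import java.util.ArrayList;
import java.util.List;

public class AggregationSketch {

    // Unbounded: every input record is appended to the aggregate, so the
    // serialized state grows with each update until it exceeds max.request.size.
    static class GrowingAggregate {
        final List<String> rawRecords = new ArrayList<>();
        GrowingAggregate add(String record) {
            rawRecords.add(record);
            return this;
        }
    }

    // Bounded: only fixed-size summary fields are kept, so the serialized
    // state stays roughly the same size no matter how many records arrive.
    static class SummaryAggregate {
        long count;
        double sum;
        SummaryAggregate add(double value) {
            count++;
            sum += value;
            return this;
        }
        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }
}

If the windowed aggregation only needs counts and averages (as the sample messages above suggest), keeping fixed-size summary fields like these keeps each state record small, and no max.request.size override is needed at all.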