MongoDB 接收器连接器:消息在 Apache Kafka 中被截断
MongoDB Sink Connector : Message truncated in Apache Kafka
我遇到了 MongoDB Kafka 连接器的问题。
我正在尝试将来自控制台生产者(和控制台消费者)的 json 消息生成到 Kafka 主题中。
当消息小于4096字节时,可以正常消费。但是当消息大于 4096 字节时,我得到这个异常:
ERROR WorkerSinkTask{id=scraper-mongo-sink-0} Error converting message value in topic 'rawdata' partition 0 at offset 154 and timestamp 1636471830852: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
.......
.......
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])"{ ........."[truncated 3595 bytes]; line: 1, column: 4096]
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])"{ "....[truncated 3595 bytes]; line: 1, column: 4096]
有谁知道导致此错误的原因吗?更重要的是,如何解决这个问题?
注意。我试过修改代理的一些默认属性,以及 producer/consumer 例如:offset.metadata.max.bytes、max.request.size、message.max.bytes、fetch.max.bytes
n等
请大家帮忙
您可以尝试使用 kcat
工具,但我依稀记得以前遇到过这个问题,我可能查看了源代码,但无论如何,替代方法是使用 shell 而不是键入(或粘贴)那么多数据。 (如果你是粘贴,那么问题要么是剪贴板,要么是终端,而不是Kafka)
kafka-console-producer ... < data.json
确保每行有一个 JSON object/array/value
我遇到了 MongoDB Kafka 连接器的问题。 我正在尝试将来自控制台生产者(和控制台消费者)的 json 消息生成到 Kafka 主题中。
当消息小于4096字节时,可以正常消费。但是当消息大于 4096 字节时,我得到这个异常:
ERROR WorkerSinkTask{id=scraper-mongo-sink-0} Error converting message value in topic 'rawdata' partition 0 at offset 154 and timestamp 1636471830852: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
.......
.......
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])"{ ........."[truncated 3595 bytes]; line: 1, column: 4096]
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])"{ "....[truncated 3595 bytes]; line: 1, column: 4096]
有谁知道导致此错误的原因吗?更重要的是,如何解决这个问题?
注意。我试过修改代理的一些默认属性,以及 producer/consumer 例如:offset.metadata.max.bytes、max.request.size、message.max.bytes、fetch.max.bytes n等
请大家帮忙
您可以尝试使用 kcat
工具,但我依稀记得以前遇到过这个问题,我可能查看了源代码,但无论如何,替代方法是使用 shell 而不是键入(或粘贴)那么多数据。 (如果你是粘贴,那么问题要么是剪贴板,要么是终端,而不是Kafka)
kafka-console-producer ... < data.json
确保每行有一个 JSON object/array/value