Node:向 Kafka 发送 Protobuf 消息时出错
Node: Sending Protobuf message to Kafka Error
我正在尝试使用 HDFS kafka 连接器将 protobuf 消息从 kafka 发送到 HDFS。我的连接器配置如下所示
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1"
}
}
为了对此进行测试,我尝试在小型节点应用程序中发送 protobuf 序列化消息。这是我的文件:
// data.proto
syntax = "proto3";
package awesomepackage;
message SearchRequest {
string query = 1;
int32 page = 2;
}
和我的节点应用程序
// Minimal kafkajs producer that protobuf-encodes a SearchRequest and sends it.
// NOTE(review): this publishes the *raw* protobuf bytes. The sink's
// io.confluent.connect.protobuf.ProtobufConverter expects the Confluent wire
// format (magic byte 0x00 + 4-byte schema id + message indexes + payload),
// which is why the connector fails with "Unknown magic byte!" (see error below).
const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')
// Broker address matches the connector's confluent.topic.bootstrap.servers.
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['10.8.0.1:9092']
})
const producer = kafka.producer()
const run = async () => {
await producer.connect()
// protobufjs parses data.proto at runtime; the callback receives the parsed root.
protobuf.load('data.proto', async (err, root) => {
console.log("TESTING")
console.log(err)
let SearchRequest = root.lookupType('awesomepackage.SearchRequest')
let payload = {query: "test", page: 2}
// verify() returns an error string, or null when the payload matches the schema.
var errMsg = SearchRequest.verify(payload);
console.log(errMsg)
let msg = SearchRequest.create(payload)
// encode(...).finish() yields a Uint8Array of bare protobuf bytes — no framing.
var buffer = SearchRequest.encode(msg).finish();
console.log(buffer)
await producer.send({
topic: 'test-topic',
messages: [
{key: 'key1', value: buffer}
]
})
})
}
run()
但是,当我运行这段代码时,连接器报出以下错误:
Failed to deserialize data for topic test-topic to Protobuf
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我该如何解决这个问题?我的猜测是我的 protobuf 模式没有在 kafka 模式注册表中注册,但我不确定。如果是这种情况,有没有办法将模式从节点发送到注册表?
io.confluent.connect.protobuf.ProtobufConverter
需要 Schema Registry,而不是普通的序列化 Protobuf。换句话说,您的 Node 代码缺少与 Schema Registry 交互的部分(或者需要手动构造带 Confluent 封装格式的 Proto 消息字节)
如果您不想使用 Schema Registry,可以使用 BlueApron Protobuf Converter,但看起来您正在使用一个,所以最好使用 Confluent 转换器
我正在尝试使用 HDFS kafka 连接器将 protobuf 消息从 kafka 发送到 HDFS。我的连接器配置如下所示
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1"
}
}
为了对此进行测试,我尝试在小型节点应用程序中发送 protobuf 序列化消息。这是我的文件:
// data.proto
syntax = "proto3";
package awesomepackage;
message SearchRequest {
string query = 1;
int32 page = 2;
}
和我的节点应用程序
// Producer that sends a Protobuf-encoded SearchRequest to Kafka in the
// Confluent wire format expected by io.confluent.connect.protobuf.ProtobufConverter.
//
// The converter requires each record value to be framed as:
//   [magic byte 0x00][4-byte big-endian schema id][message-index varint(s)][protobuf payload]
// A bare protobuf buffer causes the "Unknown magic byte!" SerializationException.
const { Kafka } = require('kafkajs')
const protobuf = require('protobufjs')
const fs = require('fs')

const REGISTRY_URL = 'http://10.8.0.1:8081'
const TOPIC = 'test-topic'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['10.8.0.1:9092']
})
const producer = kafka.producer()

// Register the .proto source under the topic's value subject and return the schema id.
// Throws with the registry's response text on any non-2xx status.
async function registerSchema(protoPath) {
  const schema = fs.readFileSync(protoPath, 'utf8')
  const res = await fetch(`${REGISTRY_URL}/subjects/${TOPIC}-value/versions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
    body: JSON.stringify({ schemaType: 'PROTOBUF', schema })
  })
  if (!res.ok) {
    throw new Error(`schema registration failed: ${res.status} ${await res.text()}`)
  }
  const { id } = await res.json()
  return id
}

// Wrap serialized protobuf bytes in the Confluent wire format.
// A single 0x00 message-index byte selects the first message type in the schema.
function confluentFrame(schemaId, payload) {
  const header = Buffer.alloc(5)
  header.writeUInt8(0, 0)            // magic byte
  header.writeInt32BE(schemaId, 1)   // schema id from the registry
  const msgIndexes = Buffer.from([0]) // first message type (SearchRequest)
  return Buffer.concat([header, msgIndexes, Buffer.from(payload)])
}

const run = async () => {
  await producer.connect()
  try {
    // protobuf.load returns a Promise when no callback is given.
    const root = await protobuf.load('data.proto')
    const SearchRequest = root.lookupType('awesomepackage.SearchRequest')

    const payload = { query: 'test', page: 2 }
    const errMsg = SearchRequest.verify(payload)
    if (errMsg) throw new Error(errMsg) // fail fast instead of sending bad data

    const buffer = SearchRequest.encode(SearchRequest.create(payload)).finish()
    const schemaId = await registerSchema('data.proto')

    await producer.send({
      topic: TOPIC,
      messages: [
        { key: 'key1', value: confluentFrame(schemaId, buffer) }
      ]
    })
  } finally {
    await producer.disconnect()
  }
}

run().catch((err) => {
  console.error(err)
  process.exit(1)
})
但是,当我运行这段代码时,连接器报出以下错误:
Failed to deserialize data for topic test-topic to Protobuf
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我该如何解决这个问题?我的猜测是我的 protobuf 模式没有在 kafka 模式注册表中注册,但我不确定。如果是这种情况,有没有办法将模式从节点发送到注册表?
io.confluent.connect.protobuf.ProtobufConverter
需要 Schema Registry,而不是普通的序列化 Protobuf。换句话说,您的 Node 代码缺少与 Schema Registry 交互的部分(或者需要手动构造带 Confluent 封装格式的 Proto 消息字节)
如果您不想使用 Schema Registry,可以使用 BlueApron Protobuf Converter,但看起来您正在使用一个,所以最好使用 Confluent 转换器