Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)
Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)
我有一个使用 kafka 和协议缓冲区生成和使用消息的应用程序,一切都很好。我正在使用 SerializeAsString()
序列化协议缓冲区(这个应用程序是用 C++ 编写的)。
现在,我添加了新的 node.js 网站,该网站也使用消息并尝试对其进行解码。
我的js代码(使用很棒的ProtoBuf.js模块):
var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
protobuf = builder.build("protobuf"),
Trace = protobuf.Trace,
MessageType = protobuf.MessageType,
MessageTypeAck = protobuf.MessageTypeAck,
MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;
function getMessageType(val) {
return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}
consumer.on('message', function (message) {
try{
switch(getMessageType(message.key[0])) {
case 'MESSAGE_TYPE_ACK':
console.log(MessageTypeAck.decode(message.value));
break;
case 'MESSAGE_TYPE_KEEP_ALIVE':
console.log(MessageTypeKeepAlive.decode(message.value));
break;
default:
console.log("Unknown message type");
}
} catch (e){
if (e.decoded) {
var err = e.decoded;
console.log(err);
}
else {
console.log(e);
}
}
});
结果:
[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]
我的原型文件:
Trace.proto:
package protobuf;
message Trace {
optional string topic = 1;
optional int32 partition = 2;
optional int64 offset = 3;
}
MessageType.proto
package protobuf;
enum MessageType {
MESSAGE_TYPE_ACK = 1;
MESSAGE_TYPE_KEEP_ALIVE = 2;
}
Messages.proto:
import "Trace.proto";
package protobuf;
message MessageTypeAck {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
message MessageTypeKeepAlive {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
All.proto
import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"
我做错了什么? (解码?)
所以,多亏了这个 ,我想通了!
问题与我使用缓冲区(通过 kafka)的方式有关 - 作为 utf-8(默认)。它实际上与我没有附加的代码有关:
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('localhost:2181'),
consumer = new Consumer(
client,
[
{ topic: 'Genesis', partition: 0 }
],
{
autoCommit: false,
encoding: 'buffer'
}
);
解决方案是添加编码:'buffer' 行(默认为 'utf-8',如前所述 here)。
我有一个使用 kafka 和协议缓冲区生成和使用消息的应用程序,一切都很好。我正在使用 SerializeAsString()
序列化协议缓冲区(这个应用程序是用 C++ 编写的)。
现在,我添加了新的 node.js 网站,该网站也使用消息并尝试对其进行解码。
我的js代码(使用很棒的ProtoBuf.js模块):
var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
protobuf = builder.build("protobuf"),
Trace = protobuf.Trace,
MessageType = protobuf.MessageType,
MessageTypeAck = protobuf.MessageTypeAck,
MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;
function getMessageType(val) {
return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}
consumer.on('message', function (message) {
try{
switch(getMessageType(message.key[0])) {
case 'MESSAGE_TYPE_ACK':
console.log(MessageTypeAck.decode(message.value));
break;
case 'MESSAGE_TYPE_KEEP_ALIVE':
console.log(MessageTypeKeepAlive.decode(message.value));
break;
default:
console.log("Unknown message type");
}
} catch (e){
if (e.decoded) {
var err = e.decoded;
console.log(err);
}
else {
console.log(e);
}
}
});
结果:
[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]
我的原型文件:
Trace.proto:
package protobuf;
message Trace {
optional string topic = 1;
optional int32 partition = 2;
optional int64 offset = 3;
}
MessageType.proto
package protobuf;
enum MessageType {
MESSAGE_TYPE_ACK = 1;
MESSAGE_TYPE_KEEP_ALIVE = 2;
}
Messages.proto:
import "Trace.proto";
package protobuf;
message MessageTypeAck {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
message MessageTypeKeepAlive {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
All.proto
import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"
我做错了什么? (解码?)
所以,多亏了这个
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('localhost:2181'),
consumer = new Consumer(
client,
[
{ topic: 'Genesis', partition: 0 }
],
{
autoCommit: false,
encoding: 'buffer'
}
);
解决方案是添加编码:'buffer' 行(默认为 'utf-8',如前所述 here)。