Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)

Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)

我有一个使用 kafka 和协议缓冲区生成和使用消息的应用程序,一切都很好。我正在使用 SerializeAsString() 序列化协议缓冲区(这个应用程序是用 C++ 编写的)。

现在,我添加了新的 node.js 网站,该网站也使用消息并尝试对其进行解码。

我的js代码(使用很棒的ProtoBuf.js模块):

var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
    protobuf = builder.build("protobuf"),
    Trace = protobuf.Trace,
    MessageType = protobuf.MessageType,
    MessageTypeAck = protobuf.MessageTypeAck,
    MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;

function getMessageType(val) {
  return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}

consumer.on('message', function (message) {
    try{
      switch(getMessageType(message.key[0])) {
        case 'MESSAGE_TYPE_ACK':
          console.log(MessageTypeAck.decode(message.value));
          break;
        case 'MESSAGE_TYPE_KEEP_ALIVE':
          console.log(MessageTypeKeepAlive.decode(message.value));
          break;
        default:
          console.log("Unknown message type");
      }
    } catch (e){
      if (e.decoded) {
        var err = e.decoded;
        console.log(err);
      }
      else {
        console.log(e);
      }
    }
});

结果:

[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]

我的原型文件:

Trace.proto:

package protobuf;

message Trace {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
}

MessageType.proto

package protobuf;

enum MessageType {
    MESSAGE_TYPE_ACK = 1;
    MESSAGE_TYPE_KEEP_ALIVE = 2;
}

Messages.proto:

import "Trace.proto";

package protobuf;

message MessageTypeAck {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

message MessageTypeKeepAlive {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

All.proto

import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"

我做错了什么? (解码?)

所以,多亏了这个 ,我想通了! 问题与我使用缓冲区(通过 kafka)的方式有关 - 作为 utf-8(默认)。它实际上与我没有附加的代码有关:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('localhost:2181'),
    consumer = new Consumer(
        client,
        [
            { topic: 'Genesis', partition: 0 }
        ],
        {
            autoCommit: false,
            encoding: 'buffer'
        }
    ); 

解决方案是添加编码:'buffer' 行(默认为 'utf-8',如前所述 here)。