发布和使用不同类型消息的最佳方式是什么?

What is the best way to publish and consume different type of messages?

卡夫卡 0.8V

我想发布/消费 byte[] 对象、java bean 对象、可序列化对象等等..

为这种类型的场景定义发布者和消费者的最佳方式是什么? 当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。 谁能指点我如何设计此类场景的指南?

我对每个 Kafka 主题实施单一模式或 object 类型。这样,当您收到消息时,您就知道自己收到了什么。

至少,您应该决定给定的主题是要保存 binary 还是 string 数据,并根据此决定如何对其进行进一步编码。

例如,您可以有一个名为 Schema 的主题,其中包含存储为字符串的 JSON-编码 object。

如果您使用 JSON 和像 JavaScript 这样的 loosely-typed 语言,可能很容易在同一主题中存储具有不同模式的不同 object。使用 JavaScript,您只需调用 JSON.parse(...),查看生成的 object,并确定您要用它做什么。

但是你不能用像 Scala 这样的 strictly-typed 语言来做到这一点。 Scala JSON 解析器通常希望您将 JSON 解析为已定义的 Scala 类型,通常是 case class。他们不适用于此模型。

一种解决方案是保留一个模式/一个主题规则,但稍微作弊:将 object 包装在 object 中。一个典型的示例是 Action object,其中您有一个描述操作的 header,以及一个具有依赖于header 中列出的操作类型。想象一下 pseudo-schema:

{name: "Action", fields: [
  {name: "actionType", type: "string"},
  {name: "actionObject", type: "string"}
]}

这样,即使在 strongly-typed 语言中,您也可以执行以下操作(同样是 pseudo-code):

action = JSONParser[Action].parse(msg)
switch(action.actionType) {
  case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
  case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}

这种方法的一个巧妙之处在于,如果您有一个消费者只等待特定的 action.actionType,并且将忽略所有其他的,那么它非常轻量级,只需解码header 并推迟解码 action.actionObject 直到需要的时候。

到目前为止,这都是关于 string-encoded 数据的。如果你想使用二进制数据,当然你也可以将它包装在 JSON 中,或者像 XML 这样的 string-based 编码中的任何一种。但是也有很多 binary-encoding 系统,比如 Thrift 和 Avro. In fact, the pseudo-schema above is based on Avro. You can even do cool things in Avro like schema evolution, which amongst other things provides a very slick way to handle the above Action use case -- instead of wrapping an object in an object, you can define a schema that is a subset of other schemas and decode just the fields you want, in this case just the action.actionType field. Here is a really excellent description of schema evolution.

简而言之,我推荐的是:

  1. 选择 schema-based 编码系统(可以是 JSON、XML、Avro、 随便)
  2. 每个主题规则执行一个模式