发布和使用不同类型消息的最佳方式是什么?
What is the best way to publish and consume different type of messages?
卡夫卡 0.8V
我想发布/消费 byte[] 对象、java bean 对象、可序列化对象等等..
为这种类型的场景定义发布者和消费者的最佳方式是什么?
当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。
谁能指点我如何设计此类场景的指南?
我对每个 Kafka 主题实施单一模式或 object 类型。这样,当您收到消息时,您就知道自己收到了什么。
至少,您应该决定给定的主题是要保存 binary
还是 string
数据,并根据此决定如何对其进行进一步编码。
例如,您可以有一个名为 Schema 的主题,其中包含存储为字符串的 JSON
-编码 object。
如果您使用 JSON
和像 JavaScript 这样的 loosely-typed 语言,可能很容易在同一主题中存储具有不同模式的不同 object。使用 JavaScript,您只需调用 JSON.parse(...)
,查看生成的 object,并确定您要用它做什么。
但是你不能用像 Scala 这样的 strictly-typed 语言来做到这一点。 Scala JSON 解析器通常希望您将 JSON 解析为已定义的 Scala 类型,通常是 case class
。他们不适用于此模型。
一种解决方案是保留一个模式/一个主题规则,但稍微作弊:将 object 包装在 object 中。一个典型的示例是 Action object,其中您有一个描述操作的 header,以及一个具有依赖于header 中列出的操作类型。想象一下 pseudo-schema:
{name: "Action", fields: [
{name: "actionType", type: "string"},
{name: "actionObject", type: "string"}
]}
这样,即使在 strongly-typed 语言中,您也可以执行以下操作(同样是 pseudo-code):
action = JSONParser[Action].parse(msg)
switch(action.actionType) {
case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}
这种方法的一个巧妙之处在于,如果您有一个消费者只等待特定的 action.actionType
,并且将忽略所有其他的,那么它非常轻量级,只需解码header 并推迟解码 action.actionObject
直到需要的时候。
到目前为止,这都是关于 string-encoded 数据的。如果你想使用二进制数据,当然你也可以将它包装在 JSON 中,或者像 XML 这样的 string-based 编码中的任何一种。但是也有很多 binary-encoding 系统,比如 Thrift 和 Avro. In fact, the pseudo-schema above is based on Avro. You can even do cool things in Avro like schema evolution, which amongst other things provides a very slick way to handle the above Action
use case -- instead of wrapping an object in an object, you can define a schema that is a subset of other schemas and decode just the fields you want, in this case just the action.actionType
field. Here is a really excellent description of schema evolution.
简而言之,我推荐的是:
- 选择 schema-based 编码系统(可以是 JSON、XML、Avro、
随便)
- 每个主题规则执行一个模式
卡夫卡 0.8V
我想发布/消费 byte[] 对象、java bean 对象、可序列化对象等等..
为这种类型的场景定义发布者和消费者的最佳方式是什么? 当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。 谁能指点我如何设计此类场景的指南?
我对每个 Kafka 主题实施单一模式或 object 类型。这样,当您收到消息时,您就知道自己收到了什么。
至少,您应该决定给定的主题是要保存 binary
还是 string
数据,并根据此决定如何对其进行进一步编码。
例如,您可以有一个名为 Schema 的主题,其中包含存储为字符串的 JSON
-编码 object。
如果您使用 JSON
和像 JavaScript 这样的 loosely-typed 语言,可能很容易在同一主题中存储具有不同模式的不同 object。使用 JavaScript,您只需调用 JSON.parse(...)
,查看生成的 object,并确定您要用它做什么。
但是你不能用像 Scala 这样的 strictly-typed 语言来做到这一点。 Scala JSON 解析器通常希望您将 JSON 解析为已定义的 Scala 类型,通常是 case class
。他们不适用于此模型。
一种解决方案是保留一个模式/一个主题规则,但稍微作弊:将 object 包装在 object 中。一个典型的示例是 Action object,其中您有一个描述操作的 header,以及一个具有依赖于header 中列出的操作类型。想象一下 pseudo-schema:
{name: "Action", fields: [
{name: "actionType", type: "string"},
{name: "actionObject", type: "string"}
]}
这样,即使在 strongly-typed 语言中,您也可以执行以下操作(同样是 pseudo-code):
action = JSONParser[Action].parse(msg)
switch(action.actionType) {
case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}
这种方法的一个巧妙之处在于,如果您有一个消费者只等待特定的 action.actionType
,并且将忽略所有其他的,那么它非常轻量级,只需解码header 并推迟解码 action.actionObject
直到需要的时候。
到目前为止,这都是关于 string-encoded 数据的。如果你想使用二进制数据,当然你也可以将它包装在 JSON 中,或者像 XML 这样的 string-based 编码中的任何一种。但是也有很多 binary-encoding 系统,比如 Thrift 和 Avro. In fact, the pseudo-schema above is based on Avro. You can even do cool things in Avro like schema evolution, which amongst other things provides a very slick way to handle the above Action
use case -- instead of wrapping an object in an object, you can define a schema that is a subset of other schemas and decode just the fields you want, in this case just the action.actionType
field. Here is a really excellent description of schema evolution.
简而言之,我推荐的是:
- 选择 schema-based 编码系统(可以是 JSON、XML、Avro、 随便)
- 每个主题规则执行一个模式