使用kafka流创建一个基于elasticsearch事件的table
Using kafka streams to create a table based on elasticsearch events
是否可以使用 Kafka 流创建一个从 Kafka 主题读取 JSON 的管道,然后对它们执行一些逻辑并将结果发送到另一个 Kafka 主题或其他东西?
例如,我使用来自 elasticsearch 的日志来填充我的主题。使用简单的 logstash 管道,这非常容易。
一旦我在 kafka 主题中有了我的日志,我想从日志中提取一些信息并将它们放入一种 "table" 的 N 列中(Kafka 能够做到这一点吗?)和然后将 table 放在其他地方(另一个主题或数据库)。
我没有找到任何符合我标准的示例。
谢谢
是的,有可能。
kafka 或kafka-streams 中没有列的概念。但是,您通常只定义一个您选择的普通旧 java 对象,其中包含您想要的字段(在这种情况下,字段相当于列)。 You produce the output in that format to an output topic (using an appropriately chosen serializer).最后,如果您想将结果存储在关系数据库中,您可以将字段映射到列中,通常使用 kafka connect jdbc 接收器:
http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
是否可以使用 Kafka 流创建一个从 Kafka 主题读取 JSON 的管道,然后对它们执行一些逻辑并将结果发送到另一个 Kafka 主题或其他东西?
例如,我使用来自 elasticsearch 的日志来填充我的主题。使用简单的 logstash 管道,这非常容易。
一旦我在 kafka 主题中有了我的日志,我想从日志中提取一些信息并将它们放入一种 "table" 的 N 列中(Kafka 能够做到这一点吗?)和然后将 table 放在其他地方(另一个主题或数据库)。
我没有找到任何符合我标准的示例。
谢谢
是的,有可能。
kafka 或kafka-streams 中没有列的概念。但是,您通常只定义一个您选择的普通旧 java 对象,其中包含您想要的字段(在这种情况下,字段相当于列)。 You produce the output in that format to an output topic (using an appropriately chosen serializer).最后,如果您想将结果存储在关系数据库中,您可以将字段映射到列中,通常使用 kafka connect jdbc 接收器: http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html