如何使用 kafka connect 将 json 消息从 Kinesis 发送到 MSK,然后发送到弹性搜索
How to send json message from Kinesis to MSK and then to elastic search using kafka connect
我已经准备就绪,流程也在运行。
我正在使用 lambda 函数将我的数据从 Kinesis 流发送到 MSK,消息格式如下
{
"data": {
"RequestID": 517082653,
"ContentTypeID": 9,
"OrgID": 16145,
"UserID": 4,
"PromotionStartDateTime": "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
},
"metadata": {
"timestamp": "2019-12-29T10:37:31.502042Z",
"record-type": "data",
"operation": "insert",
"partition-key-type": "schema-table",
"schema-name": "dbo",
"table-name": "TRFSDIQueue"
}
}
这条 json 消息我发送到 kafka 主题如下
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("producer.type", "async");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
try {
producer.send(
new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
Thread.sleep(1000);
System.out.println("Message sent successfully");
} catch (Exception e) {
System.out.println("------------Exception message=-------------" + e.toString());
}
finally {
producer.flush();
producer.close();
}
当我启动 kafka connect 进行弹性搜索时,出现类似
的错误
DataException: Converting byte[] to Kafka Connect data failed due to serialization error
我还修改了 quickstart-elasticsearch.properties 并将键值序列化程序更改为字符串。
当它是 json 时抛出错误。
我可以看到在弹性搜索中使用 kafka 主题名称创建了索引,但没有记录。
所以请帮助我解决我的一些困惑。
1. 我是否从生产者运动流正确发送消息?
我正在使用
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
或者我应该在这里使用 json。但是没有 json 这样的。
或者我是否必须在 quickstart-elasticsearch.properties
中使用 json 序列化程序?
如果事件是插入那么它会在elastci搜索中插入记录删除和更新呢,Kafka-connect在弹性搜索中为我们处理删除和更新?
提前致谢
对于 30 天的免费试用,您可以使用 Kinesis Source Connector,或者您可以学习如何编写自己的源连接器并将其与 Elasticsearch 接收器一起部署,而不是完全使用 lambda...
其次,逆向工作。你能创建一个假主题并在 lambda 之外发送相同格式的记录 吗?那些最终会出现在卡夫卡吗?弹性搜索怎么样?并从等式中删除 Kibana,如果您正在使用它并且它不起作用
然后关注lambda集成
回答您的问题
1) 您将 JSON 作为字符串 发送。 JSON 不需要单独的序列化程序,除非您发送的 POJO 类 被映射到序列化程序接口中的 JSON 字符串中。
您正在发送 JSON 条记录,因此您应该在 Connect 中使用 JSONConverter,是的。但是,我不认为 Elasticsearch 映射会自动创建,除非你 have a schema and payload,所以简单的解决方法是提前创建 ES 索引映射(但是如果你已经知道,那么你已经设计了一个模式,所以发送正确的记录最终是生产者代码的责任)。
如果你提前定义映射,你应该能够简单地在 Connect 中使用 StringConverter
关于您的生产者代码,我唯一要更改的是重试次数高于 0。使用资源尝试而不是显式关闭生产者
//... parse input
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record
}
2) 您可以搜索连接器的 Github 问题,但我最后检查了一下,它执行完整的文档更新和插入,没有部分更新或任何删除
我已经准备就绪,流程也在运行。 我正在使用 lambda 函数将我的数据从 Kinesis 流发送到 MSK,消息格式如下
{
"data": {
"RequestID": 517082653,
"ContentTypeID": 9,
"OrgID": 16145,
"UserID": 4,
"PromotionStartDateTime": "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
},
"metadata": {
"timestamp": "2019-12-29T10:37:31.502042Z",
"record-type": "data",
"operation": "insert",
"partition-key-type": "schema-table",
"schema-name": "dbo",
"table-name": "TRFSDIQueue"
}
}
这条 json 消息我发送到 kafka 主题如下
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("producer.type", "async");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
try {
producer.send(
new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
Thread.sleep(1000);
System.out.println("Message sent successfully");
} catch (Exception e) {
System.out.println("------------Exception message=-------------" + e.toString());
}
finally {
producer.flush();
producer.close();
}
当我启动 kafka connect 进行弹性搜索时,出现类似
的错误DataException: Converting byte[] to Kafka Connect data failed due to serialization error
我还修改了 quickstart-elasticsearch.properties 并将键值序列化程序更改为字符串。
当它是 json 时抛出错误。
我可以看到在弹性搜索中使用 kafka 主题名称创建了索引,但没有记录。
所以请帮助我解决我的一些困惑。 1. 我是否从生产者运动流正确发送消息? 我正在使用
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
或者我应该在这里使用 json。但是没有 json 这样的。
或者我是否必须在
quickstart-elasticsearch.properties
中使用 json 序列化程序?如果事件是插入那么它会在elastci搜索中插入记录删除和更新呢,Kafka-connect在弹性搜索中为我们处理删除和更新?
提前致谢
对于 30 天的免费试用,您可以使用 Kinesis Source Connector,或者您可以学习如何编写自己的源连接器并将其与 Elasticsearch 接收器一起部署,而不是完全使用 lambda...
其次,逆向工作。你能创建一个假主题并在 lambda 之外发送相同格式的记录 吗?那些最终会出现在卡夫卡吗?弹性搜索怎么样?并从等式中删除 Kibana,如果您正在使用它并且它不起作用
然后关注lambda集成
回答您的问题
1) 您将 JSON 作为字符串 发送。 JSON 不需要单独的序列化程序,除非您发送的 POJO 类 被映射到序列化程序接口中的 JSON 字符串中。
您正在发送 JSON 条记录,因此您应该在 Connect 中使用 JSONConverter,是的。但是,我不认为 Elasticsearch 映射会自动创建,除非你 have a schema and payload,所以简单的解决方法是提前创建 ES 索引映射(但是如果你已经知道,那么你已经设计了一个模式,所以发送正确的记录最终是生产者代码的责任)。
如果你提前定义映射,你应该能够简单地在 Connect 中使用 StringConverter
关于您的生产者代码,我唯一要更改的是重试次数高于 0。使用资源尝试而不是显式关闭生产者
//... parse input
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record
}
2) 您可以搜索连接器的 Github 问题,但我最后检查了一下,它执行完整的文档更新和插入,没有部分更新或任何删除