Kafka Mysql CDC 到 Elastic search

Kafka Mysql CDC to Elastic search

我试图通过 source-connector 将我的 MySql 数据库更改外包给 kafka 主题,这很有效。现在我想将这些数据发送到弹性搜索实例。

为此,我正在关注此 Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect and this one Kafka Connect and Elasticsearch

对于 mysql 到 kafka 的 CDC,我可以看到我在 mysql 中所做的更改并读取它创建一个源连接器,但是当我创建另一个连接器时 elasticsearch-sink 连接器, source-connectrtask.state显示Failed!因此,尽管在 es-config.properties 文件中设置了索引,但数据库更改不会进入 ES。

我已将 jar/s 放在 kafka-dir 中,其中用于源连接器的库工作(以避免有关类路径问题的进一步问题)。

创建 elaticsearch-sink-connector 时出现此错误(当然,我没有错误,而且所有库都在同一目录中!):

ERROR Plugin class loader for connector: 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@5cc126dc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)

我是 运行 我的联系人:

bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties

简而言之,我的连接器 task.state 一次只剩下 RUNNING

编辑:
plugin.path 用于连接-standablone.properties 文件:

plugin.path=/media/***/projects/playground/kafka/kafka_2.12-2.4.0, /media/***/projects/playground/kafka/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java

它们都包含 es-connector jar.Last 一个是后来添加的,但仍然相同

我现在该怎么办?

当我将 schema.enable 更改为 false

时,它就像魅力一样
key.converter.schemas.enable=false
value.converter.schemas.enable=false

并在 plugin.path 之后添加了额外的 / 尽管没有 / 它适用于源连接器!

编辑: 我忘了说我也用 5.4.0 版本替换了我的连接器版本,正如 cricket_007 提到的

编辑 2: 我后来进行了更多调查,发现额外的 / 问题以及下面提到的新关键属性帮助我摆脱了连接器的 FAILED 状态(一次只有一个连接器是 运行 ):

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

在连接中-standalone.properties 文件

谢谢