Kafka Mysql CDC 到 Elastic search
Kafka Mysql CDC to Elastic search
我试图通过 source-connector
将我的 MySql
数据库更改外包给 kafka 主题,这很有效。现在我想将这些数据发送到弹性搜索实例。
为此,我正在关注此 Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect and this one Kafka Connect and Elasticsearch。
对于 mysql 到 kafka 的 CDC,我可以看到我在 mysql 中所做的更改并读取它创建一个源连接器,但是当我创建另一个连接器时 elasticsearch-sink
连接器, source-connectr
task.state显示Failed
!因此,尽管在 es-config.properties 文件中设置了索引,但数据库更改不会进入 ES。
我已将 jar/s 放在 kafka-dir 中,其中用于源连接器的库工作(以避免有关类路径问题的进一步问题)。
创建 elaticsearch-sink-connector
时出现此错误(当然,我没有错误,而且所有库都在同一目录中!):
ERROR Plugin class loader for connector:
'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' was
not found. Returning:
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@5cc126dc
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
我是 运行 我的联系人:
bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
简而言之,我的连接器 task.state 一次只剩下 RUNNING
编辑:
plugin.path
用于连接-standablone.properties 文件:
plugin.path=/media/***/projects/playground/kafka/kafka_2.12-2.4.0, /media/***/projects/playground/kafka/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java
它们都包含 es-connector jar.Last 一个是后来添加的,但仍然相同
我现在该怎么办?
当我将 schema.enable 更改为 false
时,它就像魅力一样
key.converter.schemas.enable=false
value.converter.schemas.enable=false
并在 plugin.path
之后添加了额外的 /
尽管没有 /
它适用于源连接器!
编辑: 我忘了说我也用 5.4.0 版本替换了我的连接器版本,正如 cricket_007 提到的
编辑 2:
我后来进行了更多调查,发现额外的 /
问题以及下面提到的新关键属性帮助我摆脱了连接器的 FAILED
状态(一次只有一个连接器是 运行 ):
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
在连接中-standalone.properties 文件
谢谢
我试图通过 source-connector
将我的 MySql
数据库更改外包给 kafka 主题,这很有效。现在我想将这些数据发送到弹性搜索实例。
为此,我正在关注此 Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect and this one Kafka Connect and Elasticsearch。
对于 mysql 到 kafka 的 CDC,我可以看到我在 mysql 中所做的更改并读取它创建一个源连接器,但是当我创建另一个连接器时 elasticsearch-sink
连接器, source-connectr
task.state显示Failed
!因此,尽管在 es-config.properties 文件中设置了索引,但数据库更改不会进入 ES。
我已将 jar/s 放在 kafka-dir 中,其中用于源连接器的库工作(以避免有关类路径问题的进一步问题)。
创建 elaticsearch-sink-connector
时出现此错误(当然,我没有错误,而且所有库都在同一目录中!):
ERROR Plugin class loader for connector: 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@5cc126dc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
我是 运行 我的联系人:
bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
简而言之,我的连接器 task.state 一次只剩下 RUNNING
编辑:plugin.path
用于连接-standablone.properties 文件:
plugin.path=/media/***/projects/playground/kafka/kafka_2.12-2.4.0, /media/***/projects/playground/kafka/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java
它们都包含 es-connector jar.Last 一个是后来添加的,但仍然相同
我现在该怎么办?
当我将 schema.enable 更改为 false
时,它就像魅力一样key.converter.schemas.enable=false
value.converter.schemas.enable=false
并在 plugin.path
之后添加了额外的 /
尽管没有 /
它适用于源连接器!
编辑: 我忘了说我也用 5.4.0 版本替换了我的连接器版本,正如 cricket_007 提到的
编辑 2:
我后来进行了更多调查,发现额外的 /
问题以及下面提到的新关键属性帮助我摆脱了连接器的 FAILED
状态(一次只有一个连接器是 运行 ):
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
在连接中-standalone.properties 文件
谢谢