mongo kafka 连接源

mongo kafka connect source

我正在使用 kafka connect 从 mongo 读取数据并将它们写入 kafka 主题。

我正在使用 mongo kafka 源连接器。

我收到以下错误:

ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:115)
java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
    at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:209)
    at com.mongodb.kafka.connect.source.MongoSourceConfig.<clinit>(MongoSourceConfig.java:138)
    at com.mongodb.kafka.connect.MongoSourceConnector.config(MongoSourceConnector.java:56)
    at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:282)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 7 more

罐子里好像有aming clas。为了得到罐子,我使用了两种不同的方法,但我得到了同样的错误。首先,我使用了 download fro: the maven repository,然后我从 github repo 克隆了源代码,然后我自己构建了 jar。我把罐子推到了 plugins.path。 当我解压缩生成的 jar 并通过课程时,我找不到提到的 class: com.mongodb.ConnectionString

我使用了以下配置文件

worker.properties :

 rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/plugins

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=127.0.0.1:9092

mongo-connector.properties:

name=mongo
tasks.max=1
connector.class =com.mongodb.kafka.connect.MongoSourceConnector
database=
collection=alerts
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup

然后我通过以下命令启动连接器:

/usr/local/kafka/bin/connect-standalone.sh worker.properties mongo-connector.properties 

知道如何解决这个问题

您必须将连接器的 JAR 文件放在 plugin.path 下,在您的情况下是 /usr/share/java/plugins

说明已存在于 Confluent's documentation 中:

A Kafka Connect plugin is:

an uber JAR containing all of the classfiles for the plugin and its third-party dependencies in a single JAR file; or a directory on the file system that contains the JAR files for the plugin and its third-party dependencies. However, a plugin should never contain any libraries that are provided by Kafka Connect’s runtime.

Kafka Connect finds the plugins using its plugin path, which is a comma-separated list of directories defined in the Kafka Connect’s worker configuration. To install a plugin, place the plugin directory or uber JAR (or a symbolic link that resolves to one of those) in a directory listed on the plugin path, or update the plugin path to include the absolute path of the directory containing the plugin.

我创建这个答案是因为我花了一些时间来找出解决方案,正如 scalacode 所指出的,最简单的解决方案是从 confluent 下载 jar,而不是从 maven。

https://www.confluent.io/hub/mongodb/kafka-connect-mongodb