"The $changeStream stage is only supported on replica sets" 使用 mongodb-source-connect 时出错

"The $changeStream stage is only supported on replica sets" error while using mongodb-source-connect

当 运行ning kafka-mongodb-source-connect 时出现错误 我正在尝试 运行 connect-standaloneconnect-avro-standalone.propertiesMongoSourceConnector.properties 以便连接将写入 MongoDB 的数据写入 Kafka主题。

这就是我想做的

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties

connect-avro-standalone.properties

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates the the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java,/Users/anton/Downloads/confluent-5.3.2/share/confluent-hub-components

MongoSourceConnecor.properties

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=test

这是错误:

[2020-01-02 18:55:11,546] ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}

stage is only supported on replica sets

您需要 make your Mongo database a replica set 才能阅读操作日志

https://dba.stackexchange.com/questions/243780/converting-mongodb-instance-from-standalone-to-replica-set-and-backing-up

MongoDB 更改流选项仅在副本集设置中可用。但是,您可以按照以下步骤将独立安装更新为单节点副本集。

  1. 找到 mongodb.conf 文件并添加副本集详细信息

将以下副本集详细信息添加到 mongodb.conf 文件

replication:
  replSetName: "<replica-set name>"

例子

replication: replSetName: "rs0"

注意:安装在 brew 中的位置 MongoDB /usr/local/etc/mongod.conf

  1. 使用rs.initiate()
  2. 启动副本集

登录 MongoDB shell 和 运行 命令 rs.initiate() 这将启动您的副本集。成功启动后的日志如下所示

> rs.initiate()
{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "127.0.0.1:27017",
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577545731, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    },
    "operationTime" : Timestamp(1577545731, 1)
}

这两个简单的步骤就是您 运行仅具有一个节点的 MongoDB 副本集的全部内容。

参考:https://onecompiler.com/posts/3vchuyxuh/enabling-replica-set-in-mongodb-with-just-one-node

这对我的情况有帮助 (macOS env),参见 more:

我。安装零配置 MongoDB 运行器。启动一个没有非节点依赖的副本集,甚至 MongoDB.

npm install -g run-rs   // OR yarn global add run-rs

二.使用连接字符串

mongodb://localhost:27017,localhost:27018,localhost:27019/YOUR_DB_NAME?replicaSet=rs&retryWrites=false