Kafka-connect 动态添加更多主题
Kafka-connect add more topics on the fly
我有一个 elasticsearch kafka-connect
connector 消耗一些主题。
使用以下配置:
{
connection.url": "https://my-es-cluster:443",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.ignore": "true",
"topics": "topic1,topic2",
...
}
我可以在运行中添加更多主题吗?
会发生什么?
如果我从列表中删除一些主题 并稍后再次添加它们 会怎样?
我想在这里添加一个新的topic3
:
{
...
"topics": "topic1,topic2,topic3",
...
}
我删除了什么topic2
?其他主题会被重新消费吗?:
{
...
"topics": "topic1,topic3",
...
}
由于您已经拥有 kafka
和 kafka-connect
运行,您可以使用 kafka-connect
的 REST API 并自行检查:https://docs.confluent.io/current/connect/references/restapi.html
如果您添加新主题 (topic3
),该主题中当前的所有消息(根据保留策略)都将被使用。
PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
...
"topics": "topic1,topic2,topic3",
...
}
检查此连接器的状态和配置:
GET http://kafka-connect:8083/connectors/my-test-connector
如果您想禁用某些主题,只需使用 PUT
更新该连接器的配置。
PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
...
"topics": "topic1,topic3",
...
}
topic1
和 topic3
没有任何变化。只是 topic2
不会再消耗了。
但是如果你想 return 它回来,来自 topic2
的消息将从最后提交的偏移量开始使用,而不是从头开始。
对于每个 consumer group
最后提交的 offset
被存储,您暂时从配置中删除主题并不重要。
对于这种情况,消费者组将为 connect-my-test-connector
.
即使你删除了连接器(DELETE http://kafka-connect:8083/connectors/my-test-connector
),然后用相同的名称重新创建它,偏移量也会被保存,并且会在你删除它时从它们的点继续消费。 (注意保留政策,通常是 7 天)。
我有一个 elasticsearch kafka-connect
connector 消耗一些主题。
使用以下配置:
{
connection.url": "https://my-es-cluster:443",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.ignore": "true",
"topics": "topic1,topic2",
...
}
我可以在运行中添加更多主题吗?
会发生什么?
如果我从列表中删除一些主题 并稍后再次添加它们 会怎样?
我想在这里添加一个新的topic3
:
{
...
"topics": "topic1,topic2,topic3",
...
}
我删除了什么topic2
?其他主题会被重新消费吗?:
{
...
"topics": "topic1,topic3",
...
}
由于您已经拥有 kafka
和 kafka-connect
运行,您可以使用 kafka-connect
的 REST API 并自行检查:https://docs.confluent.io/current/connect/references/restapi.html
如果您添加新主题 (topic3
),该主题中当前的所有消息(根据保留策略)都将被使用。
PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
...
"topics": "topic1,topic2,topic3",
...
}
检查此连接器的状态和配置:
GET http://kafka-connect:8083/connectors/my-test-connector
如果您想禁用某些主题,只需使用 PUT
更新该连接器的配置。
PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
...
"topics": "topic1,topic3",
...
}
topic1
和 topic3
没有任何变化。只是 topic2
不会再消耗了。
但是如果你想 return 它回来,来自 topic2
的消息将从最后提交的偏移量开始使用,而不是从头开始。
对于每个 consumer group
最后提交的 offset
被存储,您暂时从配置中删除主题并不重要。
对于这种情况,消费者组将为 connect-my-test-connector
.
即使你删除了连接器(DELETE http://kafka-connect:8083/connectors/my-test-connector
),然后用相同的名称重新创建它,偏移量也会被保存,并且会在你删除它时从它们的点继续消费。 (注意保留政策,通常是 7 天)。