Kafka 连接器始终作为单个任务工作
Kafka Connector always works as a single task
我正在尝试使用 kafka 将数据从 oracleDB 传输到 mongoDB。
所以我像上图这样配置了kafka集群。
我知道调整分区和 tasks.max 允许并行处理。
但是,当我 运行 连接器时,它总是 运行 作为单个任务,不能并行处理。
我需要做任何其他设置吗?
这是我配置的。
- 话题创作
bin/kafka-topics.sh --create --bootstrap-server
127.0.0.1:9092,127.0.0.2:9092,127.0.0.1:9093 --partitions 3 --topic topicA
连接器配置
{
"name": "rawsumc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@127.0.0.1:1521/orcl",
"connection.user": "test",
"connection.password": "test",
"topic.prefix": "topicA",
"mode": "bulk",
"poll.interval.ms": "360000000",
"numeric.mapping": "best_fit",
"tasks.max": "10",
"connection.type": "lz4",
"query": "select CAST(NO_TT AS NUMBER(10,0)) AS NO_TT,CAST(NO_SEQ AS NUMBER(10,0)) AS NO_SEQ,DNT_CLCT from table_a",
"name": "rawsumc-source"
},
"tasks": [
{
"connector": "rawsumc-source",
"task": 0
}
],
"type": "source"}
根据docs:
tasks.max
- The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
使用 JdbcSourceConnector 的自定义查询limits你完成一个任务。
我也对这个 connetor 任务感到困惑。
一开始以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。
(我现在无法post图片,您可以点击link查看图片。)
首先是 taskConfigs 接口:
taskConfigs interface
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
*
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
然后是对这个接口的调用:
call for this interface
org.apache.kafka.connect.runtime.distributed.DistributedHerder:
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
changed = true;
}
kafka connect如何使用task props:
how kafka connect use task props
if (changed) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
}
这意味着 kafka connect 使用任务配置来创建内部线程。
所以有多少任务 运行 由 taskConfigs 方法上的连接器实现决定。
让我们看看在 mqtt 源连接器上的实现:
implementation on mqtt source connector
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<>();
int taskId = 0;
for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
if (mqttTopics.isEmpty())
continue;
Map<String, String> settings = new LinkedHashMap<>(this.settings);
settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
settings.put("task.id", Integer.toString(taskId++));
result.add(settings);
}
return result;
}
此任务按 mqtt 主题分组。
实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。有多少个任务 运行 由连接器上的 taskConfigs 方法实现决定,输入是来自连接器的最大任务配置..
您可以在您的连接器上阅读此方法的源代码实现。
我正在尝试使用 kafka 将数据从 oracleDB 传输到 mongoDB。 所以我像上图这样配置了kafka集群。 我知道调整分区和 tasks.max 允许并行处理。 但是,当我 运行 连接器时,它总是 运行 作为单个任务,不能并行处理。 我需要做任何其他设置吗?
这是我配置的。
- 话题创作
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.1:9093 --partitions 3 --topic topicA
连接器配置
{ "name": "rawsumc-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:oracle:thin:@127.0.0.1:1521/orcl", "connection.user": "test", "connection.password": "test", "topic.prefix": "topicA", "mode": "bulk", "poll.interval.ms": "360000000", "numeric.mapping": "best_fit", "tasks.max": "10", "connection.type": "lz4", "query": "select CAST(NO_TT AS NUMBER(10,0)) AS NO_TT,CAST(NO_SEQ AS NUMBER(10,0)) AS NO_SEQ,DNT_CLCT from table_a", "name": "rawsumc-source" }, "tasks": [ { "connector": "rawsumc-source", "task": 0 } ], "type": "source"}
根据docs:
tasks.max
- The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
使用 JdbcSourceConnector 的自定义查询limits你完成一个任务。
我也对这个 connetor 任务感到困惑。 一开始以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。
(我现在无法post图片,您可以点击link查看图片。) 首先是 taskConfigs 接口: taskConfigs interface
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
*
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
然后是对这个接口的调用: call for this interface
org.apache.kafka.connect.runtime.distributed.DistributedHerder:
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
changed = true;
}
kafka connect如何使用task props: how kafka connect use task props
if (changed) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
}
这意味着 kafka connect 使用任务配置来创建内部线程。 所以有多少任务 运行 由 taskConfigs 方法上的连接器实现决定。
让我们看看在 mqtt 源连接器上的实现: implementation on mqtt source connector
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<>();
int taskId = 0;
for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
if (mqttTopics.isEmpty())
continue;
Map<String, String> settings = new LinkedHashMap<>(this.settings);
settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
settings.put("task.id", Integer.toString(taskId++));
result.add(settings);
}
return result;
} 此任务按 mqtt 主题分组。 实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。有多少个任务 运行 由连接器上的 taskConfigs 方法实现决定,输入是来自连接器的最大任务配置.. 您可以在您的连接器上阅读此方法的源代码实现。