Kafka Connect JDBC 连接器查询 + 初始轮询时大数据集的递增模式阻塞
Kafka Connect JDBC connector query + incrementing mode chokes with large datasets at initial poll
我正在使用 JDBC 连接器将数据从 MySQL 移动到 Kafka。我感兴趣的数据来自 select 连接 3 个表,因此我用 mode:incrementing
和 query
:
配置了我的连接器
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}
检查连接器状态时,我得到的是 运行:
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}
但是一个小时后,我仍然没有看到创建的主题。我检查了 MySQL 哪些查询是 运行 show full processlist;
并且我看到两个这样的查询:
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
所以查询基本上与我在连接器配置中 query
中提供的查询加上 WHERE s.id > -1 ORDER BY s.id ASC
相同,因为此连接中的查询产生了 21 百万行的结果集 MySQL 正在发送数据很长时间。当我再次检查 show full processlist;
时,我现在看到 4 个这样的查询,然后是 8 个,然后是 16 个,依此类推。
问题是:
- 为什么 Kafka connect 在添加时尝试一次获取 ALL 行:
s.id > -1 ORDER BY s.id ASC
。
- 是否可以将连接器配置为不执行此操作,而是获取较小的数量?
"batch.max.rows": "100"
是否仅在初始轮询后控制批量大小??
更新:
此 issue 有一个开放主题。我觉得这个问题可以关闭了。
JDBC 源连接器 incrementing
mode
并通过 query
,
使用以下 where 子句执行该查询:WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC
。
(如果你使用增量模式和查询,你不能在那里传递 where
子句)。
一开始poll lastIncrementedValue
是-1,所以尝试查询所有记录。提取每条记录后 lastIncrementedValue 增加,所以下次查询只会轮询新数据。
batch.max.rows
指的是有多少条记录 SourceTask::poll(...)
将 return 到 Kafka Connect 框架。
这是一次发送到 Kafka 的批处理的最大大小。
我认为,当您从单个 table 获取数据时,它工作得更快,因为查询执行得更快(更简单)。
如果您使用其他 SQL 工具执行这些查询,它会执行类似的操作。
query.suffix 是在 5.5 中添加的。我用它来添加限制语句并且效果很好,它只是将限制附加到查询的末尾。
我正在使用 JDBC 连接器将数据从 MySQL 移动到 Kafka。我感兴趣的数据来自 select 连接 3 个表,因此我用 mode:incrementing
和 query
:
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}
检查连接器状态时,我得到的是 运行:
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}
但是一个小时后,我仍然没有看到创建的主题。我检查了 MySQL 哪些查询是 运行 show full processlist;
并且我看到两个这样的查询:
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
所以查询基本上与我在连接器配置中 query
中提供的查询加上 WHERE s.id > -1 ORDER BY s.id ASC
相同,因为此连接中的查询产生了 21 百万行的结果集 MySQL 正在发送数据很长时间。当我再次检查 show full processlist;
时,我现在看到 4 个这样的查询,然后是 8 个,然后是 16 个,依此类推。
问题是:
- 为什么 Kafka connect 在添加时尝试一次获取 ALL 行:
s.id > -1 ORDER BY s.id ASC
。 - 是否可以将连接器配置为不执行此操作,而是获取较小的数量?
"batch.max.rows": "100"
是否仅在初始轮询后控制批量大小??
更新:
此 issue 有一个开放主题。我觉得这个问题可以关闭了。
JDBC 源连接器 incrementing
mode
并通过 query
,
使用以下 where 子句执行该查询:WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC
。
(如果你使用增量模式和查询,你不能在那里传递 where
子句)。
一开始poll lastIncrementedValue
是-1,所以尝试查询所有记录。提取每条记录后 lastIncrementedValue 增加,所以下次查询只会轮询新数据。
batch.max.rows
指的是有多少条记录 SourceTask::poll(...)
将 return 到 Kafka Connect 框架。
这是一次发送到 Kafka 的批处理的最大大小。
我认为,当您从单个 table 获取数据时,它工作得更快,因为查询执行得更快(更简单)。 如果您使用其他 SQL 工具执行这些查询,它会执行类似的操作。
query.suffix 是在 5.5 中添加的。我用它来添加限制语句并且效果很好,它只是将限制附加到查询的末尾。