Kafka 连接:JDBC 源连接器:创建具有多个分区的主题

Kafka Connect : JDBC Source Connector : create Topic with multiple partitions

我已经创建了一个示例管道,从 MySQL 轮询数据并写入 HDFS(配置单元 table)。

由于我的要求,我需要为每个数据库创建 Source+Connector 对 table。 下面我发布了源和接收连接器的配置设置。

我可以看到一个主题是用一个分区创建的,复制因子为 1。

主题创建应该是自动的,这意味着我不能在创建 Source+Sink 对之前手动创建主题。

我的问题:

1) 创建源连接器时是否可以配置分区数和复制因子?

2) 如果可以创建多个分区,Source Connector 使用什么样的分区策略?

3) 应该为 Source 和 Sink 连接器创建正确的 worker 数量是多少?

源连接器:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "modified",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc_var_cols-",
  "tasks.max": "1",
  "poll.interval.ms": "1000",
  "query": "SELECT id,name,email,department,modified FROM test",
  "connection.url": "jdbc:mariadb://127.0.0.1:3306/connect_test?user=root&password=confluent"
}

接收器连接器:

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "topics.dir": "/user/datalake/topics-hive-var_cols3",
  "hadoop.conf.dir": "/tmp/quickstart/hadoop/conf",
  "flush.size": "5",
  "schema.compatibility": "BACKWARD",
  "connect.hdfs.principal": "datalake@MYREALM.LOCAL",
  "connect.hdfs.keytab": "/tmp/quickstart/datalake.keytab",
  "tasks.max": "3",
  "topics": "jdbc_var_cols-",
  "hdfs.url": "hdfs://mycluster:8020",
  "hive.database": "kafka_connect_db_var_cols3",
  "hdfs.authentication.kerberos": "true",
  "rotate.interval.ms": "1000",
  "hive.metastore.uris": "thrift://hive_server:9083",
  "hadoop.home": "/tmp/quickstart/hadoop",
  "logs.dir": "/logs",
  "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
  "hive.integration": "true",
  "hdfs.namenode.principal": "nn/_HOST@MYREALM.LOCAL",
  "hive.conf.dir": "/tmp/quickstart/hadoop/conf"
}

1) Is there a way to configure the number of partitions and replication factor when creating the Source Connector?

不是来自 Connect,不是。

听起来好像您在代理上启用了自动创建主题,所以它使用的是默认设置。理想情况下,这应该在生产环境中被禁用,因此您必须提前创建主题。

what kind of partitioning strategy does the Source Connector use?

取决于哪个连接器以及代码的编写方式(即 if/how 它生成记录的密钥)。例如,对于 JDBC 连接器,键可能是数据库 table 的主键。它将使用 DefaultPartitioner 进行哈希处理。我不相信 Connect 允许您在每个连接器级别指定自定义分区程序。如果键为空,则消息将分发到所有分区。

3) Whats the correct number of workers should be created for Source and Sink Connectors?

同样,取决于来源。对于 JDBC,您将在每个 table 中完成一项任务。

不过,对于接收器,任务最多只能达到被接收器主题的分区数(与所有消费者组一样)。


此外,您通常会 运行 将集群与数据库(和 Hadoop 集群)分开连接