KSQLDB - 从 debezium cdc 源连接器获取数据并使用 Table 加入 Stream
KSQLDB - Getting data from debezium cdc source connector and joining Stream with Table
伙计们。
先介绍一下场景:
我正在使用 Debezium CDC Source Connector
从 MS SQL SERVER
中的两个 table 获取数据。遵循连接器配置:
PROVIDER table 的连接器:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_PROVIDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.PROVIDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.PROVIDER'
);
订单连接器table:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_ORDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.ORDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.ORDER'
);
我认为它可以改进,但目前还可以。
一旦设置好连接器,我们就可以创建我们的流和 table:
CREATE TABLE PROVIDER (ID_P VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER', VALUE_FORMAT='AVRO');
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql-01 a.dbo.ORDERS',VALUE_FORMAT='AVRO');
如您所见,现在它只是用来自 PROVIDER table 的数据丰富了 ORDERS 流,对吧?是的,但不是。
SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.PROV = P.PROVIDER_COD EMIT CHANGES;
如果我尝试这样做,我会收到错误消息:
Cannot repartition a TABLE source. If this is a join, make sure that
the criteria uses the TABLE's key column ID_P instead of [PROVIDER_COD]
好吧,它应该很容易修复,但在这种情况下却并非如此。最后我们解决了我的问题:
Provider's id
不在 ORDERS stream
中,因为我从中获取数据的数据库就是这样设计的。
我们如何关联这两个数据集?
如果是关系型数据库就容易了:
SELECT * FROM ORDERS O INNER JOIN PROVIDER P ON O.PROV = P.PROVIDER_COD AND O.SUB_COD = P.SUB_COD;
是的...我之前没有提到它,但是我们这里有一个复合键,Provider Code
和Provider' Subsidiary Code
,我认为这是另一个问题。
拜托,任何人都可以帮助我了解如何在 KSQLDB
中解决这个问题吗?
非常感谢。
我在 Confluent 论坛上找到了解决方案。
感谢 Matthias J. Sax
伙计们。
先介绍一下场景:
我正在使用 Debezium CDC Source Connector
从 MS SQL SERVER
中的两个 table 获取数据。遵循连接器配置:
PROVIDER table 的连接器:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_PROVIDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.PROVIDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.PROVIDER'
);
订单连接器table:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_ORDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.ORDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.ORDER'
);
我认为它可以改进,但目前还可以。
一旦设置好连接器,我们就可以创建我们的流和 table:
CREATE TABLE PROVIDER (ID_P VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER', VALUE_FORMAT='AVRO');
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql-01 a.dbo.ORDERS',VALUE_FORMAT='AVRO');
如您所见,现在它只是用来自 PROVIDER table 的数据丰富了 ORDERS 流,对吧?是的,但不是。
SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.PROV = P.PROVIDER_COD EMIT CHANGES;
如果我尝试这样做,我会收到错误消息:
Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column ID_P instead of [PROVIDER_COD]
好吧,它应该很容易修复,但在这种情况下却并非如此。最后我们解决了我的问题:
Provider's id
不在 ORDERS stream
中,因为我从中获取数据的数据库就是这样设计的。
我们如何关联这两个数据集?
如果是关系型数据库就容易了:
SELECT * FROM ORDERS O INNER JOIN PROVIDER P ON O.PROV = P.PROVIDER_COD AND O.SUB_COD = P.SUB_COD;
是的...我之前没有提到它,但是我们这里有一个复合键,Provider Code
和Provider' Subsidiary Code
,我认为这是另一个问题。
拜托,任何人都可以帮助我了解如何在 KSQLDB
中解决这个问题吗?
非常感谢。
我在 Confluent 论坛上找到了解决方案。
感谢 Matthias J. Sax