Kafka Streams KTable 外键连接未按预期工作
Kafka Streams KTable foreign key join not working as expected
我正在尝试在 Kafka Streams 中加入一个简单的外键连接,类似于许多文章(比如这篇文章:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。
当我尝试用 account_balance
table 中的外键 user_id
加入用户 id
(用户 table 的主键)时生成一个 AccountRecord
对象,我收到以下错误:
[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.
最终目标是在每次 table 中的任何字段更新时将 AccountRecord
传送到一个主题。问题是,当我简单地分别打印用户 table 和帐户 table 时,外键和所有字段都被完全填充了。我看不出有什么问题或为什么会发生此错误。这是我的代码片段:
public void start_test(){
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
);
// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());
KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
任何指导都会有所帮助。我没有包含自定义 serde 代码或对象形状,但它们非常简单。如果您需要进一步说明,请告诉我。
谢谢
您的消息是否包含密钥记录? KTable 是 changelog 流的抽象,其中每条数据记录代表一个更新,通过 key 知道更新的方式,对于当前与 KTables 一起工作的记录的 key 非常重要。
例如
AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
User<Key=777, Value={firstName="Panchito"}>
另一个观察结果是您的 Serde 密钥,如果您将 Long 定义为密钥,为什么还要使用自定义 serde?
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()))
也许您的密钥解串器将密钥作为空值发送。检查自定义 Serde 的输出登录输出。
此外,您还必须改进加入物化的连接方法,因为您正在创建一个新对象,而 Kafka 不知道如何处理新对象。
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountBalanceSerde() )
);
尝试使用 JsonSerde 或 Avro 来创建您的自定义 Serdes。
我正在尝试在 Kafka Streams 中加入一个简单的外键连接,类似于许多文章(比如这篇文章:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。
当我尝试用 account_balance
table 中的外键 user_id
加入用户 id
(用户 table 的主键)时生成一个 AccountRecord
对象,我收到以下错误:
[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.
最终目标是在每次 table 中的任何字段更新时将 AccountRecord
传送到一个主题。问题是,当我简单地分别打印用户 table 和帐户 table 时,外键和所有字段都被完全填充了。我看不出有什么问题或为什么会发生此错误。这是我的代码片段:
public void start_test(){
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
);
// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());
KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
任何指导都会有所帮助。我没有包含自定义 serde 代码或对象形状,但它们非常简单。如果您需要进一步说明,请告诉我。
谢谢
您的消息是否包含密钥记录? KTable 是 changelog 流的抽象,其中每条数据记录代表一个更新,通过 key 知道更新的方式,对于当前与 KTables 一起工作的记录的 key 非常重要。 例如
AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
User<Key=777, Value={firstName="Panchito"}>
另一个观察结果是您的 Serde 密钥,如果您将 Long 定义为密钥,为什么还要使用自定义 serde?
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()))
也许您的密钥解串器将密钥作为空值发送。检查自定义 Serde 的输出登录输出。 此外,您还必须改进加入物化的连接方法,因为您正在创建一个新对象,而 Kafka 不知道如何处理新对象。
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountBalanceSerde() )
);
尝试使用 JsonSerde 或 Avro 来创建您的自定义 Serdes。