KSQL table 获取旧值和新值

KSQL table get old and new value

在 KSQL 中是否可以从 table 中流出旧值和新值?我们想使用 table 作为值的存储,当一个更改流出一个 "reversal" 值,它是前一个值,以某种方式标记,以及新值,这样我们就可以处理下游系统中的增量?

Kafka table一般用于存储最新的值。因此,例如,假设 table 中存在键为“123”的流,并且主题上出现具有相同键“123”但不同列值的新流,这将覆盖(更新)table 中的现有值.

所以在 Table 上做这件事可能不是一个好主意。

我不清楚你的用例,我的建议仍然是你需要在流源中有一些机制或使用时间戳来处理增量提要。

是的,这是可能的。确实需要一些杂耍。

创建 table 以保持最后状态

create table v1_mux_connection_ping_ta
as 
select 
  assetid, 
  LATEST_BY_OFFSET(pingable) pingable
from v1_mux_connection_ping_st_parse 
group by assetid;

问题是它也没有发出变化。一种解决方案是将 table 转换为流。

CREATE STREAM v1_mux_connection_ping_ta_s 
  (assetId VARCHAR KEY, pingable VARCHAR) 
WITH (kafka_topic='V1_MUX_CONNECTION_PING_TA', value_format='JSON');

只得到改变的值

create table d_opt_details as 
select 
  s.assetId,
  LATEST_BY_OFFSET(s.pingable) new,
  LATEST_BY_OFFSET(s.pingable, 2)[1] old
from v1_mux_connection_ping_ta_s s
group by
  s.assetId;

create table opt_details as 
select 
  s.assetId, s.new as pingable
from d_opt_details s
where new != old;