使用 upsert-kafka 格式读取用 Kafka 编写的消息时的不同结果

Different results when reading messages written in Kafka with upsert-kafka format

我正在使用以下三个测试用例来测试 upsert-kafka

的行为
  1. 将聚合结果以upsert-kafka格式写入kafka(TestCase1)
  2. 使用fink table result print输出消息。(TestCase2)
  3. 使用 consume-console.sh 工具直接消费 Kafka 消息。(TestCase3)

我发现在使用fink table result print时,会打印出-U+U两条消息,表示一条被删除,一条被插入,而对于consume-console, 直接正确打印结果

我会问为什么 fink table result print 的行为与我所观察到的一样

-U+U(删除消息和插入消息)是从哪里来的,它们是作为两条消息保存在Kafka中的吗?我认为答案是否定的,因为我没有看到这些立竿见影的效果。 使用 consumer-console.

package org.example.official.sql

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite

class UpsertKafkaTest extends AnyFunSuite {
  val topic = "test-UpsertKafkaTest-1"
  //Test Case 1
  test("write to upsert kafka: upsert-kafka as sink") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
    ds.print()
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)
    val ddl =
      s"""
              CREATE TABLE sinkTable (
                id STRING,
                total_price DOUBLE,
                PRIMARY KEY (id) NOT ENFORCED
              ) WITH (
                'connector' = 'upsert-kafka',
                'topic' = '$topic',
                'properties.bootstrap.servers' = 'localhost:9092',
                'key.format' = 'json',
                'value.format' = 'json'
              )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    tenv.executeSql(
      """
      insert into sinkTable
      select id, sum(price)
      from sourceTable
      group by id

     """.stripMargin(' '))

    env.execute()
  }

//Test Case 2  
  test("read from upsert kafka: upsert-kafka as source 2") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      s"""
              CREATE TABLE sourceTable (
                id STRING,
                total_price DOUBLE,
                PRIMARY KEY (`id`) NOT ENFORCED
              ) WITH (
                'connector' = 'upsert-kafka',
                'topic' = '$topic',
                'properties.bootstrap.servers' = 'localhost:9092',
                'properties.group.id' = 'testGroup001',
                'key.format' = 'json',
                'key.json.ignore-parse-errors' = 'true',

                'value.format' = 'json',
                'value.json.fail-on-missing-field' = 'false',
                'value.fields-include' = 'EXCEPT_KEY'
              )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val result = tenv.executeSql(
      """
              select *  from sourceTable

     """.stripMargin(' '))

    result.print()

    /*
    +----+--------------------------------+--------------------------------+
  | op |                             id |                    total_price |
  +----+--------------------------------+--------------------------------+
  | +I |                            id1 |                            1.0 |
  | -U |                            id1 |                            1.0 |
  | +U |                            id1 |                            3.0 |
  | -U |                            id1 |                            3.0 |
  | +U |                            id1 |                            6.0 |
  | -U |                            id1 |                            6.0 |
  | +U |                            id1 |                           10.0 |
  | -U |                            id1 |                           10.0 |
  | +U |                            id1 |                           15.0 |
  | -U |                            id1 |                           15.0 |
  | +U |                            id1 |                           21.0 |
  | -U |                            id1 |                           21.0 |
  | +U |                            id1 |                           28.0 |
  | -U |                            id1 |                           28.0 |
  | +U |                            id1 |                           36.0 |
  | -U |                            id1 |                           36.0 |
  | +U |                            id1 |                           45.0 |
     */


  }
  
//Test Case 3  
test("read from upsert kafka with consumer console") {


    /*
      kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
      {"id":"id1","total_price":1.0}
      {"id":"id1","total_price":3.0}
      {"id":"id1","total_price":6.0}
      {"id":"id1","total_price":10.0}
      {"id":"id1","total_price":15.0}
      {"id":"id1","total_price":21.0}
      {"id":"id1","total_price":28.0}
      {"id":"id1","total_price":36.0}
      {"id":"id1","total_price":45.0}
     */
  }

 


}


对于 Flink SQL,我们谈到了 table 和流之间的二元性——流可以被认为是(动态的)table,反之亦然。 streams/tables有两种类型:追加和更新。一个append流对应一个只执行INSERT操作的动态table;没有任何内容被删除或更新。一个更新流对应一个动态的table,其中的行可以被更新和删除。

您的来源 table 是一个 upsert-kafka table,因此是一个更新 table(不是附加 table)。 upsert-kafka 源对应于压缩主题,当压缩发生时,会导致 updates/retractions 各种键的现有值随时间更新。

当更新 table 转换为流时,有两种可能的结果:您要么得到更新插入流,要么得到撤回流。有些接收器支持这些类型的更新流中的一种或另一种,有些则同时支持这两种类型。

您看到的是 upsert-kafka sink 可以处理 upsert,而 print sink 不能。因此,相同的更新 table 作为更新插入(可能还有删除)事件流被提供给 Kafka,并且它作为流被发送到标准输出,每个键都有一个初始插入(+I),然后是 update_before/update_after 对编码为 -U +U 每次更新(和删除,如果发生的话)。