使用 upsert-kafka 格式读取用 Kafka 编写的消息时的不同结果
Different results when reading messages written in Kafka with upsert-kafka format
我正在使用以下三个测试用例来测试 upsert-kafka
的行为
- 将聚合结果以
upsert-kafka
格式写入kafka(TestCase1)
- 使用
fink table result print
输出消息。(TestCase2)
- 使用 consume-console.sh 工具直接消费 Kafka 消息。(TestCase3)
我发现在使用fink table result print
时,会打印出-U
和+U
两条消息,表示一条被删除,一条被插入,而对于consume-console
, 直接正确打印结果
我会问为什么 fink table result print
的行为与我所观察到的一样
-U
和+U
(删除消息和插入消息)是从哪里来的,它们是作为两条消息保存在Kafka中的吗?我认为答案是否定的,因为我没有看到这些立竿见影的效果。
使用 consumer-console
.
时
package org.example.official.sql
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite
class UpsertKafkaTest extends AnyFunSuite {
val topic = "test-UpsertKafkaTest-1"
//Test Case 1
test("write to upsert kafka: upsert-kafka as sink") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
ds.print()
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
val ddl =
s"""
CREATE TABLE sinkTable (
id STRING,
total_price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = '$topic',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, sum(price)
from sourceTable
group by id
""".stripMargin(' '))
env.execute()
}
//Test Case 2
test("read from upsert kafka: upsert-kafka as source 2") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
s"""
CREATE TABLE sourceTable (
id STRING,
total_price DOUBLE,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = '$topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup001',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val result = tenv.executeSql(
"""
select * from sourceTable
""".stripMargin(' '))
result.print()
/*
+----+--------------------------------+--------------------------------+
| op | id | total_price |
+----+--------------------------------+--------------------------------+
| +I | id1 | 1.0 |
| -U | id1 | 1.0 |
| +U | id1 | 3.0 |
| -U | id1 | 3.0 |
| +U | id1 | 6.0 |
| -U | id1 | 6.0 |
| +U | id1 | 10.0 |
| -U | id1 | 10.0 |
| +U | id1 | 15.0 |
| -U | id1 | 15.0 |
| +U | id1 | 21.0 |
| -U | id1 | 21.0 |
| +U | id1 | 28.0 |
| -U | id1 | 28.0 |
| +U | id1 | 36.0 |
| -U | id1 | 36.0 |
| +U | id1 | 45.0 |
*/
}
//Test Case 3
test("read from upsert kafka with consumer console") {
/*
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
{"id":"id1","total_price":1.0}
{"id":"id1","total_price":3.0}
{"id":"id1","total_price":6.0}
{"id":"id1","total_price":10.0}
{"id":"id1","total_price":15.0}
{"id":"id1","total_price":21.0}
{"id":"id1","total_price":28.0}
{"id":"id1","total_price":36.0}
{"id":"id1","total_price":45.0}
*/
}
}
对于 Flink SQL,我们谈到了 table 和流之间的二元性——流可以被认为是(动态的)table,反之亦然。 streams/tables有两种类型:追加和更新。一个append流对应一个只执行INSERT操作的动态table;没有任何内容被删除或更新。一个更新流对应一个动态的table,其中的行可以被更新和删除。
您的来源 table 是一个 upsert-kafka table,因此是一个更新 table(不是附加 table)。 upsert-kafka 源对应于压缩主题,当压缩发生时,会导致 updates/retractions 各种键的现有值随时间更新。
当更新 table 转换为流时,有两种可能的结果:您要么得到更新插入流,要么得到撤回流。有些接收器支持这些类型的更新流中的一种或另一种,有些则同时支持这两种类型。
您看到的是 upsert-kafka sink 可以处理 upsert,而 print sink 不能。因此,相同的更新 table 作为更新插入(可能还有删除)事件流被提供给 Kafka,并且它作为流被发送到标准输出,每个键都有一个初始插入(+I),然后是 update_before/update_after 对编码为 -U +U 每次更新(和删除,如果发生的话)。
我正在使用以下三个测试用例来测试 upsert-kafka
- 将聚合结果以
upsert-kafka
格式写入kafka(TestCase1) - 使用
fink table result print
输出消息。(TestCase2) - 使用 consume-console.sh 工具直接消费 Kafka 消息。(TestCase3)
我发现在使用fink table result print
时,会打印出-U
和+U
两条消息,表示一条被删除,一条被插入,而对于consume-console
, 直接正确打印结果
我会问为什么 fink table result print
的行为与我所观察到的一样
-U
和+U
(删除消息和插入消息)是从哪里来的,它们是作为两条消息保存在Kafka中的吗?我认为答案是否定的,因为我没有看到这些立竿见影的效果。
使用 consumer-console
.
package org.example.official.sql
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite
class UpsertKafkaTest extends AnyFunSuite {
val topic = "test-UpsertKafkaTest-1"
//Test Case 1
test("write to upsert kafka: upsert-kafka as sink") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
ds.print()
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
val ddl =
s"""
CREATE TABLE sinkTable (
id STRING,
total_price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = '$topic',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, sum(price)
from sourceTable
group by id
""".stripMargin(' '))
env.execute()
}
//Test Case 2
test("read from upsert kafka: upsert-kafka as source 2") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
s"""
CREATE TABLE sourceTable (
id STRING,
total_price DOUBLE,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = '$topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup001',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val result = tenv.executeSql(
"""
select * from sourceTable
""".stripMargin(' '))
result.print()
/*
+----+--------------------------------+--------------------------------+
| op | id | total_price |
+----+--------------------------------+--------------------------------+
| +I | id1 | 1.0 |
| -U | id1 | 1.0 |
| +U | id1 | 3.0 |
| -U | id1 | 3.0 |
| +U | id1 | 6.0 |
| -U | id1 | 6.0 |
| +U | id1 | 10.0 |
| -U | id1 | 10.0 |
| +U | id1 | 15.0 |
| -U | id1 | 15.0 |
| +U | id1 | 21.0 |
| -U | id1 | 21.0 |
| +U | id1 | 28.0 |
| -U | id1 | 28.0 |
| +U | id1 | 36.0 |
| -U | id1 | 36.0 |
| +U | id1 | 45.0 |
*/
}
//Test Case 3
test("read from upsert kafka with consumer console") {
/*
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
{"id":"id1","total_price":1.0}
{"id":"id1","total_price":3.0}
{"id":"id1","total_price":6.0}
{"id":"id1","total_price":10.0}
{"id":"id1","total_price":15.0}
{"id":"id1","total_price":21.0}
{"id":"id1","total_price":28.0}
{"id":"id1","total_price":36.0}
{"id":"id1","total_price":45.0}
*/
}
}
对于 Flink SQL,我们谈到了 table 和流之间的二元性——流可以被认为是(动态的)table,反之亦然。 streams/tables有两种类型:追加和更新。一个append流对应一个只执行INSERT操作的动态table;没有任何内容被删除或更新。一个更新流对应一个动态的table,其中的行可以被更新和删除。
您的来源 table 是一个 upsert-kafka table,因此是一个更新 table(不是附加 table)。 upsert-kafka 源对应于压缩主题,当压缩发生时,会导致 updates/retractions 各种键的现有值随时间更新。
当更新 table 转换为流时,有两种可能的结果:您要么得到更新插入流,要么得到撤回流。有些接收器支持这些类型的更新流中的一种或另一种,有些则同时支持这两种类型。
您看到的是 upsert-kafka sink 可以处理 upsert,而 print sink 不能。因此,相同的更新 table 作为更新插入(可能还有删除)事件流被提供给 Kafka,并且它作为流被发送到标准输出,每个键都有一个初始插入(+I),然后是 update_before/update_after 对编码为 -U +U 每次更新(和删除,如果发生的话)。