toChanglelogStream 打印不同类型的变化

toChanglelogStream prints different kinds of changes

我正在 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream

读书

示例 1:

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))


// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print()

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

示例 2:

// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

val simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($"name")
    .select($"name", $"score".sum())

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .foreach(println)

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]

对于这两个例子,我会问为什么第一个在最后两条记录中打印-D+I,而第二个打印-U+U.这里确定变化类型的规则是什么?谢谢。

造成差异的原因有两部分,都在GroupAggFunction中定义,也就是用来处理这个查询的process函数。

首先是这部分代码:

// update aggregate result and set to the newRow
if (isAccumulateMsg(input)) {
    // accumulate input
    function.accumulate(input);
} else {
    // retract input
    function.retract(input);
}

当收到给定键的新值时,该方法首先检查它是累积消息(RowKind.INSERTRowKind.UPDATE_AFTER)还是收回消息(RowKind.UPDATE_BEFORE) .

在您的第一个示例中,您自己明确声明了 RowKind。当执行到Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),这是一个retraction message,它会先retraction现有累加器的输入。这意味着在撤回之后,我们最终得到一个具有空累加器的密钥。发生这种情况时,将达到以下行:

} else {
    // we retracted the last record for this key
    // sent out a delete message
    if (!firstRow) {
        // prepare delete message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
    // and clear all state
    accState.clear();
    // cleanup dataview under current key
    function.cleanup();
}

由于这不是键“Alice”收到的第一行,我们为前一行发出删除消息,然后下一行将发出 INSERT.

对于您未明确指定 RowKind 的第二个示例,默认情况下所有消息都使用 RowKind.INSERT 接收。这意味着现在我们不收回现有的累加器,并采取以下代码路径:

if (!recordCounter.recordCountIsZero(accumulators)) {
    // we aggregated at least one record for this key

    // update the state
    accState.update(accumulators);

    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
            // newRow is the same as before and state cleaning is not enabled.
            // We do not emit retraction and acc message.
            // If state cleaning is enabled, we have to emit messages to prevent too early
            // state eviction of downstream operators.
            return;
        } else {
            // retract previous result
            if (generateUpdateBefore) {
                // prepare UPDATE_BEFORE message for previous row
                resultRow
                        .replace(currentKey, prevAggValue)
                        .setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(resultRow);
            }
            // prepare UPDATE_AFTER message for new row
            resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
        }

由于行数大于 0(我们没有撤回),并且这不是为键接收的第一行,并且因为 AggFunction 已将 generateUpdateBefore 设置为 true,我们首先收到 UPDATE_BEFORE 消息 (-U),紧接着是 UPDATE_AFTER (+U).

全部相关代码can be found here.