toChangelogStream prints different kinds of changes
Example 1:
// === EXAMPLE 1 ===
// interpret the stream as a retract stream
// create a changelog DataStream
val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))
// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
  .print()
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+
Example 2:
// === EXAMPLE 2 ===
// convert to DataStream in the simplest and most general way possible (no event-time)
val simpleTable = tableEnv
  .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
  .as("name", "score")
  .groupBy($"name")
  .select($"name", $"score".sum())
tableEnv
  .toChangelogStream(simpleTable)
  .executeAndCollect()
  .foreach(println)
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
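For these two examples, I would like to ask why the first one prints -D and +I for the last two records, while the second one prints -U and +U. What are the rules that determine the kind of change here? Thanks.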
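The reason for the difference has two parts, both of which are defined in GroupAggFunction, the process function used to handle this query.
The first is this part of the code: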
// update aggregate result and set to the newRow
if (isAccumulateMsg(input)) {
    // accumulate input
    function.accumulate(input);
} else {
    // retract input
    function.retract(input);
}
When a new value arrives for a given key, the method first checks whether it is an accumulate message (RowKind.INSERT or RowKind.UPDATE_AFTER) or a retract message (RowKind.UPDATE_BEFORE).
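The check described above can be sketched in isolation. This is a minimal illustration, not Flink's implementation: the class RowKindSketch and its Kind enum are hypothetical stand-ins for Flink's RowKind, and, following the if/else shown above, every kind that is not an accumulate message is treated as a retraction.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the check the process function performs; not Flink code.
class RowKindSketch {
    // Mirrors the four change kinds and the short forms seen in the printed output.
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // Kinds that add a value to the accumulator.
    private static final Set<Kind> ACCUMULATE = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER);

    static boolean isAccumulateMsg(Kind kind) {
        return ACCUMULATE.contains(kind);
    }

    public static void main(String[] args) {
        // Anything that is not an accumulate message falls into the retract branch.
        for (Kind kind : Kind.values()) {
            String action = isAccumulateMsg(kind) ? "accumulate" : "retract";
            System.out.println(kind.shortString + " " + kind + " -> " + action);
        }
    }
}
```

Running main lists each kind with the branch it would take, which makes it easy to see that the UPDATE_BEFORE row in the first example is the only input in either example that goes down the retract path.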
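In your first example, you explicitly declare the RowKind yourself. When processing reaches Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)), which is a retract message, the function first retracts that input from the existing accumulator. After the retraction, we end up with a key whose accumulator is empty. When that happens, the following lines are reached: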
} else {
    // we retracted the last record for this key
    // sent out a delete message
    if (!firstRow) {
        // prepare delete message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
    // and clear all state
    accState.clear();
    // cleanup dataview under current key
    function.cleanup();
}
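Since this is not the first row received for the key "Alice", we emit a DELETE message for the previous row. The state for the key is then cleared, so the next row for "Alice" is treated as a first row again and is emitted as an INSERT.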
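In your second example, where you do not specify the RowKind explicitly, all messages are received with RowKind.INSERT by default. This means we never retract from the existing accumulator, and the following code path is taken: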
if (!recordCounter.recordCountIsZero(accumulators)) {
    // we aggregated at least one record for this key
    // update the state
    accState.update(accumulators);
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
            // newRow is the same as before and state cleaning is not enabled.
            // We do not emit retraction and acc message.
            // If state cleaning is enabled, we have to emit messages to prevent too early
            // state eviction of downstream operators.
            return;
        } else {
            // retract previous result
            if (generateUpdateBefore) {
                // prepare UPDATE_BEFORE message for previous row
                resultRow
                        .replace(currentKey, prevAggValue)
                        .setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(resultRow);
            }
            // prepare UPDATE_AFTER message for new row
            resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
        }
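Since the record count is greater than 0 (we did not retract anything), this is not the first row received for the key, and the AggFunction has generateUpdateBefore set to true, we first receive an UPDATE_BEFORE message (-U), immediately followed by an UPDATE_AFTER message (+U).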
All of the relevant code can be found here.
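To see both code paths side by side, here is a rough, self-contained model of the decision logic described in this answer. It is a sketch under stated assumptions, not the actual GroupAggFunction: the class GroupAggSketch, its Acc accumulator (a running sum plus a record count), and the example1/example2 helpers are all hypothetical, generateUpdateBefore is assumed to be true, and a retraction arriving for a key without state is assumed to be ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a keyed SUM aggregation; not Flink code.
class GroupAggSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Per-key accumulator: the running sum plus the number of live input records.
    private static final class Acc {
        long sum;
        long count;
    }

    private final Map<String, Acc> accState = new HashMap<>();
    private final List<String> out = new ArrayList<>();
    private final boolean generateUpdateBefore = true; // assumed, as described in the answer

    void process(Kind kind, String key, long value) {
        boolean accumulateMsg = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        Acc acc = accState.get(key);
        boolean firstRow = acc == null;
        if (firstRow) {
            if (!accumulateMsg) {
                return; // assumption: a retraction for a key without state is ignored
            }
            acc = new Acc();
        }
        long prevAggValue = acc.sum;
        if (accumulateMsg) {
            acc.sum += value;
            acc.count++;
        } else {
            acc.sum -= value;
            acc.count--;
        }
        if (acc.count != 0) {
            // At least one record is still aggregated for this key: keep the state.
            accState.put(key, acc);
            if (firstRow) {
                out.add("+I[" + key + ", " + acc.sum + "]");
            } else if (prevAggValue != acc.sum) {
                if (generateUpdateBefore) {
                    out.add("-U[" + key + ", " + prevAggValue + "]");
                }
                out.add("+U[" + key + ", " + acc.sum + "]");
            }
        } else {
            // The last record for this key was retracted: emit DELETE and clear the state.
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            accState.remove(key);
        }
    }

    // Input of the first example: explicit row kinds, including a retraction.
    static List<String> example1() {
        GroupAggSketch agg = new GroupAggSketch();
        agg.process(Kind.INSERT, "Alice", 12);
        agg.process(Kind.INSERT, "Bob", 5);
        agg.process(Kind.UPDATE_BEFORE, "Alice", 12);
        agg.process(Kind.UPDATE_AFTER, "Alice", 100);
        return agg.out;
    }

    // Input of the second example: every row arrives as INSERT.
    static List<String> example2() {
        GroupAggSketch agg = new GroupAggSketch();
        agg.process(Kind.INSERT, "Alice", 12);
        agg.process(Kind.INSERT, "Alice", 2);
        agg.process(Kind.INSERT, "Bob", 12);
        return agg.out;
    }

    public static void main(String[] args) {
        System.out.println(example1());
        System.out.println(example2());
    }
}
```

For the key "Alice", the first helper yields +I, -D, +I and the second yields +I, -U, +U, matching the per-key sequences in the question. The sketch processes rows strictly in input order, so the interleaving between "Alice" and "Bob" differs from the printed output in the question.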