Flink: How to emit after a merge?

I defined a Transaction class:

case class Transaction(accountId: Long, amount: Long, timestamp: Long)

TransactionSource simply emits a Transaction at a fixed interval. Now I want to compute the last 2 transaction timestamps of each account ID; see the code below:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  // Top-2 most recent transactions per account (ROW_NUMBER over timestamp DESC),
  // then LISTAGG merges the two timestamps into one comma-separated string.
  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    // Register the stream as a dynamic table so the SQL query can read it.
    tableEnv.createTemporaryView("transactions", txnStream)

    // executeSql submits the continuous query; print() emits its changelog.
    tableEnv.executeSql(QUERY).print()
  }
}

When I run the program, I get:

+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(output continues)

Let's focus on the last transaction of accountId=1 in the output above. When a new transaction from account 1 arrives with timestamp=1546275600000, there are 4 operations in total:

+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |

However, via some kind of merge, I only want to emit the following "new state" to my downstream (say, another Kafka topic):

+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |

so that my downstream is able to consume exactly "the last 2 transaction timestamps per account":

+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(output continues)

What is the right way to do this?

Here is a short summary of Timo Walther's answer from the Apache user mailing list.

Solution 1: Convert the result to a retract stream with the DataStream API's toRetractStream, filter out the retract (delete) events, and sink only the add events downstream, as sketched below.
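A minimal sketch of Solution 1, reusing streamEnv, tableEnv, and QUERY from the job above; it replaces the final executeSql(...).print() call:

import org.apache.flink.types.Row

// toRetractStream wraps every change as (flag, row): flag = true marks an add
// event (+I/+U), flag = false marks a retract event (-U/-D).
val resultTable = tableEnv.sqlQuery(QUERY)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](resultTable)

retractStream
  .filter(_._1) // keep only the add events, i.e. the new state after each change
  .map(_._2)    // unwrap the Row
  .print()      // in practice, replace print() with e.g. a Kafka sink

streamEnv.execute("LastNJob")

Note that the intermediate add events (e.g. the lone 1546273800000 above) still pass this filter; Solution 2 avoids them by collapsing the two operators into a single aggregation.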

Solution 2: Implement a user-defined aggregate function that combines the Top-2 and LISTAGG operations into one, as sketched below.
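A minimal sketch of Solution 2 using Flink's AggregateFunction (the names Last2Agg and Last2Acc are my own, not from the mailing-list answer). A single aggregate keeps the two most recent timestamps per account, so each incoming transaction produces at most one retract/add pair instead of the cascading four operations:

import org.apache.flink.table.functions.AggregateFunction

// Accumulator holding the two most recent timestamps; Long.MinValue marks an
// empty slot.
case class Last2Acc(var older: Long = Long.MinValue, var latest: Long = Long.MinValue)

class Last2Agg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  // Called once per input row; keeps only the two largest timestamps seen so far.
  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts > acc.latest) {
      acc.older = acc.latest
      acc.latest = ts
    } else if (ts > acc.older) {
      acc.older = ts
    }
  }

  // Emits the same "older,latest" format that LISTAGG produced.
  override def getValue(acc: Last2Acc): String =
    if (acc.older == Long.MinValue) acc.latest.toString
    else s"${acc.older},${acc.latest}"
}

Registered and used in place of the original two-step query:

// Register the function, then group directly by account.
tableEnv.registerFunction("LAST2", new Last2Agg)
tableEnv.executeSql(
  "SELECT accountId, LAST2(`timestamp`) AS last2_timestamp FROM transactions GROUP BY accountId"
).print()

Since the source is insert-only, the function needs no retract() method, and combining this with the filter from Solution 1 leaves the downstream with exactly one merged "new state" per update.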