在 Flink KeyedStream 上暂停处理

Pausing Processing on Flink KeyedStream

我有一个 Flink 流应用程序,它需要能够对特定的键控流进行 'pause' 和 'unpause' 处理。 'Processing' 表示仅对流执行一些简单的异常检测。

我们考虑的流程是这样的:

命令流,ProcessCommandPauseCommandResumeCommand,每个命令都有一个 id,用于 KeyBy

ProcessCommands会检查key在处理之前是否暂停,如果没有就缓冲。

PauseCommands 将暂停键的处理。

ResumeCommands 将取消暂停键的处理并刷新缓冲区。

这个流程看起来合理吗?如果合理,我是否可以使用类似 split 运算符的方法来实现?

示例流,省略了单个记录时间戳:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 

这可以使用 Flink's Window operator 来实现。首先,通过应用 map 操作创建基于 POJOtuple 的流。

然后,根据您的需要,您可以在该流上使用 keyBy 以获得 keyedStream

现在,通过结合使用基于时间的无限 windowtriggerwindow function,您可以实现命令流的切换行为。

基本上,您可以使用windows 作为您的缓冲区,在收到暂停记录后,它会保留进程记录,直到收到恢复记录。您将编写一个自定义触发器,根据您的场景驱逐 window(缓冲区)。

以下是 Trigger 的自定义实现,具有 onElement() 覆盖的方法。

/**
 * We trigger the window processing as per command inside the record. The
 * process records are buffered when a pause record is received and the
 * buffer is evicted once resume record is received. If no pause record is
 * received earlier, then for each process record the buffer is evicted.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    if (element.f1.equals("pause")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;
}

查看此 github repository

中的完整工作示例