在 Flink KeyedStream 上暂停处理
Pausing Processing on Flink KeyedStream
我有一个 Flink 流应用程序,它需要能够对特定的键控流进行 'pause' 和 'unpause' 处理。 'Processing' 表示仅对流执行一些简单的异常检测。
我们考虑的流程是这样的:
命令流,ProcessCommand
、PauseCommand
或 ResumeCommand
,每个命令都有一个 id
,用于 KeyBy
。
ProcessCommands
会检查key在处理之前是否暂停,如果没有就缓冲。
PauseCommands
将暂停键的处理。
ResumeCommands
将取消暂停键的处理并刷新缓冲区。
这个流程看起来合理吗?如果合理,我是否可以使用类似 split
运算符的方法来实现?
示例流,省略了单个记录时间戳:
[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]
Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=>
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=>
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off
这可以使用 Flink's Window operator 来实现。首先,通过应用 map
操作创建基于 POJO
或 tuple
的流。
然后,根据您的需要,您可以在该流上使用 keyBy
以获得 keyedStream
。
现在,通过结合使用基于时间的无限 window
、trigger
和 window function
,您可以实现命令流的切换行为。
基本上,您可以使用windows
作为您的缓冲区,在收到暂停记录后,它会保留进程记录,直到收到恢复记录。您将编写一个自定义触发器,根据您的场景驱逐 window(缓冲区)。
以下是 Trigger
的自定义实现,具有 onElement()
覆盖的方法。
/**
* We trigger the window processing as per command inside the record. The
* process records are buffered when a pause record is received and the
* buffer is evicted once resume record is received. If no pause record is
* received earlier, then for each process record the buffer is evicted.
*/
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
TriggerContext context) throws Exception {
if (element.f1.equals("pause")) {
paused = true;
return TriggerResult.CONTINUE;
} else if (element.f1.equals("resume")) {
paused = false;
return TriggerResult.FIRE_AND_PURGE;
} else if (paused) // paused is a ValueState per keyed stream.
return TriggerResult.CONTINUE;
return TriggerResult.FIRE_AND_PURGE;
}
中的完整工作示例
我有一个 Flink 流应用程序,它需要能够对特定的键控流进行 'pause' 和 'unpause' 处理。 'Processing' 表示仅对流执行一些简单的异常检测。
我们考虑的流程是这样的:
命令流,ProcessCommand
、PauseCommand
或 ResumeCommand
,每个命令都有一个 id
,用于 KeyBy
。
ProcessCommands
会检查key在处理之前是否暂停,如果没有就缓冲。
PauseCommands
将暂停键的处理。
ResumeCommands
将取消暂停键的处理并刷新缓冲区。
这个流程看起来合理吗?如果合理,我是否可以使用类似 split
运算符的方法来实现?
示例流,省略了单个记录时间戳:
[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]
Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=>
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=>
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off
这可以使用 Flink's Window operator 来实现。首先,通过应用 map
操作创建基于 POJO
或 tuple
的流。
然后,根据您的需要,您可以在该流上使用 keyBy
以获得 keyedStream
。
现在,通过结合使用基于时间的无限 window
、trigger
和 window function
,您可以实现命令流的切换行为。
基本上,您可以使用windows
作为您的缓冲区,在收到暂停记录后,它会保留进程记录,直到收到恢复记录。您将编写一个自定义触发器,根据您的场景驱逐 window(缓冲区)。
以下是 Trigger
的自定义实现,具有 onElement()
覆盖的方法。
/**
* We trigger the window processing as per command inside the record. The
* process records are buffered when a pause record is received and the
* buffer is evicted once resume record is received. If no pause record is
* received earlier, then for each process record the buffer is evicted.
*/
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
TriggerContext context) throws Exception {
if (element.f1.equals("pause")) {
paused = true;
return TriggerResult.CONTINUE;
} else if (element.f1.equals("resume")) {
paused = false;
return TriggerResult.FIRE_AND_PURGE;
} else if (paused) // paused is a ValueState per keyed stream.
return TriggerResult.CONTINUE;
return TriggerResult.FIRE_AND_PURGE;
}
中的完整工作示例