SessionWindows启动和结束时向Kafka发送消息

Send message to Kafka when SessionWindows was started and ended

我想在创建新 SessionWindow 和结束时向 Kafka 主题发送消息。我有以下代码

stream
    .filter(user -> user.isAdmin)
    .keyBy(user -> user.username)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
//what now? Trigger?

现在我想在新会话开始时发送消息(使用一些元数据,例如网络浏览器和时间戳,这些信息在流的每个元素中都可用)并在会话结束时向 Kafka 发送消息(在本例中为 10我认为在最后一个元素之后的秒数)以及总请求数。

在Flink中可以吗?我想我应该使用一些触发器,但我不知道如何使用,也找不到任何示例。

如果你想在处理 window 时执行此操作,那么你可以简单地使用 WindowProcessFunction,基本上你需要做的是将 .process(new MyProcessFunction() 添加到你的代码中.在 ProcessFunction 您可以访问整个 window 包括它的第一个(开始)和最后一个(结束)元素。您可以简单地使用 Side 输出来输出给定 window 的开头和结尾。然后,您可以从侧输出创建一个流并将其接收到 Kafka。可以找到更多关于 Side outputs 的信息 here.

您可以编写自定义 window 触发器。

  1. 如何判断一个新会话已经开始?
    您可以创建一个默认值为 null 的 ValueState,因此如果状态值为 null,则表示会话开始。

  2. 会话何时结束?
    就在 TriggerResult.FIRE.

  3. 之前

这里有一个基于ProcessingTimeTriggerFlink的demo,我只把问题相关的逻辑放在这里,其他的可以查看源码。

public class MyProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    // a state which keeps a session start.
    private final ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("session-start", Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> state = ctx.getPartitionedState(stateDescriptor);
        if(state.value() == null) {
            // if value is null, it's a session start.
            state.update(window.getStart());
        }

        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // here is a session end.
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDescriptor).clear();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}