在 flink processFunction 中,onTimer() 函数中所有 mapstate 为空

in flink processFunction, all mapstate is empty in onTimer() function

我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持rich function, 此外,我尝试了aggreagationFunction + processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html),但也不能满足我的需求,所以我不得不使用基本的processKeyedFunction来实现aggregationFunction,我的问题详情如下:

在processFunction中,我定义了一个windowstate元素的聚合值状态,代码如下:

public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
        "timer",
        Long.class
));

在processElement()函数中,我使用windowState(open函数中初始化的MapState)聚合window元素,注册第一个timeServie清除当前window状态,代码如下:

 @Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception 
{
      if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
            currentTimer.update(value.getClickTime() + interval);
            ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
        } 
       windowState = doMyAggregation(value);
}

在 onTimer() 函数中,首先,我在下一分钟注册下一个 timeService,并清除 window 状态

 @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
        currentTimer.update(timestamp + interval);   // interval is 1 minute
        ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
        
        out.collect(windowState);
        windowState.clear();
     }

但是当程序为运行时,我发现onTimer中的windowState全部为空,而processElement()函数中却不为空,不知道为什么发生了,也许执行逻辑不同,我该如何解决, 提前致谢!



关于 doMyAggregation() 部分的新增代码


windowState是一个MapState,key是"mykey",value是一个自定义的Object AggregateFollow

public class AggregateFollow {
    private String clicked;
    private String unionid;
    private ArrayList allFollows;
    private int enterCnt;
    private Long clickTime;

}

和doMyAggregation(value)函数差不多,doMyAggregation的作用是获取所有源字段为'follow'的值,但如果没有字段为[=40的值=] 在1分钟内,'follow'值应该是过时的,总之,它就像'follow'数据和'click'数据的连接操作,

AggregateFollow acc = windowState.get(windowkey);
    String flag = acc.getClicked();
    ArrayList<FollowData> followDataList = acc.getAllFollows();
    if ("0".equals(flag)) {
        if ("follow".equals(value.getSource())) {
            followDataList.add(value);
            acc.setAllFollows(followDataList);
        }
        if ("click".equals(value.getSource())) {
            String unionid = value.getUnionid();
            clickTime = value.getClickTime();
            if (followDataList.size() > 0) {
                ArrayList listNew = new ArrayList();
                for (FollowData followData : followDataList) {
                    followData.setUnionid(unionid);
                    followData.setClickTime(clickTime);
                    followData.setSource("joined_flag");   // 
                }
                acc.setAllFollows(listNew);
            }
            acc.setClicked("1");
            acc.setUnionid(unionid);
            acc.setClickTime(clickTime);
            windowState.put(windowkey, acc);
        }
    } else if ("1".equals(flag)) {
        if ("follow".equals(value.getSource())) {
            value.setUnionid(acc.getUnionid());
            value.setClickTime(acc.getClickTime());
            value.setSource("joined_flag");  
            followDataList.add(value);
            acc.setAllFollows(followDataList);
            windowState.put(windowkey, acc);
        }
    }

因为性能问题,原来的windowAPI对我来说不是一个有效的选择,我认为这里唯一的方法是使用 processFunction + ontimer 和 Guava Cache , 非常感谢

如果 windowState 为空,那么查看 doMyAggregation(value) 正在做什么会很有帮助。

如果没有更多上下文,很难对此进行调试或提出好的替代方案,但 out.collect(windowState) 不会按预期工作。您可能想要做的是迭代此 MapState 并将它包含的每个 key/value 对收集到输出中。

我把windowState的类型从MapState改成了ValueState,问题解决了,可能是bug什么的,谁能解释一下?