在 flink processFunction 中,onTimer() 函数中所有 mapstate 为空
in flink processFunction, all mapstate is empty in onTimer() function
我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持rich function,
此外,我尝试了aggreagationFunction + processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html),但也不能满足我的需求,所以我不得不使用基本的processKeyedFunction来实现aggregationFunction,我的问题详情如下:
在processFunction中,我定义了一个windowstate元素的聚合值状态,代码如下:
public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"timer",
Long.class
));
在processElement()函数中,我使用windowState(open函数中初始化的MapState)聚合window元素,注册第一个timeServie清除当前window状态,代码如下:
@Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception
{
if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
currentTimer.update(value.getClickTime() + interval);
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
}
windowState = doMyAggregation(value);
}
在 onTimer() 函数中,首先,我在下一分钟注册下一个 timeService,并清除 window 状态
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
currentTimer.update(timestamp + interval); // interval is 1 minute
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
out.collect(windowState);
windowState.clear();
}
但是当程序为运行时,我发现onTimer中的windowState全部为空,而processElement()函数中却不为空,不知道为什么发生了,也许执行逻辑不同,我该如何解决,
提前致谢!
关于 doMyAggregation() 部分的新增代码
windowState是一个MapState,key是"mykey",value是一个自定义的Object AggregateFollow
public class AggregateFollow {
private String clicked;
private String unionid;
private ArrayList allFollows;
private int enterCnt;
private Long clickTime;
}
和doMyAggregation(value)函数差不多,doMyAggregation的作用是获取所有源字段为'follow'的值,但如果没有字段为[=40的值=] 在1分钟内,'follow'值应该是过时的,总之,它就像'follow'数据和'click'数据的连接操作,
AggregateFollow acc = windowState.get(windowkey);
String flag = acc.getClicked();
ArrayList<FollowData> followDataList = acc.getAllFollows();
if ("0".equals(flag)) {
if ("follow".equals(value.getSource())) {
followDataList.add(value);
acc.setAllFollows(followDataList);
}
if ("click".equals(value.getSource())) {
String unionid = value.getUnionid();
clickTime = value.getClickTime();
if (followDataList.size() > 0) {
ArrayList listNew = new ArrayList();
for (FollowData followData : followDataList) {
followData.setUnionid(unionid);
followData.setClickTime(clickTime);
followData.setSource("joined_flag"); //
}
acc.setAllFollows(listNew);
}
acc.setClicked("1");
acc.setUnionid(unionid);
acc.setClickTime(clickTime);
windowState.put(windowkey, acc);
}
} else if ("1".equals(flag)) {
if ("follow".equals(value.getSource())) {
value.setUnionid(acc.getUnionid());
value.setClickTime(acc.getClickTime());
value.setSource("joined_flag");
followDataList.add(value);
acc.setAllFollows(followDataList);
windowState.put(windowkey, acc);
}
}
因为性能问题,原来的windowAPI对我来说不是一个有效的选择,我认为这里唯一的方法是使用 processFunction + ontimer 和 Guava Cache ,
非常感谢
如果 windowState
为空,那么查看 doMyAggregation(value)
正在做什么会很有帮助。
如果没有更多上下文,很难对此进行调试或提出好的替代方案,但 out.collect(windowState)
不会按预期工作。您可能想要做的是迭代此 MapState
并将它包含的每个 key/value 对收集到输出中。
我把windowState的类型从MapState改成了ValueState,问题解决了,可能是bug什么的,谁能解释一下?
我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持rich function, 此外,我尝试了aggreagationFunction + processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html),但也不能满足我的需求,所以我不得不使用基本的processKeyedFunction来实现aggregationFunction,我的问题详情如下:
在processFunction中,我定义了一个windowstate元素的聚合值状态,代码如下:
public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"timer",
Long.class
));
在processElement()函数中,我使用windowState(open函数中初始化的MapState)聚合window元素,注册第一个timeServie清除当前window状态,代码如下:
@Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception
{
if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
currentTimer.update(value.getClickTime() + interval);
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
}
windowState = doMyAggregation(value);
}
在 onTimer() 函数中,首先,我在下一分钟注册下一个 timeService,并清除 window 状态
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
currentTimer.update(timestamp + interval); // interval is 1 minute
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
out.collect(windowState);
windowState.clear();
}
但是当程序为运行时,我发现onTimer中的windowState全部为空,而processElement()函数中却不为空,不知道为什么发生了,也许执行逻辑不同,我该如何解决, 提前致谢!
关于 doMyAggregation() 部分的新增代码
windowState是一个MapState,key是"mykey",value是一个自定义的Object AggregateFollow
public class AggregateFollow {
private String clicked;
private String unionid;
private ArrayList allFollows;
private int enterCnt;
private Long clickTime;
}
和doMyAggregation(value)函数差不多,doMyAggregation的作用是获取所有源字段为'follow'的值,但如果没有字段为[=40的值=] 在1分钟内,'follow'值应该是过时的,总之,它就像'follow'数据和'click'数据的连接操作,
AggregateFollow acc = windowState.get(windowkey);
String flag = acc.getClicked();
ArrayList<FollowData> followDataList = acc.getAllFollows();
if ("0".equals(flag)) {
if ("follow".equals(value.getSource())) {
followDataList.add(value);
acc.setAllFollows(followDataList);
}
if ("click".equals(value.getSource())) {
String unionid = value.getUnionid();
clickTime = value.getClickTime();
if (followDataList.size() > 0) {
ArrayList listNew = new ArrayList();
for (FollowData followData : followDataList) {
followData.setUnionid(unionid);
followData.setClickTime(clickTime);
followData.setSource("joined_flag"); //
}
acc.setAllFollows(listNew);
}
acc.setClicked("1");
acc.setUnionid(unionid);
acc.setClickTime(clickTime);
windowState.put(windowkey, acc);
}
} else if ("1".equals(flag)) {
if ("follow".equals(value.getSource())) {
value.setUnionid(acc.getUnionid());
value.setClickTime(acc.getClickTime());
value.setSource("joined_flag");
followDataList.add(value);
acc.setAllFollows(followDataList);
windowState.put(windowkey, acc);
}
}
因为性能问题,原来的windowAPI对我来说不是一个有效的选择,我认为这里唯一的方法是使用 processFunction + ontimer 和 Guava Cache , 非常感谢
如果 windowState
为空,那么查看 doMyAggregation(value)
正在做什么会很有帮助。
如果没有更多上下文,很难对此进行调试或提出好的替代方案,但 out.collect(windowState)
不会按预期工作。您可能想要做的是迭代此 MapState
并将它包含的每个 key/value 对收集到输出中。
我把windowState的类型从MapState改成了ValueState,问题解决了,可能是bug什么的,谁能解释一下?