从 Akka 中的轮询方法创建源
Create Source from a polling method in Akka
我有一个方法,如果有对新元素的请求,可以使用 poll 方法从不同来源获取元素。我怎样才能从这个方法创建源,以便它只从轮询方法中请求一个新元素?
akka.io 博客 post 中解释了与轮询 API 集成:Writing Akka Streams Connectors for existing APIs 部分 "Polling based APIs"。
它的核心是你想要扩展一个 TimerGraphStageLogic
,并做如下事情:
private void schedulePoll() {
scheduleOnce("poll", pollInterval);
}
@Override
public void onTimer(Object timerKey) {
if (!isClosed(out)) {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
schedulePoll();
}
}
}
安排投票。
或者,您可以坚持在 unfoldAsyncResource
.
内实现所有回调
可以找到此类阶段的完整实现 here(在 Java 中实现)。
我有一个方法,如果有对新元素的请求,可以使用 poll 方法从不同来源获取元素。我怎样才能从这个方法创建源,以便它只从轮询方法中请求一个新元素?
akka.io 博客 post 中解释了与轮询 API 集成:Writing Akka Streams Connectors for existing APIs 部分 "Polling based APIs"。
它的核心是你想要扩展一个 TimerGraphStageLogic
,并做如下事情:
private void schedulePoll() {
scheduleOnce("poll", pollInterval);
}
@Override
public void onTimer(Object timerKey) {
if (!isClosed(out)) {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
schedulePoll();
}
}
}
安排投票。
或者,您可以坚持在 unfoldAsyncResource
.
可以找到此类阶段的完整实现 here(在 Java 中实现)。