从 Akka 中的轮询方法创建源

Create Source from a polling method in Akka

我有一个方法,如果有对新元素的请求,可以使用 poll 方法从不同来源获取元素。我怎样才能从这个方法创建源,以便它只从轮询方法中请求一个新元素?

akka.io 博客 post 中解释了与轮询 API 集成:Writing Akka Streams Connectors for existing APIs 部分 "Polling based APIs"。

它的核心是你想要扩展一个 TimerGraphStageLogic,并做如下事情:

private void schedulePoll() {
  scheduleOnce("poll", pollInterval);
}

@Override
public void onTimer(Object timerKey) {
  if (!isClosed(out)) {
    doPoll();
    if (!buffer.isEmpty()) {
      pushHead();
    } else {
      schedulePoll();
    }
  }
}

安排投票。

或者,您可以坚持在 unfoldAsyncResource.

内实现所有回调

可以找到此类阶段的完整实现 here(在 Java 中实现)。