在 windows 中复制数据以填充无限数量的点

Duplicating data across windows to fill in an indefinite number of spots

我正在尝试将数据记录复制到未来 windows。这解决的问题是每个 window 的计算统计数据将更加准确,因为此数据是连续的(如温度)并且需要基线值。

在这些图表中,每个方框代表一个固定的 window。每个 window 中的数字表示来自某个源的 PCollection 中的数据。

这是输入 PCollection 的示例:

+---------+---------+---------+--------->
| 1  2    |       3 |         |         |
+---------+---------+---------+--------->

以及生成的输出 PCollection:

+---------+---------+---------+--------->
| 1  2    | 2     3 | 3       | 3       |
+---------+---------+---------+--------->

注意最新的数据点(基于事件时间戳)如何转发到下一个 window。如果有多个空值windows,则必须重新转发该值

我已经解决了运行 ed PCollection 通过有状态的 DoFn 发出一次额外的复制和修改元素转发一次的问题:

public class DupeFn extends DoFn<Datum, Datum> {
    @StateId("latest")
    private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();

    @TimerId("emit")
    private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(
            @Element Datum element,
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest,
            @TimerId("emit") Timer emit
    ) {
        emit.set(window.maxTimestamp());

        Datum prev = latest.read();

        if (prev == null || element.timestamp > prev.timestamp) {
            latest.write(element);
        }

        receiver.output(element);
    }

    @OnTimer("emit")
    public void emitLatest(
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest
    ) {
        Datum last = latest.read();

        // modify the timestamp such that it lands in the next window
        last.timestamp = window.end().getMillis() + 10;
        last.id += " DUPED";

        receiver.outputWithTimestamp(last, new Instant(last.timestamp));
    }
}

现在的问题是,如果 window 为空,则不会将任何内容复制到程序 window 中。理想情况下,行为将如上图所述。

有办法吗?

编辑
我找到 this 相关未发表的博客 post。

目前 Beam 文档存在一个小问题,一旦问题解决,博客就会出现。循环计时器将为您提供解决此问题的部分方法。因为它将确保即使在没有数据的情况下,每个间隔 window 也有 activity。

编辑:博客现在可以在这里找到 Link to Blog

下一段需要使用 Global Windows,这会带来更多的复杂性。下周的 Apache Beam 峰会上将对此进行讨论。

Berlin Summit

为了保持状态,您需要将固定的 windows 聚合流入 GlobalWindow。但是 GlobalWindow 不保证顺序,因此您需要遵循以下流程:

@ProcessElement

  • 将元素保存到 BaggedState
  • 如果尚未设置时间戳,则创建一个 EventTime 时间戳。 You will need something like this to solve for that

@OnTimer

  • 读取 BaggedList 并按时间戳排序
  • 如果下一个聚合没有值(它是使用循环计时器而不是外部数据源生成的),则将每个聚合的最终值链接到下一个聚合中。
  • 输出时间戳小于 OnTimer.Timestamp
  • 的所有值
  • 清除任何已处理元素的 baglist,今天请注意,这是低效的,因为您无法从列表中删除特定元素。如果您查看 Apache Beam 上的开发列表,将会有一个关于未来对排序地图请求的很好的讨论,这将在这里非常有用。

抱歉,这不是一个简短的回答!