在 windows 中复制数据以填充无限数量的点
Duplicating data across windows to fill in an indefinite number of spots
我正在尝试将数据记录复制到未来 windows。这解决的问题是每个 window 的计算统计数据将更加准确,因为此数据是连续的(如温度)并且需要基线值。
在这些图表中,每个方框代表一个固定的 window。每个 window 中的数字表示来自某个源的 PCollection 中的数据。
这是输入 PCollection 的示例:
+---------+---------+---------+--------->
| 1 2 | 3 | | |
+---------+---------+---------+--------->
以及生成的输出 PCollection:
+---------+---------+---------+--------->
| 1 2 | 2 3 | 3 | 3 |
+---------+---------+---------+--------->
注意最新的数据点(基于事件时间戳)如何转发到下一个 window。如果有多个空值windows,则必须重新转发该值
我已经解决了运行 ed PCollection 通过有状态的 DoFn 发出一次额外的复制和修改元素转发一次的问题:
public class DupeFn extends DoFn<Datum, Datum> {
@StateId("latest")
private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();
@TimerId("emit")
private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(
@Element Datum element,
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest,
@TimerId("emit") Timer emit
) {
emit.set(window.maxTimestamp());
Datum prev = latest.read();
if (prev == null || element.timestamp > prev.timestamp) {
latest.write(element);
}
receiver.output(element);
}
@OnTimer("emit")
public void emitLatest(
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest
) {
Datum last = latest.read();
// modify the timestamp such that it lands in the next window
last.timestamp = window.end().getMillis() + 10;
last.id += " DUPED";
receiver.outputWithTimestamp(last, new Instant(last.timestamp));
}
}
现在的问题是,如果 window 为空,则不会将任何内容复制到程序 window 中。理想情况下,行为将如上图所述。
有办法吗?
编辑
我找到 this 相关未发表的博客 post。
目前 Beam 文档存在一个小问题,一旦问题解决,博客就会出现。循环计时器将为您提供解决此问题的部分方法。因为它将确保即使在没有数据的情况下,每个间隔 window 也有 activity。
编辑:博客现在可以在这里找到 Link to Blog
下一段需要使用 Global Windows,这会带来更多的复杂性。下周的 Apache Beam 峰会上将对此进行讨论。
为了保持状态,您需要将固定的 windows 聚合流入 GlobalWindow。但是 GlobalWindow 不保证顺序,因此您需要遵循以下流程:
@ProcessElement
- 将元素保存到 BaggedState
- 如果尚未设置时间戳,则创建一个 EventTime 时间戳。
You will need something like this to solve for that
@OnTimer
- 读取 BaggedList 并按时间戳排序
- 如果下一个聚合没有值(它是使用循环计时器而不是外部数据源生成的),则将每个聚合的最终值链接到下一个聚合中。
- 输出时间戳小于 OnTimer.Timestamp
的所有值
- 清除任何已处理元素的 baglist,今天请注意,这是低效的,因为您无法从列表中删除特定元素。如果您查看 Apache Beam 上的开发列表,将会有一个关于未来对排序地图请求的很好的讨论,这将在这里非常有用。
抱歉,这不是一个简短的回答!
我正在尝试将数据记录复制到未来 windows。这解决的问题是每个 window 的计算统计数据将更加准确,因为此数据是连续的(如温度)并且需要基线值。
在这些图表中,每个方框代表一个固定的 window。每个 window 中的数字表示来自某个源的 PCollection 中的数据。
这是输入 PCollection 的示例:
+---------+---------+---------+--------->
| 1 2 | 3 | | |
+---------+---------+---------+--------->
以及生成的输出 PCollection:
+---------+---------+---------+--------->
| 1 2 | 2 3 | 3 | 3 |
+---------+---------+---------+--------->
注意最新的数据点(基于事件时间戳)如何转发到下一个 window。如果有多个空值windows,则必须重新转发该值
我已经解决了运行 ed PCollection 通过有状态的 DoFn 发出一次额外的复制和修改元素转发一次的问题:
public class DupeFn extends DoFn<Datum, Datum> {
@StateId("latest")
private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();
@TimerId("emit")
private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(
@Element Datum element,
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest,
@TimerId("emit") Timer emit
) {
emit.set(window.maxTimestamp());
Datum prev = latest.read();
if (prev == null || element.timestamp > prev.timestamp) {
latest.write(element);
}
receiver.output(element);
}
@OnTimer("emit")
public void emitLatest(
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest
) {
Datum last = latest.read();
// modify the timestamp such that it lands in the next window
last.timestamp = window.end().getMillis() + 10;
last.id += " DUPED";
receiver.outputWithTimestamp(last, new Instant(last.timestamp));
}
}
现在的问题是,如果 window 为空,则不会将任何内容复制到程序 window 中。理想情况下,行为将如上图所述。
有办法吗?
编辑
我找到 this 相关未发表的博客 post。
目前 Beam 文档存在一个小问题,一旦问题解决,博客就会出现。循环计时器将为您提供解决此问题的部分方法。因为它将确保即使在没有数据的情况下,每个间隔 window 也有 activity。
编辑:博客现在可以在这里找到 Link to Blog
下一段需要使用 Global Windows,这会带来更多的复杂性。下周的 Apache Beam 峰会上将对此进行讨论。
为了保持状态,您需要将固定的 windows 聚合流入 GlobalWindow。但是 GlobalWindow 不保证顺序,因此您需要遵循以下流程:
@ProcessElement
- 将元素保存到 BaggedState
- 如果尚未设置时间戳,则创建一个 EventTime 时间戳。 You will need something like this to solve for that
@OnTimer
- 读取 BaggedList 并按时间戳排序
- 如果下一个聚合没有值(它是使用循环计时器而不是外部数据源生成的),则将每个聚合的最终值链接到下一个聚合中。
- 输出时间戳小于 OnTimer.Timestamp 的所有值
- 清除任何已处理元素的 baglist,今天请注意,这是低效的,因为您无法从列表中删除特定元素。如果您查看 Apache Beam 上的开发列表,将会有一个关于未来对排序地图请求的很好的讨论,这将在这里非常有用。
抱歉,这不是一个简短的回答!