将相同的用户事件聚合成两个不同的 windows
Aggregate same user events into two different windows
我正在尝试使用 Apache Beam 编写一个简单的管道。假设我正在接收类似以下内容的用户请求:
(country, user_id, score, timestamp)
我只想总结每个国家/地区所有用户的总分,每分钟和 10 分钟累计。但是,需要注意的是,我想获取每个用户、每个存储桶的最新分数。意思是,如果我有两条记录:
('USA', 1, 10, 62)
('USA', 1, 4, 64)
假设它们映射到同一个分钟桶,我想保留第二个记录(后一个得分为 4)。
我如何有效地做到这一点?现在,我正在获取用户事件流并将其通过管道传输到两个独立的分支 - 一个分支每分钟计算一次聚合,另一个分支每 10 分钟计算一次。显然,这里的大量计算量翻了一番。理想情况下,我们可以重复使用每 1 分钟的计算 window 来加起来 10 分钟 window 但我不太清楚该怎么做。
谢谢!
将元素通过管道传输到两个不同的分支可能没什么大不了的,但是是的,您可以通过避免重复聚合的方式来做到这一点。
假设您的 10 分钟和 1 分钟 windows 可以均匀地相互转换(固定时间 Windows 应该可以正常工作),您可以执行以下操作:
Assign 1 min. windows -> Aggregate -> Assign 10 min. windows -> Aggregate
在第一次聚合(可能是某种类型的 Combine)之后,生成的元素应该具有来自组合元素的最新时间戳(这可以通过更改 TimestampCombiner 来修改)。这意味着只要当您从一个转换为另一个时 windows 均匀排列,第二个聚合应该聚合所有与原始方法相同的数据。
对于问题的第二部分,要保留 window 的最新时间戳元素并删除其他元素,您需要实现自定义 CombineFn that keeps the most recent element. Now in order to actually read the timestamps of the elements from a CombineFn you'll first need to use Reify.timestamps 以将时间戳附加到元素。您可能希望您的 CombineFn 输出没有时间戳的原始元素类型。所以它看起来像这样(方括号中的 PCollections 以便您可以看到类型):
[ElementT] -> Reify.timestamps -> [TimestampedValue<ElementT>] -> Combine -> [ElementT]
我正在尝试使用 Apache Beam 编写一个简单的管道。假设我正在接收类似以下内容的用户请求:
(country, user_id, score, timestamp)
我只想总结每个国家/地区所有用户的总分,每分钟和 10 分钟累计。但是,需要注意的是,我想获取每个用户、每个存储桶的最新分数。意思是,如果我有两条记录:
('USA', 1, 10, 62)
('USA', 1, 4, 64)
假设它们映射到同一个分钟桶,我想保留第二个记录(后一个得分为 4)。
我如何有效地做到这一点?现在,我正在获取用户事件流并将其通过管道传输到两个独立的分支 - 一个分支每分钟计算一次聚合,另一个分支每 10 分钟计算一次。显然,这里的大量计算量翻了一番。理想情况下,我们可以重复使用每 1 分钟的计算 window 来加起来 10 分钟 window 但我不太清楚该怎么做。
谢谢!
将元素通过管道传输到两个不同的分支可能没什么大不了的,但是是的,您可以通过避免重复聚合的方式来做到这一点。
假设您的 10 分钟和 1 分钟 windows 可以均匀地相互转换(固定时间 Windows 应该可以正常工作),您可以执行以下操作:
Assign 1 min. windows -> Aggregate -> Assign 10 min. windows -> Aggregate
在第一次聚合(可能是某种类型的 Combine)之后,生成的元素应该具有来自组合元素的最新时间戳(这可以通过更改 TimestampCombiner 来修改)。这意味着只要当您从一个转换为另一个时 windows 均匀排列,第二个聚合应该聚合所有与原始方法相同的数据。
对于问题的第二部分,要保留 window 的最新时间戳元素并删除其他元素,您需要实现自定义 CombineFn that keeps the most recent element. Now in order to actually read the timestamps of the elements from a CombineFn you'll first need to use Reify.timestamps 以将时间戳附加到元素。您可能希望您的 CombineFn 输出没有时间戳的原始元素类型。所以它看起来像这样(方括号中的 PCollections 以便您可以看到类型):
[ElementT] -> Reify.timestamps -> [TimestampedValue<ElementT>] -> Combine -> [ElementT]