Apache Beam 中的会话 windows 与 python
Session windows in Apache Beam with python
我有一连串的用户事件。我已经将它们映射到 KV{ userId, event },并分配了时间戳。
这是 运行 流式传输模式。我希望能够创建以下输入输出结果:
届会window差距=1
- 输入:
user=1, timestamp=1, event=a
- 输入:
user=2, timestamp=2, event=a
- 输入:
user=2, timestamp=3, event=a
- 输入:
user=1, timestamp=2, event=b
- 时间:
lwm=3
- 输出:
user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
- 时间:
lwm=4
- 输出:
user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]
这样我就可以编写函数来为用户减少会话 window 中的事件列表以及会话的开始和结束时间 window。
这个怎么写? (如果你回答;"look at the examples",这不是一个有效的答案,因为他们从不使用 window 作为参数将事件列表提供给 reducer)
如果我理解正确,这将是此 的后续行动,并且自然地通过添加我在我的解决方案中提出的 Group By Key 步骤来完成。
所以,参考我之前的解释,这里只关注变化,如果我们有这样的管道:
events = (p
| 'Create Events' >> beam.Create(user1_data + user2_data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
| 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
| 'Group' >> beam.GroupByKey() \
| 'analyze_session' >> beam.ParDo(AnalyzeSession()))
现在元素已经按照你在问题描述中描述的那样排列了,所以我们可以简单地登录它们 AnalyzeSession
:
class AnalyzeSession(beam.DoFn):
"""Prints per session information"""
def process(self, element, window=beam.DoFn.WindowParam):
logging.info(element)
yield element
获得想要的结果:
INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
如果您想避免冗余信息,例如将 user_id
和 timestamp
作为值的一部分,可以在 Map
步骤中删除它们。
根据完整的用例(即减少每个会话级别的聚合事件),我们可以像这样计算事件数量或会话持续时间:
class AnalyzeSession(beam.DoFn):
"""Prints per session information"""
def process(self, element, window=beam.DoFn.WindowParam):
user = element[0]
num_events = str(len(element[1]))
window_end = window.end.to_utc_datetime()
window_start = window.start.to_utc_datetime()
session_duration = window_end - window_start
logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)
yield element
对于我的示例,它将输出以下内容:
INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
完整代码here
我有一连串的用户事件。我已经将它们映射到 KV{ userId, event },并分配了时间戳。
这是 运行 流式传输模式。我希望能够创建以下输入输出结果:
届会window差距=1
- 输入:
user=1, timestamp=1, event=a
- 输入:
user=2, timestamp=2, event=a
- 输入:
user=2, timestamp=3, event=a
- 输入:
user=1, timestamp=2, event=b
- 时间:
lwm=3
- 输出:
user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
- 时间:
lwm=4
- 输出:
user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]
这样我就可以编写函数来为用户减少会话 window 中的事件列表以及会话的开始和结束时间 window。
这个怎么写? (如果你回答;"look at the examples",这不是一个有效的答案,因为他们从不使用 window 作为参数将事件列表提供给 reducer)
如果我理解正确,这将是此
所以,参考我之前的解释,这里只关注变化,如果我们有这样的管道:
events = (p
| 'Create Events' >> beam.Create(user1_data + user2_data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
| 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
| 'Group' >> beam.GroupByKey() \
| 'analyze_session' >> beam.ParDo(AnalyzeSession()))
现在元素已经按照你在问题描述中描述的那样排列了,所以我们可以简单地登录它们 AnalyzeSession
:
class AnalyzeSession(beam.DoFn):
"""Prints per session information"""
def process(self, element, window=beam.DoFn.WindowParam):
logging.info(element)
yield element
获得想要的结果:
INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
如果您想避免冗余信息,例如将 user_id
和 timestamp
作为值的一部分,可以在 Map
步骤中删除它们。
根据完整的用例(即减少每个会话级别的聚合事件),我们可以像这样计算事件数量或会话持续时间:
class AnalyzeSession(beam.DoFn):
"""Prints per session information"""
def process(self, element, window=beam.DoFn.WindowParam):
user = element[0]
num_events = str(len(element[1]))
window_end = window.end.to_utc_datetime()
window_start = window.start.to_utc_datetime()
session_duration = window_end - window_start
logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)
yield element
对于我的示例,它将输出以下内容:
INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
完整代码here