如何使用 BigQuery 和 Apache Beam 将 SQL table 转换为行序列列表?

How to transform an SQL table into a list of row sequences using BigQuery and Apache Beam?

我有一个非常大的 table,其中每一行代表一个称为旅行的抽象。行程由数字列组成,例如车辆 ID、行程 ID、开始时间、停止时间、行驶距离、行驶持续时间等。因此每个行程都是浮点值的一维向量。

我想将此 table 或矢量列表转换为行程序列列表,其中行程按车辆 ID 分组到序列中,并根据开始时间排序。序列长度需要限制为特定大小,例如 256,但可以/应该有多个具有相同 VehicleId 的序列。

示例:
(序列长度 = 4)

[  
(Vehicle1, [Trip1, Trip2, Trip3, Trip4]),  
(Vehicle1, [Trip5, Trip6, Trip7]),  
(Vehicle2, [Trip1, Trip2, Trip3, Trip4])  
]

我正在尝试使用基于序列的模型(例如 LSTM/Transformer)基于这些行程对驾驶模式进行建模。将每个 Trip 想象成一个词嵌入,将每个 trip 序列想象成一个句子。由于我们讨论的是数百 GB 的数据,因此我需要通过 BigQuery / Apache Beam 函数(或任何其他推荐的工具)的组合来构建这些句子。我对这两种工具都很陌生,因此非常感谢您的帮助。

以下适用于 BigQuery 标准 SQL

#standardSQL
SELECT trip.vehicle_id, ARRAY_AGG(trip ORDER BY trip.start_time) trips
FROM (
  SELECT trip, DIV(ROW_NUMBER() OVER(PARTITION BY vehicle_id ORDER BY start_time) - 1, 4) grp   
  FROM `project.dataset.table` trip
)
GROUP BY trip.vehicle_id, grp

以上假定按 start_time 排序行程且序列长度 = 4
此外,它 returns vehicle_id 作为数组中旅行信息的一部分 - 如下面的示例

Row vehicle_id  trips.vehicle_id    trips.trip_id   trips.start_time    trips.stop_time  
1   Vehicle1    Vehicle1            Trip1           1                   2    
                Vehicle1            Trip2           2                   3    
                Vehicle1            Trip3           3                   4    
                Vehicle1            Trip4           4                   5    
2   Vehicle1    Vehicle1            Trip5           5                   6    
                Vehicle1            Trip6           6                   6    
                Vehicle1            Trip7           7                   6    
3   Vehicle2    Vehicle2            Trip1           2                   3    
                Vehicle2            Trip2           3                   4    
                Vehicle2            Trip3           4                   5    
                Vehicle2            Trip4           5                   6    

要消除此问题 - 请尝试以下

#standardSQL
SELECT vehicle_id,
  ARRAY( 
    SELECT AS STRUCT * EXCEPT(vehicle_id)
    FROM UNNEST(trips)
    ORDER BY start_time
  ) trips
FROM (
  SELECT trip.vehicle_id, ARRAY_AGG(trip ORDER BY trip.start_time) trips
  FROM (
    SELECT trip, DIV(ROW_NUMBER() OVER(PARTITION BY vehicle_id ORDER BY start_time) - 1, 4) grp   
    FROM `project.dataset.table` trip
  )
  GROUP BY trip.vehicle_id, grp
)


Row vehicle_id  trips.trip_id   trips.start_time    trips.stop_time  
1   Vehicle1    Trip1           1                   2    
                Trip2           2                   3    
                Trip3           3                   4    
                Trip4           4                   5    
2   Vehicle1    Trip5           5                   6    
                Trip6           6                   6    
                Trip7           7                   6    
3   Vehicle2    Trip1           2                   3    
                Trip2           3                   4    
                Trip3           4                   5    
                Trip4           5                   6