使用行值作为分隔符将 spark 数据帧分成块

Divide spark dataframe into chunks using row values as separators

在我的 PySpark 代码中,我有一个 DataFrame 填充了来自传感器的数据,每一行都有时间戳,event_description 和 event_value。 每个传感器事件都由一个 id 和一个值定义的测量值组成。我唯一的保证是与单个事件相关的所有 "phases" 都包含在两个 EV_SEP 行之间(未排序)。 在每个事件 "block" 中都有一个事件标签,它是与 EV_CODE.

关联的值
+-------------------------+------------+-------------+
| timestamp               | event_id   | event_value |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:12.540 | EV_SEP     | -----       |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:14.201 | EV_2       | 10          |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:13.331 | EV_1       | 11          |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:15.203 | EV_CODE    | ABC         |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:16.670 | EV_SEP     | -----       |
+-------------------------+------------+-------------+

我想创建一个包含该标签的新列,以便我知道所有事件都与该标签相关联:

+-------------------------+----------+-------------+------------+
| timestamp               | event_id | event_value | event_code |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:12.540 | EV_SEP   | -----       | ABC        |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:14.201 | EV_2     | 10          | ABC        |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:13.331 | EV_1     | 11          | ABC        |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:15.203 | EV_CODE  | ABC         | ABC        |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:16.670 | EV_SEP   | -----       | ABC        |
+-------------------------+----------+-------------+------------+

使用 pandas 我可以轻松获取 EV_SEP 行的索引,将 table 分成块,从每个块中取出 EV_CODE 并创建一个 event_code 具有这样值的列。

一个可能的解决方案是:

有没有更好的办法解决这个问题?

创建一个新的 Hadoop InputFormat 将是一种计算效率更高的方法来实现您的目标(尽管在代码方面可以说是相同或更多的体操)。您可以使用 the Python API, but you must take care of conversion from the Java format to Python. You can then specify the format. The converters available in PySpark are relatively few but this reference proposes using the Avro converter as an example 中的 sc.hadoopFile 指定替代 Hadoop 输入格式。您可能还会发现让您的自定义 Hadoop 输入格式输出文本很方便,然后您在 Python 中额外解析这些文本以避免实现转换器的问题。

一旦你有了它,你将创建一个特殊的输入格式(在 Java 或使用 Hadoop API 的 Scala 中)来处理具有 EV_SEP 作为记录分隔符而不是换行符。您可以通过在累加器中读取行时收集行来非常简单地做到这一点(只需一个简单的 ArrayList 就可以作为概念验证),然后在找到两个 [=12] 时发出累积的记录列表=] 一行一行。

我要指出的是,使用 TextInputFormat 作为此类设计的基础可能很诱人,但输入格式会在换行符处任意拆分此类文件,您需要实施自定义逻辑才能正确支持拆分文件。或者,您可以通过不实施文件拆分来避免该问题。这是对分区程序的简单修改。

如果确实需要拆分文件,基本思路是:

  • 通过将文件平均分成几部分来选择分割偏移量
  • 寻找偏移量
  • 从偏移量到找到定界符序列的位置逐个字符地查找(在这种情况下,一行中的两行类型为 EV_SEP

针对文件拆分的边缘情况检测这些序列将是一个挑战。我建议建立行的最大字节宽度,并从起点向后读取适当宽度(基本上是行大小的 2 倍)的滑动 window 块,然后使用 windows 匹配预编译的 Java 正则表达式和匹配器。这与 Sequence Files find their sync marks 的方式类似,但使用正则表达式而不是严格相等来检测序列。

附带说明一下,考虑到您在此处提到的其他一些背景,我会担心按时间戳对 DataFrame 进行排序可能会改变不同文件中同一时间段内发生的事件的内容。

from pyspark.sql import functions as f

示例数据:

df.show()

+-----------------------+--------+-----------+
|timestamp              |event_id|event_value|
+-----------------------+--------+-----------+
|2017-01-01 00:00:12.540|EV_SEP  |null       |
|2017-01-01 00:00:14.201|EV_2    |10         |
|2017-01-01 00:00:13.331|EV_1    |11         |
|2017-01-01 00:00:15.203|EV_CODE |ABC        |
|2017-01-01 00:00:16.670|EV_SEP  |null       |
|2017-01-01 00:00:20.201|EV_2    |10         |
|2017-01-01 00:00:24.203|EV_CODE |DEF        |
|2017-01-01 00:00:31.670|EV_SEP  |null       |
+-----------------------+--------+-----------+

添加索引:

df_idx = df.filter(df['event_id'] == 'EV_SEP') \
    .withColumn('idx', f.row_number().over(Window.partitionBy().orderBy(df['timestamp'])))
df_block = df.filter(df['event_id'] != 'EV_SEP').withColumn('idx', f.lit(0))

'Spread' 索引:

df = df_idx.union(df_block).withColumn('idx', f.max('idx').over(
    Window.partitionBy().orderBy('timestamp').rowsBetween(Window.unboundedPreceding, Window.currentRow))).cache()

添加EV_CODE:

df_code = df.filter(df['event_id'] == 'EV_CODE').withColumnRenamed('event_value', 'event_code')
df = df.join(df_code, on=[df['idx'] == df_code['idx']]) \
    .select(df['timestamp'], df['event_id'], df['event_value'], df_code['event_code'])

最后:

+-----------------------+--------+-----------+----------+
|timestamp              |event_id|event_value|event_code|
+-----------------------+--------+-----------+----------+
|2017-01-01 00:00:12.540|EV_SEP  |null       |ABC       |
|2017-01-01 00:00:13.331|EV_1    |11         |ABC       |
|2017-01-01 00:00:14.201|EV_2    |10         |ABC       |
|2017-01-01 00:00:15.203|EV_CODE |ABC        |ABC       |
|2017-01-01 00:00:16.670|EV_SEP  |null       |DEF       |
|2017-01-01 00:00:20.201|EV_2    |10         |DEF       |
|2017-01-01 00:00:24.203|EV_CODE |DEF        |DEF       |
+-----------------------+--------+-----------+----------+