pyspark dataframe foreach to fill a list

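I'm using Spark 1.6.1 and Python 2.7, and I need to build a DataFrame B from the rows of a DataFrame A.

The solution I have right now is to collect DataFrame A, inspect each row, append the corresponding rows for B to a list, and then create DataFrame B from that list.

With this solution I obviously lose all the benefits of using DataFrames. I'd like to use foreach instead, but I can't find a way to make it work. My attempts so far are in the edits below.

Does anyone have any ideas?

Thanks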

-------------------- EDIT:

An example of what I tried:

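from pyspark.sql import Row

def f(row, list):
    # Runs on the executors: `list` here is a deserialized copy of the
    # driver's list, so these appends are never seen by the driver.
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

list = []
dfA.foreach(lambda x : f(x, list))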

As I mentioned, this does nothing: the function doesn't seem to be executed, and the list stays empty.
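A likely reason nothing seems to happen: `foreach` runs `f` on the executors, and PySpark sends the function to them together with a serialized copy of every object it captures, including `list`. The appends land in those copies, which are discarded, so the driver's list stays empty. The snippet below is a plain-Python simulation of that effect; it does not use Spark and assumes a pickle round-trip is a fair stand-in for how the closure is shipped (PySpark actually uses cloudpickle).

```python
import pickle

def f(row, acc):
    # Same shape as the question's f; `row` stands in for `row.one`.
    if row:
        acc.append(('one', 'ok'))
    else:
        acc.append(('one', 'ok'))
        acc.append(('two', 'nok'))

driver_list = []

# Simulated shipping: the task gets its own deserialized copy of the
# captured list, not a reference to the driver's object.
worker_list = pickle.loads(pickle.dumps(driver_list))
for row in [True, False]:
    f(row, worker_list)

print(driver_list)  # []
print(worker_list)  # [('one', 'ok'), ('one', 'ok'), ('two', 'nok')]
```

The usual way out is to have the function return the derived rows instead of appending them, so Spark itself can gather them into a new DataFrame.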

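I also tried this (with the list defined at the beginning of the class):

from pyspark.sql import Row

list = []

def f(row):
    # `global` must be declared inside f, because `list += ...` rebinds the name.
    # Even so, each executor only updates its own copy of `list`.
    global list
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

dfA.foreach(f)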
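Instead of mutating a list, the same logic can be written as a function that returns the rows for one input row, which Spark can apply with `flatMap`. A minimal sketch, assuming dfA has a boolean column `one` as in the attempts above; `expand_row` is a hypothetical name, and a namedtuple stands in for a dfA row so the snippet runs without Spark.

```python
from collections import namedtuple

# Stand-in for a row of dfA (assumption: it has a boolean field `one`).
InputRow = namedtuple('InputRow', ['one'])

def expand_row(row):
    # Hypothetical helper: yields the (type, field) pairs that f() appended.
    # Both branches of f add ('one', 'ok'); only the else-branch adds the second.
    yield ('one', 'ok')
    if not row.one:
        yield ('two', 'nok')

# Local stand-in for dfA.rdd.flatMap(expand_row).collect()
rows = [InputRow(one=True), InputRow(one=False)]
result = [out for row in rows for out in expand_row(row)]
print(result)  # [('one', 'ok'), ('one', 'ok'), ('two', 'nok')]
```

On the cluster, something like `self._sql_context.createDataFrame(dfA.rdd.flatMap(expand_row), ['type', 'field'])` would then build dfB without collecting dfA to the driver.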

------------ EDIT 2:

What I'm doing now is:

    import re
    from datetime import datetime
    from pyspark.sql import Row

    rows = []
    for row in dfA.collect():  # brings all of dfA to the driver
        # event_type '1': start dates in the "new" array
        string = re.search(a_regex, row['raw'])
        if string:
            dates = re.findall(date_regex, string.group())
            for date in dates:
                date_string = datetime.strptime(date, '%Y-%m-%d').date()
                rows.append(Row(event_type='1', event_date=date_string))

        # event_type '2': start dates in the "removed" array
        b_string = re.search(b_regex, row['raw'])
        if b_string:
            dates = re.findall(date_regex, b_string.group())
            for date in dates:
                scheduled_to = datetime.strptime(date, '%Y-%m-%d').date()
                rows.append(Row(event_type='2', event_date=scheduled_to))

Then:

dfB = self._sql_context.createDataFrame(rows)

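dfA is produced by another process and I can't change it. I know this is a very clumsy way to use DataFrames, but there's nothing I can do about it.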
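The collect loop above can be moved onto the executors the same way: turn its body into a generator that yields one (event_type, event_date) pair per matched date and apply it with `flatMap`. A sketch assuming the regexes from Edit 3; `events_from_raw` is a hypothetical name, and the snippet only exercises the function locally in plain Python.

```python
import re
from datetime import datetime

# Regexes as given in the question (Edit 3).
a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'

def events_from_raw(raw):
    # Hypothetical per-row function: same logic as the collect loop,
    # yielding (event_type, event_date) instead of appending to a list.
    if raw is None:
        return
    for event_type, regex in (('1', a_regex), ('2', b_regex)):
        match = re.search(regex, raw)
        if match:
            for date in re.findall(date_regex, match.group()):
                yield (event_type, datetime.strptime(date, '%Y-%m-%d').date())

sample = '{"new":[],"removed":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}]}'
print(list(events_from_raw(sample)))  # [('2', datetime.date(2018, 3, 10))]
```

In Spark this would be used roughly as `self._sql_context.createDataFrame(dfA.rdd.flatMap(lambda row: events_from_raw(row['raw'])), ['event_type', 'event_date'])`, keeping one row per date as in the loop.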

-------------------- EDIT 3: sample of dfA.raw:

{"new":[],"removed":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}]}
{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}
{"new":[{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"}],"removed":[{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2018-02-10","end":"2018-02-16"}]}

and the regular expressions:

a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'
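Since every `raw` value in the sample is a JSON document, a swapped-in alternative to the regexes is to parse it with Python's `json` module. That avoids relying on the non-greedy `\[(.*?)\]`, which stops at the first `]` and only works because the entries contain no nested arrays. A sketch under that assumption (each row is valid JSON with optional `new` and `removed` lists of objects carrying a `start` key); `dates_from_json` is a hypothetical name.

```python
import json
from datetime import datetime

def dates_from_json(raw, key):
    # Hypothetical helper: start dates listed under `key` ('new' or 'removed').
    if raw is None:
        return []
    entries = json.loads(raw).get(key) or []
    return [datetime.strptime(e['start'], '%Y-%m-%d').date()
            for e in entries if e.get('start')]

raw = '{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}'
print(dates_from_json(raw, 'new'))      # [datetime.date(2018, 3, 10)]
print(dates_from_json(raw, 'removed'))  # []
```

The function could take the place of the regex search inside a UDF or a `flatMap` function; `null` values in the JSON are parsed to `None` and simply ignored here.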

dfA.select('raw').show(2,False)

+-------------------------------------------------------------------------------------------+
|raw                                                                                        |
+-------------------------------------------------------------------------------------------+
|{"new":[{"start":"2018-03-24","end":"2018-03-30","scheduled_by_system":null}],"removed":[]}|
|{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}|
+-------------------------------------------------------------------------------------------+
only showing top 2 rows

df.select('raw').printSchema()

root
 |-- raw: string (nullable = true)

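-------------------- Answer:

Select the raw column you need, then write a udf that parses it and returns the dates for event_date; event_type is added afterwards as a literal column.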

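import re
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql import types as T

def searchUdf(regex, dateRegex, x):
    # Returns the dates captured by dateRegex inside the first match of regex.
    list_return = []
    if x is None:  # raw is nullable
        return list_return
    string = re.search(regex, x)
    if string:
        dates = re.findall(dateRegex, string.group())
        for date in dates:
            date_string = datetime.strptime(date, '%Y-%m-%d').date()
            list_return.append(date_string)
    return list_return

udfFunctionCall = F.udf(searchUdf, T.ArrayType(T.DateType()))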

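The udf parses the raw string with the regex and dateRegex passed in as arguments and returns the matched start dates as an ArrayType(DateType()) value. It does not return event_type; that column is added separately.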

Call the udf once per event type, filter out the rows where it returned an empty array, and put event_type next to event_date as a literal column:

df = dfA.select("raw")

# event_type 1: start dates found in the "new" array
adf = df.select(F.lit(1).alias("event_type"), udfFunctionCall(F.lit(a_regex), F.lit(date_regex), df.raw).alias("event_date")) \
    .filter(F.size(F.col("event_date")) > 0)

# event_type 2: start dates found in the "removed" array
bdf = df.select(F.lit(2).alias("event_type"), udfFunctionCall(F.lit(b_regex), F.lit(date_regex), df.raw).alias("event_date")) \
    .filter(F.size(F.col("event_date")) > 0)

The regular expressions, as given in the question:

a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'

Now you have one DataFrame per event_type; the last step is to union them:

dfB = adf.unionAll(bdf)

That's it; this should resolve your problem.

Given the following raw column:

|raw|
|{"new":[],"removed":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}]}|
|{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}|
|{"new":[{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"}],"removed":[{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2018-02-10","end":"2018-02-16"}]}|

you should get:

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_type|event_date                                                                                                                                                                                                              |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1         |[2018-03-10]                                                                                                                                                                                                            |
|1         |[2017-01-28, 2017-02-04, 2017-02-11, 2017-02-18, 2017-03-04, 2017-03-11, 2017-03-18, 2017-09-02, 2017-09-16, 2017-09-23, 2017-09-30, 2017-10-07, 2017-12-02, 2017-12-09, 2017-12-16, 2017-12-23, 2018-01-06]            |
|2         |[2018-03-10]                                                                                                                                                                                                            |
|2         |[2017-02-04, 2017-02-11, 2017-02-18, 2017-03-04, 2017-03-11, 2017-03-18, 2017-01-28, 2017-09-16, 2017-09-02, 2017-09-30, 2017-10-07, 2017-09-23, 2017-12-16, 2017-12-23, 2018-01-06, 2017-12-09, 2017-12-02, 2018-02-10]|
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
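To sanity-check the result without a cluster, the same pipeline (apply the udf logic per row, drop empty arrays, union type 1 and type 2) can be replayed in plain Python over the first two sample rows. A sketch that mirrors `searchUdf`, using `a_regex` for event_type 1 and `b_regex` for event_type 2; `search_dates` is a hypothetical Spark-free copy of the udf body, and the lists `adf`/`bdf` only imitate the DataFrames of the same name.

```python
import re
from datetime import datetime

a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'

def search_dates(regex, dateRegex, x):
    # Same logic as searchUdf, callable without Spark.
    result = []
    if x is None:
        return result
    match = re.search(regex, x)
    if match:
        for date in re.findall(dateRegex, match.group()):
            result.append(datetime.strptime(date, '%Y-%m-%d').date())
    return result

raws = [
    '{"new":[],"removed":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}]}',
    '{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}',
]

# One (event_type, dates) pair per row; empty arrays are filtered out.
adf = [(1, d) for d in (search_dates(a_regex, date_regex, r) for r in raws) if d]
bdf = [(2, d) for d in (search_dates(b_regex, date_regex, r) for r in raws) if d]
union = adf + bdf

for event_type, dates in union:
    print(event_type, [str(d) for d in dates])
# 1 ['2018-03-10']
# 2 ['2018-03-10']
```

The first row has an empty "new" array, so it only contributes to event_type 2; the second row has an empty "removed" array, so it only contributes to event_type 1.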