Pyspark - Dataframe 深度优先搜索

Pyspark - Depth-First Search on Dataframe

我有以下 pyspark 应用程序,它从 child/parent 进程 ID 的 csv 生成 child/parent 进程序列。将问题视为一棵树,我使用迭代深度优先搜索从叶节点(一个没有子进程的进程)开始并遍历我的文件以创建这些闭包,其中进程 1 是进程 2 的父级,进程 2 是过程 3 的父级等等。

换句话说,给定如下所示的 csv,是否可以使用 pyspark 数据帧和适当的 pyspark-isms 实现深度优先搜索(迭代或递归)以生成所述闭包而无需使用 .collect () 函数(这是难以置信的昂贵)?

from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext

class Test():
    def __init__(self):
        self.process_list = []

def main():

    test = Test()
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc) 
    df = sc.textFile("<path to csv>") 
    df = df.map(lambda line: line.split(","))
    header = df.first()
    data = df.filter(lambda row: row != header)
    data = data.toDF(header)

    data.createOrReplaceTempView("flat")
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat 
                           where doc_parent_pid is not null AND 
                           doc_process_pid is not null")
    data = data.select(monotonically_increasing_id().alias("rowId"), "*")

    data.createOrReplaceTempView("data")
    leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
                                 where doc_parent_pid != -1 AND 
                                 doc_process_pid == -1")
    leaf_df = leaf_df.rdd.collect()
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
                             where doc_process_pid != -1")
    data.createOrReplaceTempView("data")

    for row in leaf_df:
        path = []
        rowID = row[0]  
        data = data.filter(data['rowId'] != rowID)
        parentID = row[4]
        path.append(parentID)
        while (True):
            next_df = sqlContext.sql(
                "select doc_process_pid, doc_parent_pid from data where 
                 doc_process_pid == " + str(parentID))

            next_df_rdd = next_df.collect()
            print("parent: ", next_df_rdd[0][1])
            parentID = next_df_rdd[0][1]

            if (int(parentID) != -1):
                path.append(next_df_rdd[0][1])
            else:
                test.process_list.append(copy.deepcopy(path))
                break

        print("final: ", test.process_list)


main()

这是我的 csv:

doc_process_pid doc_parent_pid
   1             -1
   2              1
   6             -1
   7              6
   8              7
   9              8
   21            -1
   22            21
   24            -1
   25            24
   26            25
   27            26
   28            27
   29            28
   99             6
   107           99
   108           -1
   109          108
   222          109
   1000           7
   1001        1000
    -1            9
    -1           22
    -1           29
    -1          107
    -1         1001
    -1          222
    -1            2

表示child/parent个进程关系。如果我们将其视为一棵树,则叶节点由 doc_process_id == -1 定义,根节点是进程,其中 doc_parent_process == -1。

上面的代码生成了两个数据帧:

叶节点:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|             -1|             9|
|             -1|            22|
|             -1|            29|
|             -1|           107|
|             -1|          1001|
|             -1|           222|
|             -1|             2|
+---------------+--------------+

其余 child/parent 个没有叶节点的进程:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|              1|            -1|
|              2|             1|
|              6|            -1|
|              7|             6|
|              8|             7|
|              9|             8|
|             21|            -1|
|             22|            21|
|             24|            -1|
|             25|            24|
|             26|            25|
|             27|            26|
|             28|            27|
|             29|            28|
|             99|             6|
|            107|            99|
|            108|            -1|
|            109|           108|
|            222|           109|
|           1000|             7|
+---------------+--------------+

输出将是:

[[1, 2], 
 [6, 99, 107], 
 [6, 99, 7, 1000, 1001], 
 [6, 7, 1000, 8, 9], 
 [21, 22], 
 [24, 25, 26, 27, 28, 29], 
 [108, 109, 222]])

想法?虽然这有点具体,但我想强调执行深度优先搜索以生成以这种 DataFrame 格式表示的序列闭包的一般性问题。

在此先感谢您的帮助!

我不认为 pyspark 是执行此操作的最佳语言。

一种解决方案是遍历树节点级别,每次都将数据框与自身连接起来。

让我们创建我们的数据框,不需要将其拆分为叶节点和其他节点,我们只保留原始数据框:

data = spark.createDataFrame(
    sc.parallelize(
        [[1, -1], [2,  1], [6, -1], [7,  6], [8,  7], [9,  8], [21,-1], [22,21], [24,-1], [25,24], [26,25], [27,26], [28,27], 
         [29,28], [99, 6], [107,99], [108,-1], [109,108], [222,109], [1000,7], [1001,1000], [ -1,9], [ -1,22], [ -1,29], 
         [ -1,107], [ -1, 1001], [ -1,222], [ -1,2]]
    ), 
    ["doc_process_pid", "doc_parent_pid"]
)

我们现在将从这棵树创建两个数据框,一个是我们的建筑基础,另一个是我们的建筑砖:

df1 = data.filter("doc_parent_pid = -1").select(data.doc_process_pid.alias("node"))
df2 = data.select(data.doc_process_pid.alias("son"), data.doc_parent_pid.alias("node")).filter("node != -1")

让我们为构造步骤 i 定义一个函数:

def add_node(df, i):
    return df.filter("node != -1").join(df2, "node", "inner").withColumnRenamed("node", "node" + str(i)).withColumnRenamed("son", "node")

让我们定义初始状态:

from pyspark.sql.types import *
df = df1
i = 0
df_end = spark.createDataFrame(
    sc.emptyRDD(), 
    StructType([StructField("branch", ArrayType(LongType()), True)])
)

当一个分支完全构建后,我们将其从 df 中取出并放入 df_end:

import pyspark.sql.functions as psf
while df.count() > 0:
    i = i + 1
    df = add_node(df, i)
    df_end = df.filter("node = -1").drop('node').select(psf.array(*[c for c in reversed(df.columns) if c != "node"]).alias("branch")).unionAll(
        df_end
    )
    df = df.filter("node != -1")

最后,df 为空,我们有

df_end.show(truncate=False)
    +------------------------+
    |branch                  |
    +------------------------+
    |[24, 25, 26, 27, 28, 29]|
    |[6, 7, 8, 9]            |
    |[6, 7, 1000, 1001]      |
    |[108, 109, 222]         |
    |[6, 99, 107]            |
    |[21, 22]                |
    |[1, 2]                  |
    +------------------------+

此算法的最坏情况是连接数与最大分支长度一样多。