Add a column to a dataframe whose value is based on another field, but needs to increment

I've run into a problem that I've been struggling with, and I have to imagine there is a more elegant solution than looping through my dataframe row by row. I have a dataframe that looks like this:

EventTime           | ConditionMet
------------------- | ------------
2017-09-11 00:00:01 | 0
2017-09-11 00:00:02 | 0
2017-09-11 00:00:03 | 0
2017-09-11 00:00:04 | 1
2017-09-11 00:00:05 | 1
2017-09-11 00:00:06 | 1
2017-09-11 00:00:07 | 0
2017-09-11 00:00:08 | 0
2017-09-11 00:00:09 | 1
2017-09-11 00:00:10 | 1
2017-09-11 00:00:11 | 1
2017-09-11 00:00:12 | 1
2017-09-11 00:00:13 | 0

Each time the condition is met (ConditionMet=1), I want to tag the record with an event name (e.g. Event1). I can't find an elegant way to do this with .withColumn() using when conditions or window functions. The ideal result would be:

EventTime           |ConditionMet|EventName
--------------------|------------|---------
2017-09-11 00:00:01 |       0    |     
2017-09-11 00:00:02 |       0    |
2017-09-11 00:00:03 |       0    |
2017-09-11 00:00:04 |       1    | Event1
2017-09-11 00:00:05 |       1    | Event1
2017-09-11 00:00:06 |       1    | Event1
2017-09-11 00:00:07 |       0    |
2017-09-11 00:00:08 |       0    |
2017-09-11 00:00:09 |       1    | Event2
2017-09-11 00:00:10 |       1    | Event2
2017-09-11 00:00:11 |       1    | Event2
2017-09-11 00:00:12 |       1    | Event2
2017-09-11 00:00:13 |       0    | 

Interested in any clever approaches here.

from pyspark.sql import functions as F

df.withColumn('EventName', F.when(df["ConditionMet"] > 0.0, "Event1").otherwise(""))

I believe I've found a way to solve my problem, though I don't know how elegant it is. Essentially, I join the dataset back onto itself on the previous record's timestamp (I think the lead() function could also get me there). From there I can pass in the current condition and the next condition to spot the change, which is when I increment the global counter used to build the EventName. Code below.

from pyspark.sql.functions import col, udf, array
from pyspark.sql.types import StringType

# Global counter used to number the events; note this is per-Python-worker state,
# so it only behaves as a single shared counter when the rows are processed together.
cnt = 0

def tagTrip(t):
    global cnt
    if t[0] == 1:
        # The paired condition being 0 marks the edge of a cluster, so bump the counter
        if t[1] == 0:
            cnt = cnt + 1
        return "Event" + str(cnt)
    else:
        return ""

tagTripUdf = udf(tagTrip, StringType())

# I don't include the code that shows how I got to this df and how I can join it onto itself
dfJoined = df.join(dfNext, df['time'] == dfNext['timeprev'], 'inner')

dfNew = dfJoined.withColumn('EventName', tagTripUdf(array(col('ConditionMet'), col('ConditionMetNext'))))
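
The construction of dfNext isn't shown above. For what it's worth, the lead()/lag() window functions the answer alludes to can produce the shifted condition column directly, without a second dataframe or a join. A minimal sketch, assuming the timestamp column is named EventTime as in the question (the answer's own code calls it time) and reusing the ConditionMetNext column name from the code above; whether lead or lag is the right shift depends on how dfNext was actually built:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hypothetical sketch: take the neighbouring row's condition with a window function.
# An ordered window without partitionBy moves every row onto one partition, so this
# only suits modest data volumes.
w = Window.orderBy("EventTime")
dfShifted = df.withColumn("ConditionMetNext", F.lead("ConditionMet", 1).over(w))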

I think if the idea is to tag each contiguous cluster with a unique label, then you can do it by computing a cumulative sum:

  1. Sort the df, e.g. by EventTime
  2. Invert ConditionMet
  3. Compute the cumulative sum of the inverted column
  4. Ignore the cumulative sum where ConditionMet=0; where ConditionMet=1, use the cumulative sum as the cluster label (a window-function sketch of these steps follows the example table)

    +-------------------+------------+-------+-------------+---------+
    |          EventTime|ConditionMet|InvCond|CumulativeSum|EventName|
    +-------------------+------------+-------+-------------+---------+
    |2017-09-11 00:00:01|           0|      1|            1|         |
    |2017-09-11 00:00:02|           0|      1|            2|         |
    |2017-09-11 00:00:03|           0|      1|            3|         |
    |2017-09-11 00:00:04|           1|      0|            3|   Event3|
    |2017-09-11 00:00:05|           1|      0|            3|   Event3|
    |2017-09-11 00:00:06|           1|      0|            3|   Event3|
    |2017-09-11 00:00:07|           0|      1|            4|         |
    |2017-09-11 00:00:08|           0|      1|            5|         |
    |2017-09-11 00:00:09|           1|      0|            5|   Event5|
    |2017-09-11 00:00:10|           1|      0|            5|   Event5|
    |2017-09-11 00:00:11|           1|      0|            5|   Event5|
    |2017-09-11 00:00:12|           1|      0|            5|   Event5|
    |2017-09-11 00:00:13|           0|      1|            6|         |
    +-------------------+------------+-------+-------------+---------+
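
For comparison, the same steps can be written with Spark's built-in window functions. A minimal sketch, assuming ConditionMet is numeric and that a single ordered window is acceptable (an unpartitioned Window.orderBy pulls every row onto one partition, which the partition-aware implementation below avoids):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hypothetical sketch of steps 1-4 with window functions
w = Window.orderBy("EventTime")                                   # step 1: order by EventTime
df2 = (df
       .withColumn("InvCond", 1 - F.col("ConditionMet"))          # step 2: invert the condition
       .withColumn("CumulativeSum", F.sum("InvCond").over(w))     # step 3: running sum
       .withColumn("EventName",                                   # step 4: label only the clusters
                   F.when(F.col("ConditionMet") == 1,
                          F.concat(F.lit("Event"), F.col("CumulativeSum").cast("string")))
                    .otherwise("")))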
    

Code:

from pyspark.sql.functions import lag, udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window

def tagSequentialClusters(df, condColumn, tagColumnName):
    ## Invert the condition: 0 becomes 1, 1 becomes 0
    def InvCond(value):
        if value == '1':
            return 0
        else:
            return 1

    ## Add Event for the valid clusters
    def mapEventNumber(cond, number):
        if cond == "1":
            return "Event" + str(number)
        else:
            return ""

    ## Add new columns to row
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)

    ## Calculate partial cumulative sum for partition iterator
    def calcPartialCumulativeSum(iter):
        counter = 0     
        final_iterator = []
        for row in iter:
            counter = counter + row["InvCond"]
            newRow = addRowColumn(row, PartialSum=counter)
            final_iterator.append(newRow)
        return final_iterator

    ## Get the last partial sum of each partition, keyed by partition index
    def getTailWithIndex(index, iter):
        tailRow = None
        for row in iter:
            tailRow = row
        # Wrap in a list so the (index, sum) pair survives the flattening done by
        # mapPartitionsWithIndex and arrives at collect() as a single element
        return [(index, tailRow["PartialSum"] if tailRow is not None else 0)]

    ## For each partition, the offset to add is the sum of the tails of all earlier partitions
    def calcSumMap(collectedMap):
        offsets = {}
        running = 0
        for index, tailSum in sorted(collectedMap):
            offsets[index] = running
            running += tailSum
        return offsets

    ## Calculate global cumulative sum
    def calcCumulativeSum(index, iter):
        final_iterator = []
        for row in iter:
            newVal = row["PartialSum"] + sumMap.value[index]
            final_iterator.append(addRowColumn(row, EventNumber=newVal))
        return final_iterator

    ## Register udf functions
    invCondUdf = udf(InvCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())

    ## Invert ConditionMet column
    rdd = df.withColumn("InvCond", invCondUdf(col(condColumn))).rdd

    ## Calculate partial cumulative sum over each partition
    rdd = rdd.mapPartitions(lambda iter: calcPartialCumulativeSum(iter)).cache()

    ## Collect the tail partial sum of each partition and broadcast the per-partition offsets
    collectedMap = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collectedMap))

    ## Calculate global cumulative sum 
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()

    ## Append `Event` before each cluster number and ignore the rest
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("EventNumber")))

    return df.drop(col("EventNumber")).drop(col("InvCond")).drop(col("PartialSum"))

## Read data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)    

## Tag sequential clusters
df = tagSequentialClusters(df, "ConditionMet", "EventName")
df.show()