Add a column to a dataframe whose value is based on another field, but needs to increment
I've run into a problem that I've been struggling with, and I have to imagine there's a more elegant solution than looping through my dataframe row by row. I have a dataframe that looks like this:
EventTime           | ConditionMet
------------------- | ------------
2017-09-11 00:00:01 | 0
2017-09-11 00:00:02 | 0
2017-09-11 00:00:03 | 0
2017-09-11 00:00:04 | 1
2017-09-11 00:00:05 | 1
2017-09-11 00:00:06 | 1
2017-09-11 00:00:07 | 0
2017-09-11 00:00:08 | 0
2017-09-11 00:00:09 | 1
2017-09-11 00:00:10 | 1
2017-09-11 00:00:11 | 1
2017-09-11 00:00:12 | 1
2017-09-11 00:00:13 | 0
Every time the condition is met (ConditionMet=1), I want to tag the records with an event name (e.g. Event1). I can't find an elegant way to do this in .withColumn() with when conditions or with windowing. The ideal outcome would be:
EventTime           | ConditionMet | EventName
------------------- | ------------ | ---------
2017-09-11 00:00:01 | 0 |
2017-09-11 00:00:02 | 0 |
2017-09-11 00:00:03 | 0 |
2017-09-11 00:00:04 | 1 | Event1
2017-09-11 00:00:05 | 1 | Event1
2017-09-11 00:00:06 | 1 | Event1
2017-09-11 00:00:07 | 0 |
2017-09-11 00:00:08 | 0 |
2017-09-11 00:00:09 | 1 | Event2
2017-09-11 00:00:10 | 1 | Event2
2017-09-11 00:00:11 | 1 | Event2
2017-09-11 00:00:12 | 1 | Event2
2017-09-11 00:00:13 | 0 |
Interested in any clever approaches here. The closest I have so far only ever tags Event1:
from pyspark.sql import functions as F
df.withColumn('EventName', F.when(df["ConditionMet"] > 0.0, "Event1").otherwise(""))
I believe I've found a way to solve my problem, although I don't know how elegant it is. Essentially, I join the dataset back onto itself by the timestamp of the previous record (I think the lead() function could help me with this). From there I can pass in the current condition and the next condition to detect a change, which is when I increment the global counter used to build EventName. Code below.
from pyspark import *
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, col, expr, when
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, array
cnt = 0

def tagTrip(t):
    global cnt
    # t[0] is the current ConditionMet, t[1] is the neighbouring row's flag
    if t[0] == 1:
        if t[1] == 0:
            # boundary between clusters: bump the global counter
            cnt = cnt + 1
        return "Event" + str(cnt)
    else:
        return ""
tagTripUdf = udf(lambda arr: tagTrip(arr), StringType())
# I don't include the code that shows how I got to this df and how I can join it onto itself
dfJoined = df.join(dfNext, df['time'] == dfNext['timeprev'], 'inner')
dfNew=dfJoined.withColumn('EventName',tagTripUdf(array(col('ConditionMet'),col('ConditionMetNext'))))
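As a rough sketch of the lead()/lag() idea mentioned above (the construction of dfNext isn't shown here), the neighbouring flag could also be pulled in directly with a window function instead of a self-join. ConditionMetPrev below is just an illustrative name, and ConditionMet is assumed to be an integer column:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch only: lag() over an EventTime-ordered window fetches the previous
# row's ConditionMet (defaulting to 0 for the first row), so a cluster start
# is simply "ConditionMet == 1 and ConditionMetPrev == 0".
w = Window.orderBy("EventTime")
dfWithPrev = df.withColumn("ConditionMetPrev", F.lag("ConditionMet", 1, 0).over(w))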
I think if the idea is to tag each contiguous cluster with a unique label, then you can do it by computing a cumulative sum:
- sort the df, e.g. by EventTime
- invert ConditionMet
- compute the cumulative sum of the inverted column
- ignore the cumulative sum where ConditionMet = 0, and use the cumulative sum where ConditionMet = 1 as the cluster's label
The intermediate columns then look like this (a short window-based sketch follows the table; the full implementation is under "Code"):
+-------------------+------------+-------+-------------+---------+
| EventTime|ConditionMet|InvCond|CumulativeSum|EventName|
+-------------------+------------+-------+-------------+---------+
|2017-09-11 00:00:01| 0| 1| 1| |
|2017-09-11 00:00:02| 0| 1| 2| |
|2017-09-11 00:00:03| 0| 1| 3| |
|2017-09-11 00:00:04| 1| 0| 3| Event3|
|2017-09-11 00:00:05| 1| 0| 3| Event3|
|2017-09-11 00:00:06| 1| 0| 3| Event3|
|2017-09-11 00:00:07| 0| 1| 4| |
|2017-09-11 00:00:08| 0| 1| 5| |
|2017-09-11 00:00:09| 1| 0| 5| Event5|
|2017-09-11 00:00:10| 1| 0| 5| Event5|
|2017-09-11 00:00:11| 1| 0| 5| Event5|
|2017-09-11 00:00:12| 1| 0| 5| Event5|
|2017-09-11 00:00:13| 0| 1| 6| |
+-------------------+------------+-------+-------------+---------+
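A minimal sketch of the steps above, assuming ConditionMet is an integer column and that a single unpartitioned window is acceptable for the data size, can produce the same InvCond, CumulativeSum and EventName columns with a window-based running sum; the distributed, RDD-based implementation follows under "Code":
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch only: an unpartitioned window puts every row on one task, which is
# fine for small data but does not scale.
w = Window.orderBy("EventTime")
result = (df
    .withColumn("InvCond", 1 - F.col("ConditionMet"))         # invert the flag
    .withColumn("CumulativeSum", F.sum("InvCond").over(w))     # running sum of the inverted flag
    .withColumn("EventName",
                F.when(F.col("ConditionMet") == 1,             # label only rows inside a cluster
                       F.concat(F.lit("Event"),
                                F.col("CumulativeSum").cast("string")))
                 .otherwise("")))
result.show()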
Code
from pyspark.sql.functions import lag, udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window
def tagSequentialClusters(df, condColumn, tagColumnName):
    ## Invert the condition: 0 becomes 1 and 1 becomes 0
    def InvCond(value):
        if value == '1':
            return 0
        else:
            return 1

    ## Prefix valid cluster numbers with "Event", leave other rows empty
    def mapEventNumber(cond, number):
        if cond == "1":
            return "Event" + str(number)
        else:
            return ""

    ## Return a copy of the row with extra columns added
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)

    ## Calculate the partial (per-partition) cumulative sum of InvCond
    def calcPartialCumulativeSum(iter):
        counter = 0
        final_iterator = []
        for row in iter:
            counter = counter + row["InvCond"]
            final_iterator.append(addRowColumn(row, PartialSum=counter))
        return final_iterator

    ## Get the last partial sum of a partition, keyed by its partition index
    def getTailWithIndex(index, iter):
        tailRow = None
        for row in iter:
            tailRow = row
        return [(index, tailRow["PartialSum"] if tailRow is not None else 0)]

    ## Map each partition index to the offset it needs: the sum of the
    ## tail values of all earlier partitions
    def calcSumMap(collectedTails):
        offsets = {}
        runningTotal = 0
        for index, tail in sorted(collectedTails):
            offsets[index] = runningTotal
            runningTotal += tail
        return offsets

    ## Turn the partial sums into a global cumulative sum by adding the
    ## broadcast offset of the row's partition
    def calcCumulativeSum(index, iter):
        final_iterator = []
        for row in iter:
            newVal = row["PartialSum"] + sumMap.value[index]
            final_iterator.append(addRowColumn(row, EventNumber=newVal))
        return final_iterator

    ## Register udf functions
    invCondUdf = udf(InvCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())

    ## Invert the ConditionMet column
    rdd = df.withColumn("InvCond", invCondUdf(col(condColumn))).rdd
    ## Calculate the partial cumulative sum over each partition
    rdd = rdd.mapPartitions(calcPartialCumulativeSum).cache()
    ## Collect the last partial sum of each partition and broadcast the offsets
    collectedTails = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collectedTails))
    ## Calculate the global cumulative sum
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()
    ## Prepend `Event` to each cluster number and ignore the rest
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("EventNumber")))
    return df.drop(col("EventNumber")).drop(col("InvCond")).drop(col("PartialSum"))

## Read data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)
## Tag sequential clusters
df = tagSequentialClusters(df, "ConditionMet", "EventName")
df.show()