PySpark: replace negative values with zero
I'd like some help replacing negative time differences between timestamps with zero. I'm running Spark on Python 3. Here is my code:
Code:
from pyspark.sql.functions import col, lit, unix_timestamp, when

timeFmt = "yyyy-MM-dd HH:mm:ss"

# Minutes between consecutive timestamps; 0 when either side is null.
time_diff_1 = when(col("time1").isNotNull() & col("time2").isNotNull(),
                   (unix_timestamp('time2', format=timeFmt)
                    - unix_timestamp('time1', format=timeFmt)) / 60
                   ).otherwise(lit(0))
time_diff_2 = when(col("time2").isNotNull() & col("time3").isNotNull(),
                   (unix_timestamp('time3', format=timeFmt)
                    - unix_timestamp('time2', format=timeFmt)) / 60
                   ).otherwise(lit(0))
time_diff_3 = when(col("time3").isNotNull() & col("time4").isNotNull(),
                   (unix_timestamp('time4', format=timeFmt)
                    - unix_timestamp('time3', format=timeFmt)) / 60
                   ).otherwise(lit(0))

df = (df
      .withColumn('time_diff_1', time_diff_1)
      .withColumn('time_diff_2', time_diff_2)
      .withColumn('time_diff_3', time_diff_3)
      )

# Clamp negative differences to zero.
df = (df
      .withColumn('time_diff_1', when(col('time_diff_1') < 0, 0).otherwise(col('time_diff_1')))
      .withColumn('time_diff_2', when(col('time_diff_2') < 0, 0).otherwise(col('time_diff_2')))
      .withColumn('time_diff_3', when(col('time_diff_3') < 0, 0).otherwise(col('time_diff_3')))
      )
When I run the code above, I get an error. Here is the error:
Py4JJavaError: An error occurred while calling o1083.showString. :
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 56.0 failed 4 times, most recent failure: Lost task
0.3 in stage 56.0 (TID 7246, fxhclxcdh8.dftz.local, executor 21):
org.codehaus.janino.JaninoRuntimeException: failed to compile:
org.codehaus.janino.JaninoRuntimeException: Code of method
"apply_9$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
of class
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private boolean evalExprIsNull;
/* 009 */   private boolean evalExprValue;
/* 010 */   private boolean evalExpr1IsNull;
/* 011 */   private boolean evalExpr1Value;
/* 012 */   private java.text.DateFormat formatter5;
/* 013 */   private java.text.DateFormat formatter8;
/* 014 */   private java.text.DateFormat formatter12;
/* 015 */   private java.text.DateFormat formatter13;
/* 016 */   private UTF8String.IntWrapper wrapper;
/* 017 */   private java.text.DateFormat formatter15;
/* 018 */   private java.text.DateFormat formatter18;
/* 019 */   private java.text.DateFormat formatter19;
/* 020 */   private java.text.DateFormat formatter23;
/* 021 */   private java.text.DateFormat formatter26;
/* 022 */   private java.text.DateFormat formatter27;
/* 023 */   private java.text.DateFormat formatter30;
/* 024 */   private java.text.DateFormat formatter32;
........
Can anyone help?
I think a simpler approach is to write a small UDF (user-defined function) and apply it to the columns you want. Here is sample code that does this:
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType

# Nulls reach a Python UDF as None, so guard before comparing.
correctNegativeDiff = f.udf(
    lambda diff: 0.0 if diff is not None and diff < 0.0 else diff,
    DoubleType())

df = df.withColumn('time_diff_1', correctNegativeDiff(df.time_diff_1))\
       .withColumn('time_diff_2', correctNegativeDiff(df.time_diff_2))\
       .withColumn('time_diff_3', correctNegativeDiff(df.time_diff_3))
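For reference, the clamping rule the UDF wraps is plain Python and can be checked without a Spark session. A minimal standalone sketch (the function name is illustrative, not from the original post), including the None guard needed because SQL nulls arrive in a UDF as None:

```python
def clamp_negative_to_zero(diff):
    """Replace a negative time difference with 0.0; pass nulls (None) through."""
    if diff is None:
        return None
    return 0.0 if diff < 0.0 else diff

# Negative differences are clamped; non-negative ones are untouched.
print(clamp_negative_to_zero(-12.5))  # 0.0
print(clamp_negative_to_zero(7.0))    # 7.0
```

Wrapping this function with `f.udf(clamp_negative_to_zero, DoubleType())` gives the same behavior as the lambda above.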