如何替换 PySpark DataFrame 中的无穷大

How to replace infinity in PySpark DataFrame

似乎不​​支持替换无穷大值。我尝试了下面的代码,但它不起作用。还是我遗漏了什么?

a=sqlContext.createDataFrame([(None, None), (1, np.inf), (None, 2)])
a.replace(np.inf, 10)

或者我是否必须走痛苦的路线:将 PySpark DataFrame 转换为 pandas DataFrame,替换无穷大值,然后将其转换回 PySpark DataFrame

It seems like there is no support for replacing infinity values.

实际上它看起来像是一个 Py4J 错误而不是 replace 本身的问题。参见 Support nan/inf between Python and Java

作为解决方法,您可以尝试 UDF(慢速选项):

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, lit, udf, when

df = sc.parallelize([(None, None), (1.0, np.inf), (None, 2.0)]).toDF(["x", "y"])

replace_infs_udf = udf(
    lambda x, v: float(v) if x and np.isinf(x) else x, DoubleType()
)

df.withColumn("x1", replace_infs_udf(col("y"), lit(-99.0))).show()

## +----+--------+-----+
## |   x|       y|   x1|
## +----+--------+-----+
## |null|    null| null|
## | 1.0|Infinity|-99.0|
## |null|     2.0|  2.0|
## +----+--------+-----+

或者像这样的表达式:

def replace_infs(c, v):
    is_infinite = c.isin([
        lit("+Infinity").cast("double"),
        lit("-Infinity").cast("double")
    ])
    return when(c.isNotNull() & is_infinite, v).otherwise(c)

df.withColumn("x1", replace_infs(col("y"), lit(-99))).show()

## +----+--------+-----+
## |   x|       y|   x1|
## +----+--------+-----+
## |null|    null| null|
## | 1.0|Infinity|-99.0|
## |null|     2.0|  2.0|
## +----+--------+-----+