Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住

Spark dataframe in Python - execution stuck when using UDFs

我有一个用 Python 编写的 spark 作业,它使用 DataBricks CSV reader.

从 CSV 文件中读取数据

我想通过应用实际上也在更改浮点分隔符的 udf 函数将某些列从字符串转换为双精度。

convert_udf = F.udf(
    lambda decimal_str: _to_float(decimal_separator, decimal_str), 
    returnType=FloatType())

for name in columns:
     df = df.withColumn(name, convert_udf(df[name]))

def _to_float(decimal_separator, decimal_str):
    if isinstance(decimal_str, str) or isinstance(decimal_str, unicode):
        return (None if len(decimal_str.strip()) == 0 
               else float(decimal_str.replace(decimal_separator, '.')))
    else:
        return decimal_str

Spark 作业在调用 udf 函数时卡住。我尝试从 _to_float 函数中 return 一个固定的双精度值,但没有成功。使用 SQL context.

的 udf 和数据框之间似乎有问题

长话短说,除非必要,否则不要使用 Python UDF(和一般的 UDF):

  • 由于通过 Python 解释器
  • 进行完整往返,因此效率低下
  • Catalyst 无法优化
  • 如果反复使用,会产生长谱系

对于像这样的简单操作,只需使用内置函数即可:

from pyspark.sql.functions import regexp_replace

decimal_separator = ","
exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c) 
    if c in columns else c 
    for c in df.columns
]

df.select(*exprs)