转换列并更新 DataFrame

Transforming a column and update the DataFrame

所以,我在下面做的是从 DataFrame 中删除一列 A 因为我想应用转换(这里我只是 json.loads a JSON string) 并将旧列替换为转换后的列。转换后,我只是加入了两个结果数据框。

df = df_data.drop('A').join(
    df_data[['ID', 'A']].rdd\
        .map(lambda x: (x.ID, json.loads(x.A)) 
             if x.A is not None else (x.ID, None))\
        .toDF()\
        .withColumnRenamed('_1', 'ID')\
        .withColumnRenamed('_2', 'A'),
    ['ID']
)

我不喜欢的当然是我面临的开销,因为我必须执行 withColumnRenamed 操作。

有了 pandas 我会做这样的事情:

pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
pdf.A = pdf.A.map(lambda x: json.loads(x))
pdf

但以下内容在 pyspark 中不起作用:

df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))

那么有没有比我在第一个代码片段中所做的更简单的方法?

据我了解,您正在努力实现这样的目标吗?

import pyspark.sql.functions as F
import json

json_convert = F.udf(lambda x: json.loads(x) if x is not None else None)

cols = df_data.columns
df = df_data.select([json_convert(F.col('A')).alias('A')] + \
                    [col for col in cols if col != 'A'])

我认为您不需要删除列并进行联接。以下代码应该* 等同于您发布的内容:

cols = df_data.columns
df = df_data.rdd\
    .map(
        lambda row: tuple(
            [row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None) 
             for c in cols]
        )
    )\
    .toDF(cols)

*我还没有实际测试过这段代码,但我认为这应该可行。

但要回答您的一般问题,您可以使用 withColumn().

转换列 in-place
df = df_data.withColumn("A", my_transformation_function("A").alias("A"))

其中 my_transformation_function() 可以是 udfpyspark sql function