In Pyspark how do I compare two columns and use x whenever they are not the same

How do I compare two columns and say that when they are not the same, I want to use column x? This is what I am doing right now:

SUMMARY = SUMMARY.withColumn("type_description", F.when((SUMMARY.type_description != SUMMARY.rename_description), F.lit("rename_description")))

You can use a column expression instead of the literal. It looks very close to what you already have:

SUMMARY = SUMMARY.withColumn(
    "type_description",
    F.when(SUMMARY.type_description != SUMMARY.rename_description, SUMMARY.x).otherwise(
        SUMMARY.type_description
    )
)
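
Note that if you drop the .otherwise clause entirely, as in the original snippet, rows where the condition is false get null instead of keeping their old value. A minimal sketch of that behavior (assuming the same SUMMARY DataFrame and an x column):

SUMMARY_NULLS = SUMMARY.withColumn(
    "type_description",
    # No .otherwise here: rows where the columns match become null,
    # they do NOT keep the original type_description value.
    F.when(SUMMARY.type_description != SUMMARY.rename_description, SUMMARY.x)
)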

IIUC, you want to compare whether the two columns are the same, and return the value of column y if they are not, or the value of column x if they are. If so, you can use when and col from pyspark.sql.functions:

from pyspark.sql.functions import when, col

df = df.withColumn(
    'type_description_new',
    when(col('type_description') != col('rename_description'),
         col('rename_description'))
    .otherwise(col('type_description'))
)

Result and setup:

df = spark.createDataFrame(
    [(1, 1), (2, 2), (3, 4)],
    ['type_description', 'rename_description']
)

>>> df.show(truncate=False)
+----------------+------------------+--------------------+
|type_description|rename_description|type_description_new|
+----------------+------------------+--------------------+
|               1|                 1|                   1|
|               2|                 2|                   2|
|               3|                 4|                   4|
+----------------+------------------+--------------------+
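
One caveat: if either column can contain nulls, a plain != evaluates to null for those rows, so the when condition is not satisfied and they fall through to .otherwise. If you want nulls compared as values (two nulls count as equal), you can negate Column.eqNullSafe instead; a sketch under that assumption:

from pyspark.sql.functions import when, col

df = df.withColumn(
    'type_description_new',
    # eqNullSafe treats null == null as True and null vs. non-null as False,
    # so null-only differences are handled explicitly instead of silently
    # falling through to .otherwise.
    when(~col('type_description').eqNullSafe(col('rename_description')),
         col('rename_description'))
    .otherwise(col('type_description'))
)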

You are very close, you are just missing the .otherwise call.

from pyspark.sql import types as T, functions as F, SparkSession

spark = SparkSession.builder.getOrCreate()


# Synthesize DataFrames
schema = T.StructType([
  T.StructField("type_description", T.StringType(), False),
  T.StructField("rename_description", T.StringType(), False),
  T.StructField("col_3", T.StringType(), False),
  T.StructField("col_4", T.IntegerType(), False),
])
data = [
  {"type_description": "key_1", "rename_description": "key_2", "col_3": "CREATE", "col_4": 0},
  {"type_description": "key_2", "rename_description": "key_2", "col_3": "CREATE", "col_4": 0},
  {"type_description": "key_3", "rename_description": "OVERRIDE", "col_3": "CREATE", "col_4": 0},
]

df = spark.createDataFrame(data, schema)
df.show()
"""
+----------------+------------------+------+-----+
|type_description|rename_description| col_3|col_4|
+----------------+------------------+------+-----+
|           key_1|             key_2|CREATE|    0|
|           key_2|             key_2|CREATE|    0|
|           key_3|          OVERRIDE|CREATE|    0|
+----------------+------------------+------+-----+
"""

SUMMARY_DF = df.withColumn(
  "final_description",
  F.when(
    df.type_description == df.rename_description,
    df.type_description
  ).otherwise(
    df.rename_description
  )
)
SUMMARY_DF.show()
"""
+----------------+------------------+------+-----+-----------------+
|type_description|rename_description| col_3|col_4|final_description|
+----------------+------------------+------+-----+-----------------+
|           key_1|             key_2|CREATE|    0|            key_2|
|           key_2|             key_2|CREATE|    0|            key_2|
|           key_3|          OVERRIDE|CREATE|    0|         OVERRIDE|
+----------------+------------------+------+-----+-----------------+
"""