Pyspark NULL mapping key

Forgive my ignorance, I'm new to pyspark. I'm trying to replace a udf with a dictionary lookup that creates a new column count_adj based on the values of another column a_type. How do I account for None/Null values in this process? This is trivially easy in pandas (df['adj_count'] = df.a_type.map(count_map)), but I'm struggling to do the same in pyspark.
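For comparison, the pandas one-liner works because a plain Python dict happily accepts None as a key. A minimal pure-Python sketch (with a hypothetical trimmed-down map) shows the lookup behavior I'm after:

```python
# A plain Python dict accepts None as a key, which is why the
# pandas-style .map() lookup works; Spark map columns do not allow this.
count_map = {'740': 16, '600': 8, None: 0}  # trimmed-down example map

rows = ['740', '600', None, '999']
adj = [count_map.get(v, 128) for v in rows]  # 128 as the else/fallback value
print(adj)  # [16, 8, 0, 128]
```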

Sample data / imports:

# all imports used -- not just for this portion of the script
from pyspark.sql import SparkSession, HiveContext, SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import sql
import pyspark.sql.functions as F
import random
from itertools import chain
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
from datetime import date
from datetime import timedelta
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import rank, row_number, max as max_, col
import sys
import os

spark = SparkSession.builder.appName('a_type_tests').getOrCreate()

# note: sample data has results from the original udf for comparison
dataDictionary = [
(26551, 491, '2022-01-22', '740', -1, 16),
(24192, 338, '2022-01-22', '740', -1, 16),
(26555, 3013, '2022-01-22', '740', -1, 16),
(26571, 937, '2022-01-22', '740', -1, 16),
(24376, 371, '2022-01-22', '740', -1, 16),
(17716, 118, '2022-01-22', '740', -1, 16),
(26554, 3013, '2022-01-22', '740', -1, 16),
(26734, 105, '2022-01-22', '740', -1, 16),
(26051, 415, '2022-01-22', '600', -1, 8),
(26602, 501, '2022-01-22', '740', -1, 16),
(26125, 501, '2022-01-22', None, -1, 0)
        ]

sdf = spark.createDataFrame(data=dataDictionary, schema = ['id', 'loc_id', 'a_date', 'a_type', 'adj_val', 'udf_original'])
sdf.printSchema()
sdf.show(truncate=False)

The original udf looks like:

def count_adj(a_type):
    if a_type is None:
        return 0
    elif a_type in ('703','704','705','708','900','910'):
        return 4
    elif a_type in ('701','702'):
        return 2
    elif a_type in ('711','712'):
        return 1
    elif a_type in ('600', '704'):
        return 8
    elif a_type in ('740',):  # trailing comma needed: ('740') is just a string
        return 16
    elif a_type in ('305','306'):
        return 32
    elif a_type in ('601','612','615'):
        return 64
    else:
        return 128

I've created a dictionary that mirrors these values.

# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# create None mapping manually
count_map[None] = 0
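To sanity-check the inversion, here is the same trick on a trimmed-down copy of the map (pure Python, no Spark needed). One thing worth noting: '704' appears under both 4 and 8 in the full map, so the inversion silently keeps the later value (8), while the udf's first matching branch returns 4.

```python
# trimmed-down copy of the mapping above
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             16: ['740']}

# invert: each code string becomes a key pointing at its count value
inverted = {c: key for key, vals in count_map.items() for c in vals}

# a None key is fine in plain Python -- it only breaks once Spark
# tries to build a map column from it
inverted[None] = 0

print(inverted)  # {'711': 1, '712': 1, '701': 2, '702': 2, '740': 16, None: 0}
```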

Searching SO, I found approaches that led to the following error:

# Code Tried:

# Changes None type to NULL -- fine but how do I account for these None/Null Values in my dict?
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

sdf2 = sdf.withColumn('count_adj', map_expr.getItem(col('a_type')))

# or:

sdf2 = sdf.withColumn('count_adj',map_expr[col('a_type')]).show()
# Error

Py4JJavaError: An error occurred while calling o334.showString.
: java.lang.RuntimeException: Cannot use null as map key.

How do I account for None/NULL values when using a dictionary to create a new column based on the values of another column? Is it possible to include a NULL check in my map expression, or somewhere else entirely?

Map keys must all be of the same data type and cannot be null; a Spark map will not accept None/NULL as a key.

Instead of the code above, you can use the when function, which gives the output you want, like this:

newDF = sdf.withColumn("count_adj", F.when(F.col("a_type").isNull(), 0)
        .when(F.col("a_type").isin('711', '712'), 1)
        .when(F.col("a_type").isin('701', '702'), 2)
        .when(F.col("a_type").isin('703', '704', '705', '708', '900', '910'), 4)
        .when(F.col("a_type").isin('600', '704'), 8)
        .when(F.col("a_type").isin('740'), 16)
        .when(F.col("a_type").isin('305', '306'), 32)
        .when(F.col("a_type").isin('601', '612', '615'), 64)
        .otherwise(128))

For completeness, I removed the None-type pairing from the dictionary and used a combination of Karthik's answer and the other SO posts mentioned in the question.

My final solution relies on the code below and uses .when().isNull() to account for the None/NULL values.

# Original Mapping
# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# New below:
from itertools import chain

map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

sdf2 = sdf.withColumn('count_adj',
                      F.when(col('a_type').isNull(), 0)
                       .otherwise(map_expr.getItem(col('a_type'))))
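As a quick sanity check (pure Python, no Spark session required), the null-guarded lookup can be replayed against a few of the sample rows above; each a_type should reproduce its udf_original value:

```python
# (a_type, udf_original) pairs taken from the sample data above
samples = [('740', 16), ('600', 8), (None, 0)]

# relevant slice of the inverted count_map
count_map = {'740': 16, '600': 8}

for a_type, expected in samples:
    # mirrors F.when(col('a_type').isNull(), 0)
    #          .otherwise(map_expr.getItem(col('a_type')))
    result = 0 if a_type is None else count_map.get(a_type)
    assert result == expected, (a_type, result, expected)
print("all sample rows match")
```

One behavioral difference from the original udf worth keeping in mind: a_type values missing from the inverted dict come back as NULL from the map lookup rather than 128, since create_map has no else branch; the 128 entries only cover the explicit '1600'/'1601'/'1602' codes.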