Pyspark 的新手 - 导入 CSV 并创建包含数组列的镶木地板文件

New to Pyspark - importing a CSV and creating a parquet file with array columns

我是 Pyspark 的新手,我一直在努力尝试完成我认为相当简单的事情。我正在尝试将 csv 文件转换为镶木地板文件的 ETL 过程。 CSV 文件有几个简单的列,但其中一列是带分隔符的整数数组,我想将其 expand/unzip 放入镶木地板文件中。这个 parquet 文件实际上被 .net 核心微服务使用,它使用 Parquet Reader 来进行下游计算。为了让这个问题简单化,专栏的结构是:

"geomap" 5:3:7|4:2:1|8:2:78 -> 这表示一个包含 3 个项目的数组,它在“|”处拆分然后元组是值 (5,3,7), (4,2,1), (8,2,78)

我已经尝试了各种流程和模式,但我无法做到这一点。通过 UDF,我正在创建一个列表列表或一个元组列表,但我无法获得正确的架构或将数据解压缩到镶木地板写入操作中。我得到空值、错误或其他问题。我需要以不同的方式处理这个问题吗?相关代码如下。为简单起见,我只显示问题列,因为我有其余的工作。这是我第一次尝试 Pyspark,很抱歉遗漏了一些明显的东西:

def convert_geo(geo):
   return [tuple(x.split(':')) for x in geo.split('|')]

compression_type = 'snappy'

schema = ArrayType(StructType([
    StructField("c1", IntegerType(), False),
    StructField("c2", IntegerType(), False),
    StructField("c3", IntegerType(), False)
]))

spark_convert_geo = udf(lambda z: convert_geo(z),schema)

source_path = '...path to csv'
destination_path = 'path for generated parquet file'

df = spark.read.option('delimiter',',').option('header','true').csv(source_path).withColumn("geomap",spark_convert_geo(col('geomap')).alias("geomap"))
df.write.mode("overwrite").format('parquet').option('compression', compression_type).save(destination_path)

编辑:根据添加 printSchema() 输出的请求,我也不确定这里有什么问题。我似乎仍然无法正确显示或呈现字符串拆分值。这包含所有列。我确实看到了 c1、c2 和 c3 结构名称...

root |-- lrsegid: integer (nullable = true) |-- loadsourceid: integer (nullable = true) |-- agencyid: integer (nullable = true) |-- acres: float (nullable = true) |-- sourcemap: array (nullable = true) | |-- element: integer (containsNull = true) |-- geomap: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- c1: integer (nullable = false) | | |-- c2: integer (nullable = false) | | |-- c3: integer (nullable = false) 

问题是 convert_geo 函数 returns 一个包含字符元素的元组列表,而不是架构中指定的整数。如果你修改如下它会起作用:

def convert_geo(geo):
    return [tuple([int(y) for y in x.split(':')]) for x in geo.split('|')]