Read in CSV in Pyspark with correct Datatypes
When I try to import a local CSV with Spark, every column is read in as a string by default. However, my columns only contain integer and timestamp types. More specifically, the CSV looks like this:
"Customer","TransDate","Quantity","PurchAmount","Cost","TransID","TransKey"
149332,"15.11.2005",1,199.95,107,127998739,100000
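Reading it without a schema gives me only string columns; here is a minimal sketch of what I am currently doing (the file is called myData.csv):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Default CSV read: no schema and no inferSchema, so every column comes back as a string
df = spark.read.csv('myData.csv', header=True)
df.printSchema()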
I found code that should work in this question, but when I execute it, all entries come back as NULL.
I create a custom schema with the following:
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, TimestampType
customSchema = StructType([
    StructField("Customer", IntegerType(), True),
    StructField("TransDate", TimestampType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Cost", IntegerType(), True),
    StructField("TransKey", IntegerType(), True)])
Then I read in the CSV file:
myData = spark.read.load('myData.csv', format="csv", header="true", sep=',', schema=customSchema)
Which returns:
+--------+---------+--------+----+--------+
|Customer|TransDate|Quantity|Cost|Transkey|
+--------+---------+--------+----+--------+
| null| null| null|null| null|
+--------+---------+--------+----+--------+
Am I missing a crucial step? I suspect the date column is the root of the problem. Note: I am running this in Google Colab.
Here you go!
"Customer","TransDate","Quantity","PurchAmount","Cost","TransID","TransKey"
149332,"15.11.2005",1,199.95,107,127998739,100000
PATH_TO_FILE="file:///u/vikrant/LocalTestDateFile"
Loading the above file into a dataframe:
df = spark.read.format("com.databricks.spark.csv") \
.option("mode", "DROPMALFORMED") \
.option("header", "true") \
.option("inferschema", "true") \
.option("delimiter", ",").load(PATH_TO_FILE)
Your date will be loaded as a string column, but when you simply cast it to a date type, Spark treats this date format as NULL:
df = df.withColumn('TransDate', col('TransDate').cast('date'))
+--------+---------+--------+-----------+----+---------+--------+
|Customer|TransDate|Quantity|PurchAmount|Cost| TransID|TransKey|
+--------+---------+--------+-----------+----+---------+--------+
| 149332| null| 1| 199.95| 107|127998739| 100000|
+--------+---------+--------+-----------+----+---------+--------+
So we need to change the date format from dd.MM.yyyy to yyyy-MM-dd.
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
A Python function to change the date format:
change_dateformat_func = udf(lambda x: datetime.strptime(x, '%d.%m.%Y').strftime('%Y-%m-%d'))
Now call this function on your dataframe column:
newdf = df.withColumn('TransDate', change_dateformat_func(col('TransDate')).cast(DateType()))
+--------+----------+--------+-----------+----+---------+--------+
|Customer| TransDate|Quantity|PurchAmount|Cost| TransID|TransKey|
+--------+----------+--------+-----------+----+---------+--------+
| 149332|2005-11-15| 1| 199.95| 107|127998739| 100000|
+--------+----------+--------+-----------+----+---------+--------+
And below is the schema:
|-- Customer: integer (nullable = true)
|-- TransDate: date (nullable = true)
|-- Quantity: integer (nullable = true)
|-- PurchAmount: double (nullable = true)
|-- Cost: integer (nullable = true)
|-- TransID: integer (nullable = true)
|-- TransKey: integer (nullable = true)
Let me know if it works for you.
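Alternatively, the same conversion can be done without a Python UDF by using Spark's built-in to_date function with an explicit pattern. This is only a minimal sketch, assuming the TransDate column holds strings like "15.11.2005":
from pyspark.sql.functions import to_date, col

# Parse the string column directly with a matching pattern;
# this keeps the conversion inside Spark instead of going through a Python UDF.
newdf = df.withColumn('TransDate', to_date(col('TransDate'), 'dd.MM.yyyy'))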
You can specify the option ('dateFormat', 'd.M.y') on the DataFrameReader to parse dates in a specific format.
df = spark.read.format("csv").option("header","true").option("dateFormat","M.d.y").schema(my_schema).load("path_to_csv")
References:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
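Putting it together for the CSV from the question, a minimal end-to-end sketch could look like the following. The file name myData.csv, the dd.MM.yyyy pattern, and declaring TransDate as DateType rather than TimestampType are assumptions based on the sample row:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, DateType, LongType

# Schema matching the sample header; TransDate is a DateType so the
# dateFormat option below controls how "15.11.2005" is parsed.
my_schema = StructType([
    StructField("Customer", IntegerType(), True),
    StructField("TransDate", DateType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("PurchAmount", DoubleType(), True),
    StructField("Cost", IntegerType(), True),
    StructField("TransID", LongType(), True),
    StructField("TransKey", LongType(), True)])

df = (spark.read.format("csv")
      .option("header", "true")
      .option("dateFormat", "dd.MM.yyyy")   # day.month.year, as in the sample row
      .schema(my_schema)
      .load("myData.csv"))

df.printSchema()
If TransDate really needs to be a timestamp rather than a date, the corresponding reader option is timestampFormat together with a TimestampType field.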