PySpark trying to apply previous field's schema to next field
PySpark has this weird issue. It seems like it's trying to apply the previous field's schema to the next field as it's processing.
The simplest test case I could come up with:
%pyspark
from pyspark.sql.types import (
DateType,
StructType,
StructField,
StringType,
)
from datetime import date
from pyspark.sql import Row
schema = StructType(
[
StructField("date", DateType(), True),
StructField("country", StringType(), True),
]
)
test = spark.createDataFrame(
[
Row(
date=date(2019, 1, 1),
country="RU",
),
],
schema
)
Stack trace:
Fail to execute line 26: schema
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579306903394369208.py", line 380, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 26, in <module>
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 691, in createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 423, in _createFromLocal
data = [schema.toInternal(row) for row in data]
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in toInternal
for f, v, c in zip(self.fields, obj, self._needConversion))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in <genexpr>
for f, v, c in zip(self.fields, obj, self._needConversion))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 439, in toInternal
return self.dataType.toInternal(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 175, in toInternal
return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'str' object has no attribute 'toordinal'
Bonus info from running it locally rather than in Zeppelin:
self = DateType, d = 'RU'
def toInternal(self, d):
if d is not None:
> return d.toordinal() - self.EPOCH_ORDINAL
E AttributeError: 'str' object has no attribute 'toordinal'
That is, it's trying to apply DateType to country. If I drop date, no problem. If I drop country, no problem. Both together, it fails.
Any ideas? Am I missing something obvious?
If you're going to use a list of Rows, you don't need to specify a schema either. This is because the Rows already know the schema.
The issue is that pyspark.sql.Row objects do not maintain the order in which you specified the fields.
print(Row(date=date(2019, 1, 1), country="RU"))
#Row(country='RU', date=datetime.date(2019, 1, 1))
From the docs:
Row can be used to create a row object by using named arguments, the fields will be sorted by names.
As you can see, the country field is put first. When Spark tries to create the DataFrame with the schema you specified, it expects the first item to be a DateType.
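Looking at the traceback, the mismatch happens because StructType.toInternal pairs schema fields with row values purely by position (the zip(self.fields, obj, ...) line in types.py). Here is a rough plain-Python illustration of that pairing with the name-sorted Row above, not the actual pyspark internals:
from datetime import date
fields = ["date: DateType", "country: StringType"]  # schema order
values = ["RU", date(2019, 1, 1)]                   # Row order, sorted by name
print(list(zip(fields, values)))
# [('date: DateType', 'RU'), ('country: StringType', datetime.date(2019, 1, 1))]
So the string 'RU' is handed to DateType.toInternal, which then fails on d.toordinal().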
One way to fix this is to put the fields into the schema in alphabetical order:
schema = StructType(
[
StructField("country", StringType(), True),
StructField("date", DateType(), True)
]
)
test = spark.createDataFrame(
[
Row(date=date(2019, 1, 1), country="RU")
],
schema
)
test.show()
#+-------+----------+
#|country| date|
#+-------+----------+
#| RU|2019-01-01|
#+-------+----------+
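Another option, offered here only as a sketch and not part of the original answer: build the Row class positionally with Row("date", "country"), so the fields keep the declared order instead of being sorted by name, and the original date-first schema matches as-is (the TestRow and date_first_schema names are just for illustration):
from datetime import date
from pyspark.sql import Row
from pyspark.sql.types import DateType, StringType, StructField, StructType
# Positional Row class: fields stay in the declared order
TestRow = Row("date", "country")
date_first_schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)
test = spark.createDataFrame(
    [TestRow(date(2019, 1, 1), "RU")],
    date_first_schema,
)
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+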
Or, in this case, you don't even need to pass a schema to createDataFrame at all. It will be inferred from the Rows:
test = spark.createDataFrame(
[
Row(date=date(2019, 1, 1), country="RU")
]
)
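For reference (not shown in the original answer), on Spark 2.x, where Row sorts its fields by name, the inferred schema would come out with country first, roughly:
test.printSchema()
# root
#  |-- country: string (nullable = true)
#  |-- date: date (nullable = true)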
If you want to reorder the columns, use select:
test = test.select("date", "country")
test.show()
#+----------+-------+
#| date|country|
#+----------+-------+
#|2019-01-01| RU|
#+----------+-------+