pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>
pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>
在 Spark 集群上使用 pyspark 编程,
数据很大而且碎片化,因此无法加载到内存中或无法轻松检查数据的完整性
基本上是这样
af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214
维基百科数据:
我从 aws S3 读取它,然后尝试在 pyspark 解释器中使用以下 python 代码构建 spark Dataframe:
parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3]))
fields = [StructField("project", StringType(), True),
StructField("title", StringType(), True),
StructField("count", IntegerType(), True),
StructField("byte_size", StringType(), True)]
schema = StructType(fields)
df = sqlContext.createDataFrame(wikis, schema)
一切看起来都很好,只有 createDataFrame 给我错误
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
_verify_type(v, f.dataType)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>
为什么我不能将应该计数的第三列设置为 IntegerType ?
我该如何解决这个问题?
如 ccheneson 所述,您传递了错误的类型。
假设您 data
看起来像这样:
data = sc.parallelize(["af.b Current%20events 1 996"])
第一张地图后你得到 RDD[List[String]]
:
parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']
第二个map将其转换为元组(String, String, String, String)
:
wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')
您的 schema
指出第 3 列是一个整数:
[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]
架构最常用于避免完整 table 扫描以推断类型并且不执行任何类型转换。
您可以在上一张地图中投射数据:
wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))
或将 count
定义为 StringType
并投射列
fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)
wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")
旁注 count
是 SQL 中的保留字,不应用作列名。在 Spark 中,它会在某些情况下按预期工作,而在另一些情况下会失败。
使用 apache 2.0,您可以让 spark 推断数据的架构。总的来说,你需要按照上面的说法在你的解析器函数中进行转换:
"When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict."
在 Spark 集群上使用 pyspark 编程, 数据很大而且碎片化,因此无法加载到内存中或无法轻松检查数据的完整性
基本上是这样
af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214
维基百科数据:
我从 aws S3 读取它,然后尝试在 pyspark 解释器中使用以下 python 代码构建 spark Dataframe:
parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3]))
fields = [StructField("project", StringType(), True),
StructField("title", StringType(), True),
StructField("count", IntegerType(), True),
StructField("byte_size", StringType(), True)]
schema = StructType(fields)
df = sqlContext.createDataFrame(wikis, schema)
一切看起来都很好,只有 createDataFrame 给我错误
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
_verify_type(v, f.dataType)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>
为什么我不能将应该计数的第三列设置为 IntegerType ? 我该如何解决这个问题?
如 ccheneson 所述,您传递了错误的类型。
假设您 data
看起来像这样:
data = sc.parallelize(["af.b Current%20events 1 996"])
第一张地图后你得到 RDD[List[String]]
:
parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']
第二个map将其转换为元组(String, String, String, String)
:
wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')
您的 schema
指出第 3 列是一个整数:
[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]
架构最常用于避免完整 table 扫描以推断类型并且不执行任何类型转换。
您可以在上一张地图中投射数据:
wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))
或将 count
定义为 StringType
并投射列
fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)
wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")
旁注 count
是 SQL 中的保留字,不应用作列名。在 Spark 中,它会在某些情况下按预期工作,而在另一些情况下会失败。
使用 apache 2.0,您可以让 spark 推断数据的架构。总的来说,你需要按照上面的说法在你的解析器函数中进行转换:
"When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict."