Pyspark Convert RDD of tuples to Dataframe
I have an RDD of tuples where the first two rows look like this:
[[('n', 12.012457082117459), ('s', 0.79112758892014912)],
[('t', 3.6243409329763652),('vn', 3.6243409329763652),('n', 52.743253562212828),('v', 11.644347760553064)]]
In each tuple, the first value, e.g. 'n', 's', 't', is the desired column name, and the second value, e.g. 12.012, 0.7911, ..., is the desired value for that column. However, not every column name appears in every list (row) of the RDD: for example, only 'n' and 's' appear in the first row, while 's' is missing from the second. So I want to convert this RDD to a dataframe where the value is 0 for any column that does not appear in a row's tuples. In other words, the first two rows might look like this:
n    s     t     vn    omitted.....
12   0.79  0     0     .....
52   0     3.62  3.62  .......
I tried the following:
row = Row('l','eng','q','g','j','b','nt','z','n','d','f','i','k','s','vn','nz','v','nrt','tg','nrfg','t','ng','zg','a')
df = tup_sum_data.map(row).toDF()
The strings in Row() are the column names I want, but I got the following error:
TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    968     try:
--> 969         return _infer_schema(obj)
    970     except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_schema(row)
    991     else:
--> 992         raise TypeError("Can not infer schema for type: %s" % type(row))
    993

TypeError: Can not infer schema for type: <class 'numpy.float64'>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    968     try:
--> 969         return _infer_schema(obj)
    970     except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
    969         return _infer_schema(obj)
    970     except TypeError:
--> 971         raise TypeError("not supported type: %s" % type(obj))
    972
    973

TypeError: not supported type: <class 'tuple'>
Some lines of the traceback are omitted. Can anyone help me figure out how to deal with this? Thanks!
UPDATE
I changed the data type from np.float64 to float, and the error went away. However, the dataframe does not look like what I wanted; it looks like this:
+--------------------+
| l|
+--------------------+
|[[n,12.0124570821...|
|[[t,3.62434093297...|
|[[a,0.44628710262...|
|[[n,16.7534769832...|
|[[n,17.6017774340...|
+--------------------+
only showing top 5 rows
So can anyone help me get a correctly formatted dataframe? Thanks!
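For reference, the np.float64-to-float cast mentioned in the update can be done with a plain map over the RDD before any schema work. A minimal sketch, assuming tup_sum_data holds rows shaped like the lists of (str, numpy.float64) tuples above:

# Cast every numpy.float64 to a built-in Python float so Spark's type inference can handle it
tup_sum_data = tup_sum_data.map(lambda r: [(k, float(v)) for k, v in r])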
from pyspark.sql.types import *
from pyspark.sql import *

# Fixed schema: one FloatType column per expected key
data_frame_schema = StructType([
    StructField("n", FloatType()),
    StructField("s", FloatType()),
    StructField("t", FloatType()),
    StructField("v", FloatType()),
    StructField("vn", FloatType())
])

raw_list = [[('n', 12.012457082117459), ('s', 0.79112758892014912)],
            [('t', 3.6243409329763652), ('vn', 3.6243409329763652), ('n', 52.743253562212828), ('v', 11.644347760553064)]]

raw_rdd = sc.parallelize(raw_list)

# Build a Row per input row, defaulting any missing key to 0.0
# dict_to_row = lambda d: Row(n=d.get("n"), s=d.get("s"), t=d.get("t"), v=d.get("v"), vn=d.get("vn"))
dict_to_row = lambda d: Row(n=d.get("n", 0.0), s=d.get("s", 0.0), t=d.get("t", 0.0), v=d.get("v", 0.0), vn=d.get("vn", 0.0))

# Turn each row's list of (key, value) tuples into a dict, then into a Row
row_rdd = raw_rdd.map(lambda l: dict_to_row(dict(l)))
df = spark.createDataFrame(row_rdd, data_frame_schema)
df.show()
Pasting the above into a pyspark shell produces this output:
+---------+----------+--------+---------+--------+
| n| s| t| v| vn|
+---------+----------+--------+---------+--------+
|12.012457|0.79112756| 0.0| 0.0| 0.0|
| 52.74325| 0.0|3.624341|11.644348|3.624341|
+---------+----------+--------+---------+--------+
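The schema above hardcodes five columns, while the original attempt listed some two dozen. If the full set of column names is not known in advance, one possible generalization is to derive it from the data itself; a sketch under the assumption that the distinct keys are few enough to collect to the driver:

from pyspark.sql.types import StructType, StructField, FloatType

# Gather every key that appears anywhere in the RDD; the sorted result becomes the column list
keys = sorted(raw_rdd.flatMap(lambda row: [k for k, _ in row]).distinct().collect())
schema = StructType([StructField(k, FloatType()) for k in keys])

# Look up each key per row, defaulting to 0.0, and cast to float
# so numpy types never reach Spark's type machinery
tuple_rdd = raw_rdd.map(lambda row: tuple(float(dict(row).get(k, 0.0)) for k in keys))

df_all = spark.createDataFrame(tuple_rdd, schema)
df_all.show()

Because the schema is passed explicitly, Spark skips the inference step that raised the TypeError in the question.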