Pyspark 将元组的 RDD 转换为 Dataframe

Pyspark Convert RDD of tuples to Dataframe

我有一个 rdd 元组,其中前两行如下所示:

[[('n', 12.012457082117459), ('s', 0.79112758892014912)],
[('t', 3.6243409329763652),('vn', 3.6243409329763652),('n', 52.743253562212828),('v', 11.644347760553064)]]

在每个元组中,第一个值,例如:'n'、's'、't',是所需的列名,第二个值,例如:12.012、0.7911。 ... 是每列的所需值。然而,在rdd的每个列表(行)中,我们可以看到并不是所有的列名都在那里。比如第一行只有

'n', 's' 

出现了,而没有

's' 

在第二行。所以我想将此 rdd 转换为数据框,其中对于未显示在原始元组中的列,值应为 0。换句话说,前两行可能如下所示:

n     s      t       vn     omitted.....
12    0.79   0       0      ..... 
52    0      3.62    3.62    .......

我试过以下方法:

row = Row('l','eng','q','g','j','b','nt','z','n','d','f','i','k','s','vn','nz','v','nrt','tg','nrfg','t','ng','zg','a')
df = tup_sum_data.map(row).toDF()

Row() 中的字符串是我想要的列名。但我收到以下错误:

TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
968         try:
--> 969             return _infer_schema(obj)
970         except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_schema(row)
991     else:
--> 992         raise TypeError("Can not infer schema for type: %s" % type(row))
993 

TypeError: Can not infer schema for type: <class 'numpy.float64'>
During handling of the above exception, another exception occurred:
TypeError                                 Traceback (most recent call last)
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
968         try:
--> 969             return _infer_schema(obj)
970         except TypeError:

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj)
969             return _infer_schema(obj)
970         except TypeError:
--> 971             raise TypeError("not supported type: %s" % type(obj))
972 
973 

TypeError: not supported type: <class 'tuple'>

省略了错误代码中的某些行。谁能帮我弄清楚如何处理这个问题?谢谢!

更新 我将数据类型从np.float64转为float,没有报错。但是,数据框看起来不像我想要的;它看起来像这样:

+--------------------+
|                   l|
+--------------------+
|[[n,12.0124570821...|
|[[t,3.62434093297...|
|[[a,0.44628710262...|
|[[n,16.7534769832...|
|[[n,17.6017774340...|
+--------------------+
only showing top 5 rows

那么谁能帮助我如何获得格式正确的数据框?谢谢!

from pyspark.sql.types import *
from pyspark.sql import *

data_frame_schema = StructType([
    StructField("n", FloatType()),
    StructField("s", FloatType()),
    StructField("t", FloatType()),
    StructField("v", FloatType()),
    StructField("vn", FloatType())
])

raw_list = [[('n', 12.012457082117459), ('s', 0.79112758892014912)], \
[('t', 3.6243409329763652),('vn', 3.6243409329763652),('n', 52.743253562212828),('v', 11.644347760553064)]]

raw_rdd = sc.parallelize(raw_list)

# dict_to_row = lambda d: Row(n=d.get("n"), s=d.get("s"), t=d.get("t"), v=d.get("v"), vn=d.get("vn"))
dict_to_row = lambda d: Row(n=d.get("n", 0.0), s=d.get("s", 0.0), t=d.get("t", 0.0), v=d.get("v", 0.0), vn=d.get("vn", 0.0))

row_rdd = raw_rdd.map(lambda l: dict_to_row(dict(l)))
df = spark.createDataFrame(row_rdd, data_frame_schema)
df.show()

将以上内容粘贴到 pyspark shell 会产生输出:

+---------+----------+--------+---------+--------+
|        n|         s|       t|        v|      vn|
+---------+----------+--------+---------+--------+
|12.012457|0.79112756|     0.0|      0.0|     0.0|
| 52.74325|       0.0|3.624341|11.644348|3.624341|
+---------+----------+--------+---------+--------+