从 RDD 流水线到 DF Pyspark
From RDD pipeline to DF Pyspark
当我开始寻求帮助时,这个问题似乎很好解决,但我还没有找到解决方案。事实上,您可能会发现一些您认为可能重复的内容,但我想我在过去几个小时内已经尝试了所有这些内容。据我了解,sqlContext 将在这里发挥作用,但我愿意接受任何有效的答案。我正在使用 Spark 2.1
我从 mongodb 下拉的 ID 列表开始。
示例输出:
[u'182028', u'161936', u'12333', u'120677']
'rated_game_ids_lst type:' <type 'list'>
然后我继续尝试创建一个我想变成 DF 的 RDD:
user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: (19, x[1], x[2]))
示例输出:
'user_unrated_games:' [(19, u'174430', 3.4), (19, u'169786', 3.4)]
'user_unrated_games type:' <class 'pyspark.rdd.PipelinedRDD'>
和我在上面使用的 urg_rdd 的示例(第一行):
'ugr_rdd:'[Row(user_id=5, game_id=u'182028', rating=9.15)]
'ugr_rdd_type:' pyspark.rdd.RDD
然后我试试这个:
df = sqlContext.createDataFrame(user_unrated_games, ['user_id', 'game_id', 'rating'])
那个方法失败了所以我试了这个:
user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: Row(user_id=19, game_id=x[1], rating= x[2]))
示例输出:
('user_unrated_games type:', <class 'pyspark.rdd.PipelinedRDD'>)
('user_unrated_games:', [Row(game_id=u'174430', rating=3.4, user_id=19), Row(game_id=u'169786', rating=3.4, user_id=19)])
然后是这个:
df = sqlContext.createDataFrame(user_unrated_games)
这两种尝试都出现此错误:
IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
从那里开始,我开始尝试组合更改 "user_id" 的类型等,尝试按原样传递 RDD,尝试将我的管道转换为 RDD...坦率地说,我尝试了很多东西,但上面的两个看起来最接近似乎对其他人有用的东西。
问题是调用createDataFrame
方法时需要指定包含数据类型的架构。这样的事情应该可以解决问题:
from pyspark.sql.types import *
rdd = sc.parallelize([(19, 174430, 3.4), (19, 169786, 3.4)])
schema = StructType( [
StructField('user_id', IntegerType()),
StructField('game_id', IntegerType()),
StructField('rating', FloatType())
])
df = spark.createDataFrame(rdd, schema)
df.show()
注意:我已经使用 spark 2.1.0 对此进行了测试。在这种情况下 spark
是一个 SparkSession
对象。
当我开始寻求帮助时,这个问题似乎很好解决,但我还没有找到解决方案。事实上,您可能会发现一些您认为可能重复的内容,但我想我在过去几个小时内已经尝试了所有这些内容。据我了解,sqlContext 将在这里发挥作用,但我愿意接受任何有效的答案。我正在使用 Spark 2.1
我从 mongodb 下拉的 ID 列表开始。 示例输出:
[u'182028', u'161936', u'12333', u'120677']
'rated_game_ids_lst type:' <type 'list'>
然后我继续尝试创建一个我想变成 DF 的 RDD:
user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: (19, x[1], x[2]))
示例输出:
'user_unrated_games:' [(19, u'174430', 3.4), (19, u'169786', 3.4)]
'user_unrated_games type:' <class 'pyspark.rdd.PipelinedRDD'>
和我在上面使用的 urg_rdd 的示例(第一行):
'ugr_rdd:'[Row(user_id=5, game_id=u'182028', rating=9.15)]
'ugr_rdd_type:' pyspark.rdd.RDD
然后我试试这个:
df = sqlContext.createDataFrame(user_unrated_games, ['user_id', 'game_id', 'rating'])
那个方法失败了所以我试了这个:
user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: Row(user_id=19, game_id=x[1], rating= x[2]))
示例输出:
('user_unrated_games type:', <class 'pyspark.rdd.PipelinedRDD'>)
('user_unrated_games:', [Row(game_id=u'174430', rating=3.4, user_id=19), Row(game_id=u'169786', rating=3.4, user_id=19)])
然后是这个:
df = sqlContext.createDataFrame(user_unrated_games)
这两种尝试都出现此错误:
IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
从那里开始,我开始尝试组合更改 "user_id" 的类型等,尝试按原样传递 RDD,尝试将我的管道转换为 RDD...坦率地说,我尝试了很多东西,但上面的两个看起来最接近似乎对其他人有用的东西。
问题是调用createDataFrame
方法时需要指定包含数据类型的架构。这样的事情应该可以解决问题:
from pyspark.sql.types import *
rdd = sc.parallelize([(19, 174430, 3.4), (19, 169786, 3.4)])
schema = StructType( [
StructField('user_id', IntegerType()),
StructField('game_id', IntegerType()),
StructField('rating', FloatType())
])
df = spark.createDataFrame(rdd, schema)
df.show()
注意:我已经使用 spark 2.1.0 对此进行了测试。在这种情况下 spark
是一个 SparkSession
对象。