PySpark 在嵌套数组中反转 StringIndexer
PySpark reversing StringIndexer in nested array
我正在使用 PySpark 通过 ALS 进行协同过滤。我的原始用户和项目 ID 是字符串,所以我使用 StringIndexer
将它们转换为数字索引(PySpark 的 ALS 模型要求我们这样做)。
拟合模型后,我可以像这样为每个用户获得前 3 个推荐:
recs = (
model
.recommendForAllUsers(3)
)
recs
数据框如下所示:
+-----------+--------------------+
|userIdIndex| recommendations|
+-----------+--------------------+
| 1580|[[10096,3.6725707...|
| 4900|[[10096,3.0137873...|
| 5300|[[10096,2.7274625...|
| 6620|[[10096,2.4493625...|
| 7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows
root
|-- userIdIndex: integer (nullable = false)
|-- recommendations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- productIdIndex: integer (nullable = true)
| | |-- rating: float (nullable = true)
我想用这个数据框创建一个巨大的 JSOM 转储,我可以这样:
(
recs
.toJSON()
.saveAsTextFile("name_i_must_hide.recs")
)
这些 json 的示例是:
{
"userIdIndex": 1580,
"recommendations": [
{
"productIdIndex": 10096,
"rating": 3.6725707
},
{
"productIdIndex": 10141,
"rating": 3.61542
},
{
"productIdIndex": 11591,
"rating": 3.536216
}
]
}
userIdIndex
和 productIdIndex
键是由于 StringIndexer
转换。
如何取回这些列的原始值?我怀疑我必须使用 IndexToString
变换器,但我不太清楚如何,因为数据嵌套在 recs
数据框内的数组中。
我尝试使用 Pipeline
求值器 (stages=[StringIndexer, ALS, IndexToString]
),但该求值器似乎不支持这些索引器。
干杯!
在这两种情况下,您都需要访问标签列表。这可以使用 StringIndexerModel
user_indexer_model = ... # type: StringIndexerModel
user_labels = user_indexer_model.labels
product_indexer_model = ... # type: StringIndexerModel
product_labels = product_indexer_model.labels
或列元数据。
对于 userIdIndex
你可以申请 IndexToString
:
from pyspark.ml.feature import IndexToString
user_id_to_label = IndexToString(
inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)
对于推荐,您需要 udf
或这样的表达式:
from pyspark.sql.functions import array, col, lit, struct
n = 3 # Same as numItems
product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])
recs.withColumn("recommendations", recommendations)
作为性能问题给出的答案至少在我的情况下花费了太长时间。
你可以使用 IndexToString
我提供了一个简单的代码片段(假设有两个 StringIndexer
用于用户和产品
from pyspark.ml.feature import StringIndexer, IndexToString
idx_to_user = IndexToString(inputCol='userIdIndex',outputCol='user_id').setLabels(self.user_indexer.labels)
idx_to_prod = IndexToString(inputCol='productIdIndex',outputCol='product_id').setLabels(self.prod_indexer.labels)
recoms = idx_to_user.transform(recs)
res = self.idx_to_prod.transform(recoms.select(F.col('user_id'),F.explode('recommendations')).select('user_id','col.productIdIndex','col.rating'))
result = res.select('user_id','product_id','rating')
我正在使用 PySpark 通过 ALS 进行协同过滤。我的原始用户和项目 ID 是字符串,所以我使用 StringIndexer
将它们转换为数字索引(PySpark 的 ALS 模型要求我们这样做)。
拟合模型后,我可以像这样为每个用户获得前 3 个推荐:
recs = (
model
.recommendForAllUsers(3)
)
recs
数据框如下所示:
+-----------+--------------------+
|userIdIndex| recommendations|
+-----------+--------------------+
| 1580|[[10096,3.6725707...|
| 4900|[[10096,3.0137873...|
| 5300|[[10096,2.7274625...|
| 6620|[[10096,2.4493625...|
| 7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows
root
|-- userIdIndex: integer (nullable = false)
|-- recommendations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- productIdIndex: integer (nullable = true)
| | |-- rating: float (nullable = true)
我想用这个数据框创建一个巨大的 JSOM 转储,我可以这样:
(
recs
.toJSON()
.saveAsTextFile("name_i_must_hide.recs")
)
这些 json 的示例是:
{
"userIdIndex": 1580,
"recommendations": [
{
"productIdIndex": 10096,
"rating": 3.6725707
},
{
"productIdIndex": 10141,
"rating": 3.61542
},
{
"productIdIndex": 11591,
"rating": 3.536216
}
]
}
userIdIndex
和 productIdIndex
键是由于 StringIndexer
转换。
如何取回这些列的原始值?我怀疑我必须使用 IndexToString
变换器,但我不太清楚如何,因为数据嵌套在 recs
数据框内的数组中。
我尝试使用 Pipeline
求值器 (stages=[StringIndexer, ALS, IndexToString]
),但该求值器似乎不支持这些索引器。
干杯!
在这两种情况下,您都需要访问标签列表。这可以使用 StringIndexerModel
user_indexer_model = ... # type: StringIndexerModel
user_labels = user_indexer_model.labels
product_indexer_model = ... # type: StringIndexerModel
product_labels = product_indexer_model.labels
或列元数据。
对于 userIdIndex
你可以申请 IndexToString
:
from pyspark.ml.feature import IndexToString
user_id_to_label = IndexToString(
inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)
对于推荐,您需要 udf
或这样的表达式:
from pyspark.sql.functions import array, col, lit, struct
n = 3 # Same as numItems
product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])
recs.withColumn("recommendations", recommendations)
作为性能问题给出的答案至少在我的情况下花费了太长时间。
你可以使用 IndexToString
我提供了一个简单的代码片段(假设有两个 StringIndexer
用于用户和产品
from pyspark.ml.feature import StringIndexer, IndexToString
idx_to_user = IndexToString(inputCol='userIdIndex',outputCol='user_id').setLabels(self.user_indexer.labels)
idx_to_prod = IndexToString(inputCol='productIdIndex',outputCol='product_id').setLabels(self.prod_indexer.labels)
recoms = idx_to_user.transform(recs)
res = self.idx_to_prod.transform(recoms.select(F.col('user_id'),F.explode('recommendations')).select('user_id','col.productIdIndex','col.rating'))
result = res.select('user_id','product_id','rating')