Sparse Vector pyspark
I want to find an efficient way to build sparse vectors in PySpark using DataFrames.
Suppose the transactional input is:
df = spark.createDataFrame([
(0, "a"),
(1, "a"),
(1, "b"),
(1, "c"),
(2, "a"),
(2, "b"),
(2, "b"),
(2, "b"),
(2, "c"),
(0, "a"),
(1, "b"),
(1, "b"),
(2, "cc"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
+---+--------+
| id|category|
+---+--------+
| 0| a|
| 1| a|
| 1| b|
| 1| c|
| 2| a|
| 2| b|
| 2| b|
| 2| b|
| 2| c|
| 0| a|
| 1| b|
| 1| b|
| 2| cc|
| 3| a|
| 4| a|
| 5| c|
+---+--------+
In a summarized format:
df.groupBy(df["id"],df["category"]).count().show()
+---+--------+-----+
| id|category|count|
+---+--------+-----+
| 1| b| 3|
| 1| a| 1|
| 1| c| 1|
| 2| cc| 1|
| 2| c| 1|
| 2| a| 1|
| 1| a| 1|
| 0| a| 2|
+---+--------+-----+
My goal is to get this output per id:
+---+-----------------------------------------------+
| id| feature |
+---+-----------------------------------------------+
| 2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|
Can you point me in the right direction? With MapReduce in Java it seemed easier for me.
If you convert the DataFrame to an RDD, you can follow a MapReduce-like pattern with reduceByKey. The only really tricky part here is formatting the data for Spark's SparseVector.
Import packages and create the data:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
df = sqlContext.createDataFrame([
(0, "a"),
(1, "a"),
(1, "b"),
(1, "c"),
(2, "a"),
(2, "b"),
(2, "b"),
(2, "b"),
(2, "c"),
(0, "a"),
(1, "b"),
(1, "b"),
(2, "cc"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
Create a numeric representation of each category (needed for the sparse vector):
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
df = indexer.fit(df).transform(df)
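If you want to see which index each category received (an optional check, not part of the original answer; by default StringIndexer assigns indices in order of descending label frequency), you could inspect the mapping like this:
# Optional check: show the category -> categoryIndex mapping produced by the
# fitted StringIndexer.
df.select("category", "categoryIndex").distinct().orderBy("categoryIndex").show()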
Group by id and categoryIndex, and get the counts:
df = df.groupBy(df["id"],df["categoryIndex"]).count()
Convert to an RDD and map the data to key/value pairs of id and [(categoryIndex, count)]:
rdd = df.rdd.map(lambda x: (x.id, [(x.categoryIndex, x['count'])]))
Reduce by key to get key/value pairs of id and the list of all (categoryIndex, count) tuples for that id:
rdd = rdd.reduceByKey(lambda a, b: a + b)
Map the data to turn each id's list of (categoryIndex, count) tuples into a sparse vector:
rdd = rdd.map(lambda x: (x[0], Vectors.sparse(len(x[1]), x[1])))
Convert back to a DataFrame:
finalDf = sqlContext.createDataFrame(rdd, ['id', 'feature'])
Data check:
finalDf.take(5)
[Row(id=0, feature=SparseVector(1, {1: 2.0})),
Row(id=1, feature=SparseVector(3, {0: 3.0, 1: 1.0, 2: 1.0})),
Row(id=2, feature=SparseVector(4, {0: 3.0, 1: 1.0, 2: 1.0, 3: 1.0})),
Row(id=3, feature=SparseVector(1, {1: 1.0})),
Row(id=4, feature=SparseVector(1, {1: 1.0}))]
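One caveat with the output above (my observation, not part of the original answer): Vectors.sparse(len(x[1]), x[1]) uses the length of each id's local list as the vector size, so different ids end up with different dimensions and an index can even exceed the declared size (e.g. SparseVector(1, {1: 2.0})). If every id should share one dimension, a minimal variation reusing the names defined above might look like this:
# Use the total number of distinct categories as the vector size so that every
# id gets a vector of the same dimension.
num_categories = df.select("categoryIndex").distinct().count()
fixedRdd = (df.rdd
    .map(lambda x: (x.id, [(x.categoryIndex, x['count'])]))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[0], Vectors.sparse(num_categories, x[1]))))
fixedDf = sqlContext.createDataFrame(fixedRdd, ['id', 'feature'])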
This can be done quite easily with pivot and VectorAssembler. Replace the aggregation with pivot:
pivoted = df.groupBy("id").pivot("category").count().na.fill(0)
and assemble:
from pyspark.ml.feature import VectorAssembler
input_cols = [x for x in pivoted.columns if x != id]
result = (VectorAssembler(inputCols=input_cols, outputCol="features")
    .transform(pivoted)
    .select("id", "features"))
The result looks like the following. Depending on sparsity, it picks the more efficient of the two representations:
+---+---------------------+
|id |features |
+---+---------------------+
|0 |(5,[1],[2.0]) |
|5 |(5,[0,3],[5.0,1.0]) |
|1 |[1.0,1.0,3.0,1.0,0.0]|
|3 |(5,[0,1],[3.0,1.0]) |
|2 |[2.0,1.0,3.0,1.0,1.0]|
|4 |(5,[0,1],[4.0,1.0]) |
+---+---------------------+
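A note on the code above (my reading, not stated in the original answer): x != id compares each column name against Python's builtin id function rather than the string "id", so the condition is always true and the id column itself is assembled as the first feature. That is why the vectors have five slots and the row for id 5 shows 5.0 at index 0. If the id is meant to be excluded, the comparison would presumably use the string, which would also change the output shown above:
# Presumed intent: keep only the pivoted category columns and drop "id" from
# the assembled features (this yields 4-element vectors instead of 5).
input_cols = [x for x in pivoted.columns if x != "id"]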
but of course you can still convert it to a single representation:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, VectorUDT
import numpy as np

def to_sparse(c):
    def to_sparse_(v):
        # Already sparse: return as-is.
        if isinstance(v, SparseVector):
            return v
        # Dense: keep only the nonzero entries.
        vs = v.toArray()
        nonzero = np.nonzero(vs)[0]
        return SparseVector(v.size, nonzero, vs[nonzero])
    return udf(to_sparse_, VectorUDT())(c)
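Applying the UDF (the exact call is not shown above; this assumes the result DataFrame from the assembler step):
# Convert the assembled features column to the sparse representation and show
# all rows without truncation.
result.withColumn("features", to_sparse("features")).show(6, truncate=False)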
+---+-------------------------------------+
|id |features |
+---+-------------------------------------+
|0 |(5,[1],[2.0]) |
|5 |(5,[0,3],[5.0,1.0]) |
|1 |(5,[0,1,2,3],[1.0,1.0,3.0,1.0]) |
|3 |(5,[0,1],[3.0,1.0]) |
|2 |(5,[0,1,2,3,4],[2.0,1.0,3.0,1.0,1.0])|
|4 |(5,[0,1],[4.0,1.0]) |
+---+-------------------------------------+