如何将 SparseVectors 传递给 pyspark 中的 `mllib`
How to pass SparseVectors to `mllib` in pyspark
我通过 Zeppelin 使用 pyspark 1.6.3 和 python 3.5.
我正在尝试使用 pyspark CountVectorizer
和 LDA
函数实现 Latent Dirichlet Allocation。首先,问题:这是我正在使用的代码。让 df
成为一个 spark 数据框,在 'tokenized'
列中包含标记化文本
vectors = 'vectors'
cv = CountVectorizer(inputCol = 'tokenized', outputCol = vectors)
model = cv.fit(df)
df = model.transform(df)
corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
ldaModel = LDA.train(corpus, k=25)
此代码或多或少取自pyspark api docs。
在调用 LDA
时出现以下错误:
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
internet 告诉我这是由于类型不匹配造成的。
所以让我们看看 LDA
和 CountVectorizer
的类型。来自 spark 文档的另一个 example 稀疏向量进入 LDA
:
>>> from pyspark.mllib.linalg import Vectors, SparseVector
>>> data = [
... [1, Vectors.dense([0.0, 1.0])],
... [2, SparseVector(2, {0: 1.0})],
... ]
>>> rdd = sc.parallelize(data)
>>> model = LDA.train(rdd, k=2, seed=1)
我自己实现了这个,这就是 rdd
的样子:
>> testrdd.take(2)
[[1, DenseVector([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})]]
另一方面,如果我转到我的原始代码并查看 corpus
输出为 CountVectorizer
的 rdd,我看到(编辑以删除无关的位):
>> corpus.take(3)
[[0, Row(vectors=SparseVector(130593, {0: 30.0, 1: 13.0, ...
[1, Row(vectors=SparseVector(130593, {0: 52.0, 1: 44.0, ...
[2, Row(vectors=SparseVector(130593, {0: 14.0, 1: 6.0, ...
]
所以我使用的示例(来自文档!)不会生成 (index, SparseVector) 的元组,而是生成 (index, Row(SparseVector))... 之类的东西?
问题:
- SparseVector 周围的 Row 包装器是否导致此错误?
- 如果是这样,我该如何摆脱 Row 对象? row是一个df的属性,但是我用
df.rdd
转成了一个rdd;我还需要做什么?
这可能是问题所在。只需从 Row
对象中提取 vectors
。
corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]['vectors']]).cache()
我通过 Zeppelin 使用 pyspark 1.6.3 和 python 3.5.
我正在尝试使用 pyspark CountVectorizer
和 LDA
函数实现 Latent Dirichlet Allocation。首先,问题:这是我正在使用的代码。让 df
成为一个 spark 数据框,在 'tokenized'
vectors = 'vectors'
cv = CountVectorizer(inputCol = 'tokenized', outputCol = vectors)
model = cv.fit(df)
df = model.transform(df)
corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
ldaModel = LDA.train(corpus, k=25)
此代码或多或少取自pyspark api docs。
在调用 LDA
时出现以下错误:
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
internet 告诉我这是由于类型不匹配造成的。
所以让我们看看 LDA
和 CountVectorizer
的类型。来自 spark 文档的另一个 example 稀疏向量进入 LDA
:
>>> from pyspark.mllib.linalg import Vectors, SparseVector
>>> data = [
... [1, Vectors.dense([0.0, 1.0])],
... [2, SparseVector(2, {0: 1.0})],
... ]
>>> rdd = sc.parallelize(data)
>>> model = LDA.train(rdd, k=2, seed=1)
我自己实现了这个,这就是 rdd
的样子:
>> testrdd.take(2)
[[1, DenseVector([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})]]
另一方面,如果我转到我的原始代码并查看 corpus
输出为 CountVectorizer
的 rdd,我看到(编辑以删除无关的位):
>> corpus.take(3)
[[0, Row(vectors=SparseVector(130593, {0: 30.0, 1: 13.0, ...
[1, Row(vectors=SparseVector(130593, {0: 52.0, 1: 44.0, ...
[2, Row(vectors=SparseVector(130593, {0: 14.0, 1: 6.0, ...
]
所以我使用的示例(来自文档!)不会生成 (index, SparseVector) 的元组,而是生成 (index, Row(SparseVector))... 之类的东西?
问题:
- SparseVector 周围的 Row 包装器是否导致此错误?
- 如果是这样,我该如何摆脱 Row 对象? row是一个df的属性,但是我用
df.rdd
转成了一个rdd;我还需要做什么?
这可能是问题所在。只需从 Row
对象中提取 vectors
。
corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]['vectors']]).cache()