Preparing data for LDA training with PySpark 1.6
I have a corpus of documents that I am reading into a Spark DataFrame. I have tokenized and vectorized the text, and now I want to feed the vectorized data into an mllib LDA model. The LDA API docs seem to require the data to be:

rdd – RDD of documents, which are tuples of document IDs and term (word) count vectors. The term count vectors are "bags of words" with a fixed-size vocabulary (where the vocabulary size is the length of the vector). Document IDs must be unique and >= 0.
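For illustration, each element of that RDD would look something like the toy sketch below (a made-up three-term vocabulary, not my real data):

from pyspark.mllib.linalg import Vectors

# toy illustration: each element pairs a document id with a term-count vector
example_rdd = sc.parallelize([
    (0, Vectors.dense([1.0, 0.0, 2.0])),
    (1, Vectors.sparse(3, {1: 3.0, 2: 1.0})),
])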
How do I get a suitable RDD from my DataFrame? Here is what I have so far:
from pyspark.mllib.clustering import LDA
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
#read the data
tf = sc.wholeTextFiles("20_newsgroups/*")
#transform into a data frame
df = tf.toDF(schema=['file','text'])
#tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(df)
#vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(tokenized)
result = model.transform(tokenized)
#transform into a suitable rdd
myrdd = ?
#LDA
model = LDA.train(myrdd, k=2, seed=1)
PS: I am using Apache Spark 1.6.3.
Let's first organize the imports, read the data, do some basic special-character removal, and convert it into a DataFrame:
import re  # needed to remove special characters

from pyspark.sql import Row
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

# collapse any run of non-word characters or underscores into a single space
pattern = re.compile(r'[\W_]+')

rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
    .mapValues(lambda x: pattern.sub(' ', x)).cache()

df = rdd.toDF(schema=['file', 'text'])
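As a quick check of what that regex does (a made-up string, not taken from the dataset):

# every run of non-word characters or underscores becomes a single space
print(pattern.sub(' ', "Hello, world! foo_bar"))  # prints: Hello world foo bar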
We need to add an index to each Row. The following snippet was inspired by a related question about adding an index column to a DataFrame:
row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        # prepend the unique id to the values of the original row
        row_dict = row.asDict()
        return row_with_index(*([uid] + [row_dict.get(c) for c in columns]))
    return _make_row

f = make_row(df.columns)

indexed = (df.rdd
           .zipWithUniqueId()
           .map(lambda x: f(*x))
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
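As an aside (not part of the original approach): LDA only needs document IDs that are unique and >= 0, not necessarily consecutive, so on Spark 1.6+ a simpler sketch could use monotonically_increasing_id instead; the IDs it produces are unique and non-negative but not consecutive:

# alternative indexing sketch: unique, non-negative, non-consecutive ids
indexed_alt = df.withColumn("id", F.monotonically_increasing_id())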
Once the index is added, we can do the feature cleaning, extraction, and transformation:
# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)
# remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")
cleaned = remover.transform(tokenized)
# vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
count_vectorizer_model = cv.fit(cleaned)
result = count_vectorizer_model.transform(cleaned)
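Optionally, on a real corpus you may want to bound the vocabulary and drop very rare terms; a hedged variant of the vectorizer (the parameter values below are arbitrary, not tuned, and the rest of this walkthrough keeps the defaults):

# optional variant: cap the vocabulary at 10,000 terms and ignore terms seen in fewer than 5 documents
cv_bounded = CountVectorizer(inputCol="words", outputCol="vectors", vocabSize=10000, minDF=5.0)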
Now, let's convert the resulting DataFrame back into an RDD:
corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
    .map(lambda x: [x[0], x[1]])
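To double-check that each element has the shape LDA.train expects (a document id plus a term-count vector), you can peek at the first element:

# each element should look like [doc_id, SparseVector(vocab_size, {...})]
print(corpus.take(1))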
Our data is now ready for training:
# train the LDA model
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extract the topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extract the vocabulary
vocabulary = count_vectorizer_model.vocabulary
We can now print the topic descriptions as follows:
for topic in range(len(topics)):
    print("topic {}:".format(topic))
    words, scores = topics[topic]
    for word in range(len(words)):
        print(vocabulary[words[word]], "->", scores[word])
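If you want to reuse the trained model later, mllib models can be persisted to disk; a minimal sketch (the path below is an arbitrary example):

from pyspark.mllib.clustering import LDAModel

# save the trained model and load it back later (path is an arbitrary example)
lda_model.save(sc, "./lda_model_20news")
same_model = LDAModel.load(sc, "./lda_model_20news")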
PS: the code above was tested with Spark 1.6.3.