Pyspark 朴素贝叶斯批量使用拟合
Pyspark Naive Bayes using fit in batches
我一直在尝试在大型数据库 (30GB) 上训练朴素贝叶斯分类器。
由于内存限制,我必须将数据库查询分成多个批次。
我正在使用此处所示的管道:
categoryIndexer = StringIndexer(inputCol="diff", outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="raw")
remover = StopWordsRemover(inputCol="raw", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=100000)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[categoryIndexer, tokenizer, remover, hashingTF, nb])
然后在 for 循环中使用拟合。
for i in range(0,365):
df = sqlContext.read.jdbc(url=url,table="(SELECT text, diff FROM tweets INNER JOIN djitf ON tweets.created = djitf.day WHERE id > "+ str(i*1000000)+ "AND id <"+ str((i+1)*1000000)+") as table1", properties=properties)
train_data, test_data = df.randomSplit([0.8, 0.2])
model = pipeline.fit(train_data)
但是我的结果表明,每次我在管道上调用拟合函数时,模型都会被覆盖。我怎样才能保留已经拟合的数据,并添加到它?
我是否缺少参数或其他内容?例如在 Sklearn 中有 partial_fit
方法
没有遗漏的参数。 Spark 不支持增量拟合,因此不需要。 Spark 可以轻松处理大于内存的数据,可能使用磁盘缓存。如果 30GB 的数据对于您的资源来说仍然太多,那么您根本不应该使用 Spark。
如果问题只是在读取时使用谓词:
predicates = [
"id > {0} AND id < {1}".format(i * 1000000, (i + 1) * 1000000)
for i in range(0, 365)
]
df = sqlContext.read.jdbc(
url=url,
table="""(SELECT text, diff
FROM tweets INNER
JOIN djitf ON tweets.created = djitf.day") as table1""",
predicates=predicates,
properties=properties)
或 JDBC reader 的范围:
df = sqlContext.read.jdbc(
url=url,
table="""(SELECT cast(id, INTEGER), text, diff
FROM tweets INNER
JOIN djitf ON tweets.created = djitf.day") as table1""",
column="id", lowerBound=0, upperBound=366 * 1000000, numPartitions=300)
我一直在尝试在大型数据库 (30GB) 上训练朴素贝叶斯分类器。 由于内存限制,我必须将数据库查询分成多个批次。
我正在使用此处所示的管道:
categoryIndexer = StringIndexer(inputCol="diff", outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="raw")
remover = StopWordsRemover(inputCol="raw", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=100000)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[categoryIndexer, tokenizer, remover, hashingTF, nb])
然后在 for 循环中使用拟合。
for i in range(0,365):
df = sqlContext.read.jdbc(url=url,table="(SELECT text, diff FROM tweets INNER JOIN djitf ON tweets.created = djitf.day WHERE id > "+ str(i*1000000)+ "AND id <"+ str((i+1)*1000000)+") as table1", properties=properties)
train_data, test_data = df.randomSplit([0.8, 0.2])
model = pipeline.fit(train_data)
但是我的结果表明,每次我在管道上调用拟合函数时,模型都会被覆盖。我怎样才能保留已经拟合的数据,并添加到它?
我是否缺少参数或其他内容?例如在 Sklearn 中有 partial_fit
方法
没有遗漏的参数。 Spark 不支持增量拟合,因此不需要。 Spark 可以轻松处理大于内存的数据,可能使用磁盘缓存。如果 30GB 的数据对于您的资源来说仍然太多,那么您根本不应该使用 Spark。
如果问题只是在读取时使用谓词:
predicates = [
"id > {0} AND id < {1}".format(i * 1000000, (i + 1) * 1000000)
for i in range(0, 365)
]
df = sqlContext.read.jdbc(
url=url,
table="""(SELECT text, diff
FROM tweets INNER
JOIN djitf ON tweets.created = djitf.day") as table1""",
predicates=predicates,
properties=properties)
或 JDBC reader 的范围:
df = sqlContext.read.jdbc(
url=url,
table="""(SELECT cast(id, INTEGER), text, diff
FROM tweets INNER
JOIN djitf ON tweets.created = djitf.day") as table1""",
column="id", lowerBound=0, upperBound=366 * 1000000, numPartitions=300)