如何避免一个 Spark Streaming window 阻塞另一个 window 同时 运行 一些原生 Python 代码
How to avoid one Spark Streaming window blocking another window with both running some native Python code
我正在 运行使用两个不同的 windows 宁 Spark Streaming(在 window 上用于使用 SKLearn 训练模型,另一个用于基于该模型预测值)和我我想知道如何避免 window("slow" 训练 window)来训练模型,而没有 "blocking" "fast" 预测 window .
我的简化代码如下所示:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(注:Custom_ModelContainer是我写的一个class用来保存和检索训练好的模型)
我的设置通常工作正常,除了每次在第二个 window 训练新模型(大约需要一分钟)时,第一个 windows 直到模型训练完成。实际上,我想这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置中 - 由于 SKLearn)。
所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我怎样才能实现后者并真正解决我的问题?
如果没有,关于如何在 window 1 中进行这样的设置而不延迟计算的任何其他建议?
非常感谢任何帮助。
编辑:我想更一般的问题是:
我怎样才能 运行 在两个不同的工作人员上并行执行两个不同的任务?
我想你要找的是 属性: "spark.streaming.concurrentJobs" 默认为 1。增加它应该允许你 运行 并行的多个 foreachRDD 函数。
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
提醒一下,如果您要并行更改和读取,请注意自定义模型容器上的线程安全。 :)
免责声明:这只是一组想法。 None 其中已经过实践检验。
您可以尝试几件事:
不要collect
到predict
。 scikit-learn
模型通常是可序列化的,因此可以在集群上轻松处理预测过程:
def predict(time, rdd):
...
model = Custom_ModelContainer.getmodel()
pred = (df.rdd.map(lambda lp: lp.features.toArray())
.mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
...
它不仅应该并行预测,而且如果未将原始数据传递给 GUI,还应该减少必须收集的数据量。
尝试collect
并异步发送数据。 PySpark 不提供 collectAsync
方法,但您可以尝试使用 concurrent.futures
:
实现类似的效果
from pyspark.rdd import RDD
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def submit_to_gui(*args): ...
def submit_if_success(f):
if not f.exception():
executor.submit(submit_to_gui, f.result())
从 1 继续。
def predict(time, rdd):
...
f = executor.submit(RDD.collect, pred)
f.add_done_callback(submit_if_success)
...
如果您真的想使用本地 scikit-learn
模型,请尝试 collect
和 fit
使用上述期货。您也可以尝试只收集一次,尤其是在数据未缓存的情况下:
def collect_and_train(df):
y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
...
return SVR().fit(X_train, y_train)
def set_if_success(f):
if not f.exception():
Custom_ModelContainer.setModel(f.result())
def trainModel(time, rdd):
...
f = excutor.submit(collect_and_train, df)
f.add_done_callback(set_if_success)
...
使用已有的解决方案(如 spark-sklearn
或自定义方法)将训练过程移至集群:
- 简单的解决方案 - 准备数据,
coalesce(1)
并使用 mapPartitions
训练单个模型。
- 分布式解决方案 - 使用
mapPartitions
为每个分区创建和验证一个单独的模型,收集模型并作为一个整体使用,例如通过平均或中值预测。
抛弃scikit-learn
并使用可以在分布式流式环境中训练和维护的模型(例如StreamingLinearRegressionWithSGD
)。
您当前的方法使 Spark 过时了。如果您可以在本地训练模型,那么您很有可能可以在本地机器上更快地执行所有其他任务。否则你的程序将在 collect
.
上失败
我正在 运行使用两个不同的 windows 宁 Spark Streaming(在 window 上用于使用 SKLearn 训练模型,另一个用于基于该模型预测值)和我我想知道如何避免 window("slow" 训练 window)来训练模型,而没有 "blocking" "fast" 预测 window .
我的简化代码如下所示:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(注:Custom_ModelContainer是我写的一个class用来保存和检索训练好的模型)
我的设置通常工作正常,除了每次在第二个 window 训练新模型(大约需要一分钟)时,第一个 windows 直到模型训练完成。实际上,我想这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置中 - 由于 SKLearn)。
所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我怎样才能实现后者并真正解决我的问题?
如果没有,关于如何在 window 1 中进行这样的设置而不延迟计算的任何其他建议?
非常感谢任何帮助。
编辑:我想更一般的问题是: 我怎样才能 运行 在两个不同的工作人员上并行执行两个不同的任务?
我想你要找的是 属性: "spark.streaming.concurrentJobs" 默认为 1。增加它应该允许你 运行 并行的多个 foreachRDD 函数。
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
提醒一下,如果您要并行更改和读取,请注意自定义模型容器上的线程安全。 :)
免责声明:这只是一组想法。 None 其中已经过实践检验。
您可以尝试几件事:
不要
collect
到predict
。scikit-learn
模型通常是可序列化的,因此可以在集群上轻松处理预测过程:def predict(time, rdd): ... model = Custom_ModelContainer.getmodel() pred = (df.rdd.map(lambda lp: lp.features.toArray()) .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) ...
它不仅应该并行预测,而且如果未将原始数据传递给 GUI,还应该减少必须收集的数据量。
尝试
实现类似的效果collect
并异步发送数据。 PySpark 不提供collectAsync
方法,但您可以尝试使用concurrent.futures
:from pyspark.rdd import RDD from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) def submit_to_gui(*args): ... def submit_if_success(f): if not f.exception(): executor.submit(submit_to_gui, f.result())
从 1 继续。
def predict(time, rdd): ... f = executor.submit(RDD.collect, pred) f.add_done_callback(submit_if_success) ...
如果您真的想使用本地
scikit-learn
模型,请尝试collect
和fit
使用上述期货。您也可以尝试只收集一次,尤其是在数据未缓存的情况下:def collect_and_train(df): y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) ... return SVR().fit(X_train, y_train) def set_if_success(f): if not f.exception(): Custom_ModelContainer.setModel(f.result()) def trainModel(time, rdd): ... f = excutor.submit(collect_and_train, df) f.add_done_callback(set_if_success) ...
使用已有的解决方案(如
spark-sklearn
或自定义方法)将训练过程移至集群:- 简单的解决方案 - 准备数据,
coalesce(1)
并使用mapPartitions
训练单个模型。 - 分布式解决方案 - 使用
mapPartitions
为每个分区创建和验证一个单独的模型,收集模型并作为一个整体使用,例如通过平均或中值预测。
- 简单的解决方案 - 准备数据,
抛弃
scikit-learn
并使用可以在分布式流式环境中训练和维护的模型(例如StreamingLinearRegressionWithSGD
)。您当前的方法使 Spark 过时了。如果您可以在本地训练模型,那么您很有可能可以在本地机器上更快地执行所有其他任务。否则你的程序将在
collect
. 上失败