在 DoFn 中下载文件
Downloading a file in a DoFn
尚不清楚在 DoFn
内下载文件是否安全。
我的 DoFn
将下载一个 ~20MB 的文件(一个 ML 模型)以应用于我的管道中的元素。根据 Beam 文档,requirements 包括可串行性和线程兼容性。
一个例子(1, 2)与我的DoFn
非常相似。它演示了从 GCP 存储桶下载(正如我使用 DataflowRunner 所做的那样),但我不确定这种方法是否安全。
是否应该将对象下载到 in-memory bytes buffer 而不是下载到磁盘,或者是否有针对此用例的其他最佳实践?我还没有遇到这种模式的最佳实践方法。
如果它是一个 scikit-learn 模型,那么您可以考虑将其托管在 Cloud ML Engine 中并将其公开为 REST 端点。然后,您可以使用 BagState
之类的东西来优化网络上的模型调用。可以在这个 link https://beam.apache.org/blog/2017/08/28/timely-processing.html
中找到更多详细信息
添加 答案。
如果您的模型数据是静态的,那么您可以使用下面的代码示例将您的模型作为辅助输入传递。
#DoFn to open the model from GCS location
class get_model(beam.DoFn):
def process(self, element):
from apache_beam.io.gcp import gcsio
logging.info('reading model from GCS')
gcs = gcsio.GcsIO()
yield gcs.open(element)
#Pipeline to load pickle file from GCS bucket
model_step = (p
| 'start' >> beam.Create(['gs://somebucket/model'])
| 'load_model' >> beam.ParDo(get_model())
| 'unpickle_model' >> beam.Map(lambda bin: dill.load(bin)))
#DoFn to predict the results.
class predict(beam.DoFn):
def process(self, element, model):
(features, clients) = element
result = model.predict_proba(features)[:, 1]
return [(clients, result)]
#main pipeline to get input and predict results.
_ = (p
| 'get_input' >> #get input based on source and preprocess it.
| 'predict_sk_model' >> beam.ParDo(predict(), beam.pvalue.AsSingleton(model_step))
| 'write' >> #write output based on target.
在流式管道的情况下,如果您想在预定义时间后再次加载模型,您可以检查 "Slowly-changing lookup cache" 模式 here。
尚不清楚在 DoFn
内下载文件是否安全。
我的 DoFn
将下载一个 ~20MB 的文件(一个 ML 模型)以应用于我的管道中的元素。根据 Beam 文档,requirements 包括可串行性和线程兼容性。
一个例子(1, 2)与我的DoFn
非常相似。它演示了从 GCP 存储桶下载(正如我使用 DataflowRunner 所做的那样),但我不确定这种方法是否安全。
是否应该将对象下载到 in-memory bytes buffer 而不是下载到磁盘,或者是否有针对此用例的其他最佳实践?我还没有遇到这种模式的最佳实践方法。
如果它是一个 scikit-learn 模型,那么您可以考虑将其托管在 Cloud ML Engine 中并将其公开为 REST 端点。然后,您可以使用 BagState
之类的东西来优化网络上的模型调用。可以在这个 link https://beam.apache.org/blog/2017/08/28/timely-processing.html
添加
如果您的模型数据是静态的,那么您可以使用下面的代码示例将您的模型作为辅助输入传递。
#DoFn to open the model from GCS location
class get_model(beam.DoFn):
def process(self, element):
from apache_beam.io.gcp import gcsio
logging.info('reading model from GCS')
gcs = gcsio.GcsIO()
yield gcs.open(element)
#Pipeline to load pickle file from GCS bucket
model_step = (p
| 'start' >> beam.Create(['gs://somebucket/model'])
| 'load_model' >> beam.ParDo(get_model())
| 'unpickle_model' >> beam.Map(lambda bin: dill.load(bin)))
#DoFn to predict the results.
class predict(beam.DoFn):
def process(self, element, model):
(features, clients) = element
result = model.predict_proba(features)[:, 1]
return [(clients, result)]
#main pipeline to get input and predict results.
_ = (p
| 'get_input' >> #get input based on source and preprocess it.
| 'predict_sk_model' >> beam.ParDo(predict(), beam.pvalue.AsSingleton(model_step))
| 'write' >> #write output based on target.
在流式管道的情况下,如果您想在预定义时间后再次加载模型,您可以检查 "Slowly-changing lookup cache" 模式 here。