How to deploy a Google Dataflow worker with a file loaded into memory?
I'm trying to deploy a streaming Google Dataflow job for my machine learning streaming pipeline, but I can't seem to deploy the workers with a file already loaded into memory. Currently, the job is set up to pull a pickle file from a GCS bucket, load it into memory, and use it for model prediction. But this happens on every cycle of the job, i.e. it pulls from GCS every time a new object enters the Dataflow pipeline, which means the pipeline currently runs much slower than it needs to.
What I really need is a way to assign a variable inside the worker nodes when each worker is set up, and then use that variable in the pipeline without reloading it on every execution.
Is there a way to do this step before the job is deployed, something like
with open('model.pkl', 'rb') as file:
    pickle_model = pickle.load(file)
but inside my setup.py file?
##### based on - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py
"""Setup.py module for the workflow's worker utilities.
All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.
This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import print_function
import subprocess
from distutils.command.build import build as _build # type: ignore
import setuptools
# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
  """A build command class that will be invoked during package install.

  The package built using the current setup.py will be staged and later
  installed in the worker using `pip install package'. This class will be
  instantiated during install for this specific scenario and will trigger
  running the custom commands specified.
  """
  sub_commands = _build.sub_commands + [('CustomCommands', None)]


# All commands run on the workers at install time; keep them in a single
# list, since repeated assignments to CUSTOM_COMMANDS would overwrite
# each other and only the last one would take effect.
CUSTOM_COMMANDS = [
    ['pip', 'install', 'scikit-learn==0.23.1'],
    ['pip', 'install', 'google-cloud-storage'],
    ['pip', 'install', 'mlxtend'],
]
class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)
REQUIRED_PACKAGES = [
    'google-cloud-storage',
    'mlxtend',
    'scikit-learn==0.23.1',
]

setuptools.setup(
    name='ml-pipeline',  # distribution names should not contain spaces
    version='0.0.1',
    description='ML set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })
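For reference, this is roughly how the job is launched so that the setup.py above gets staged onto the workers; a minimal sketch, where the project, region, and bucket values are placeholders and not taken from the question:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                 # placeholder
    region='us-central1',                 # placeholder
    temp_location='gs://my-bucket/temp',  # placeholder
    setup_file='./setup.py',  # stages this package onto the workers
    streaming=True,
)

with beam.Pipeline(options=options) as p:
  ...  # pipeline transforms go here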
Snippet of the current ML loading mechanism:
class MlModel(beam.DoFn):
  def __init__(self):
    self._model = None
    # Import inside __init__ so the modules are available on the workers.
    from google.cloud import storage
    import pandas as pd
    import pickle as pkl
    self._storage = storage
    self._pkl = pkl
    self._pd = pd

  def process(self, element):
    # Lazily download and unpickle the model on first use.
    if self._model is None:
      bucket = self._storage.Client().get_bucket(myBucket)
      blob = bucket.get_blob(myBlob)
      self._model = self._pkl.loads(blob.download_as_string())
    new_df = self._pd.read_json(element, orient='records').iloc[:, 3:-1]
    predict = self._model.predict(new_df)
    df = self._pd.DataFrame(data=predict, columns=["A", "B"])
    A = df.iloc[0]['A']
    B = df.iloc[0]['B']
    d = {'A': A, 'B': B}
    return [d]
You can use the setup method of your MlModel DoFn: load the model there, then use it in the process method. The setup method is called once when each worker initializes the DoFn.
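A minimal sketch of that approach, reusing the myBucket/myBlob placeholders and input schema from your snippet (the constructor parameters are my own names, not part of your code):

import pickle as pkl

import apache_beam as beam


class MlModel(beam.DoFn):
  def __init__(self, bucket_name, blob_name):
    # Keep only lightweight, picklable configuration here; the heavy
    # model object is loaded in setup() instead.
    self._bucket_name = bucket_name
    self._blob_name = blob_name
    self._model = None

  def setup(self):
    # Runs once when the worker initializes this DoFn, so the model is
    # downloaded from GCS a single time instead of once per element.
    from google.cloud import storage
    bucket = storage.Client().get_bucket(self._bucket_name)
    blob = bucket.get_blob(self._blob_name)
    self._model = pkl.loads(blob.download_as_string())

  def process(self, element):
    import pandas as pd
    new_df = pd.read_json(element, orient='records').iloc[:, 3:-1]
    predict = self._model.predict(new_df)
    df = pd.DataFrame(data=predict, columns=["A", "B"])
    yield {'A': df.iloc[0]['A'], 'B': df.iloc[0]['B']}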
I've written a similar answer here.
HTH