Cleaning past AI Platform model versions with Airflow
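I am using Airflow to schedule the training of model versions on Google Cloud AI Platform.
I managed to schedule the training of the model and the creation of the version, and then I set the latest version as the default version with this DAG: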
with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:
    uuid = str(uuid4())
    training_op = MLEngineTrainingOperator(
        task_id='submit_job_for_training',
        project_id=PROJECT_ID,
        job_id='training_{}'.format(uuid),
        # package_uris=TRAINER_BIN,
        package_uris=[os.path.join(TRAINER_BIN)],
        training_python_module=TRAINER_MODULE,
        runtime_version=RUNTIME_VERSION,
        region='us-central1',
        training_args=[
            '--base-dir={}'.format(BASE_DIR)
        ],
        python_version='3.5')
    create_version_op = MLEngineVersionOperator(
        task_id='create_version',
        project_id=PROJECT_ID,
        model_name=MODEL_NAME,
        version={
            'name': version_name,
            'deploymentUri': export_uri,
            'runtimeVersion': RUNTIME_VERSION,
            'pythonVersion': '3.5',
            'framework': 'SCIKIT_LEARN',
        },
        operation='create')
    set_version_default_op = MLEngineVersionOperator(
        task_id='set_version_as_default',
        project_id=PROJECT_ID,
        model_name=MODEL_NAME,
        version={'name': version_name},
        operation='set_default')
    training_op >> create_version_op >> set_version_default_op
I want to clean up the previous versions of the model in this DAG. I think I should use the "list" and "delete" operations of MLEngineVersionOperator, like this:
list_model_versions = MLEngineVersionOperator(
    task_id="list_versions",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    operation="list",
)
delete_other_version = MLEngineVersionOperator(
    task_id="delete_precedent_version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    operation="delete",
    version={'name': some_name}
)
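I have read about using XCom to pass the result of the list operator to the delete operator, but I don't know how to do it.
Any advice or solution on how to proceed would be greatly appreciated. Thanks!
You can pass the result of a previous operator through XCom by using a templated attribute. For example: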
delete_other_version = MLEngineVersionOperator(
    task_id="delete_precedent_version",
    project_id="asimov-foundation",
    model_name="IrisPredictor",
    version_name="{{task_instance.xcom_pull(task_ids='my_previous_task')}}",
    operation="delete",
)
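Here the value of version_name is a Jinja template that pulls the value from XCom. However, the previous operator's result is a list of versions, so you need an extra processing step that extracts the name of the version to delete before passing it to the delete operator.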
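To see why that extra step matters, here is a toy, stdlib-only model of the data flow. It is not how Airflow implements XCom: xcom_store, run_task and render are hypothetical stand-ins. It only mirrors two behaviours: the value a task returns is stored under its task_id, and a templated field such as version_name is rendered as text by default, so templating the raw list would hand the delete operator the text of the whole list instead of one version name.

```python
from typing import Any, Callable, Dict

# Hypothetical in-memory stand-in for Airflow's XCom storage: task_id -> return value.
xcom_store: Dict[str, Any] = {}


def run_task(task_id: str, fn: Callable[[], Any]) -> None:
    """Run a toy 'task' and store its return value under its task_id."""
    xcom_store[task_id] = fn()


def xcom_pull(task_ids: str) -> Any:
    """Return the value stored by the given task."""
    return xcom_store[task_ids]


def render(task_ids: str) -> str:
    """Mimic rendering "{{ task_instance.xcom_pull(task_ids=...) }}": the result is text."""
    return str(xcom_pull(task_ids))


# The 'list' operation returns version resources (fields trimmed for the example).
run_task("list_versions", lambda: [
    {"name": "projects/p/models/m/versions/v2", "createTime": "2020-02-01T00:00:00Z"},
    {"name": "projects/p/models/m/versions/v1", "createTime": "2020-01-01T00:00:00Z"},
])

# Templating the raw list gives the delete task the text of the whole list.
print(render("list_versions"))

# An intermediate task reduces the list to a single short version name first
# (here simply the second entry; the answer sorts by createTime instead).
run_task("get_version_task",
         lambda: xcom_pull("list_versions")[1]["name"].split("/")[-1])
print(render("get_version_task"))  # v1
```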
Below is an example PythonOperator that takes the list from the previous operator and returns the name of the second most recent deployed version.
def get_version(**context):
    # Get the list of versions from previous operator
    versions = context['task_instance'].xcom_pull(task_ids='list_versions')
    # Sort the version list by createTime and obtain the name of the second most recent version
    full_name = sorted(versions, key=lambda x: x['createTime'], reverse=True)[1]['name']
    # The name is in format "projects/PROJECT/models/MODEL/versions/VERSION", so we'll use only VERSION
    return full_name.split('/')[-1]

get_version_task = PythonOperator(
    task_id='get_version_task',
    python_callable=get_version,
    provide_context=True,
)
In a PythonOperator you can call xcom_pull through the context.
The complete DAG is:
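# Import paths for Airflow 1.10.x (provide_context=True is an Airflow 1.x argument)
from airflow.contrib.operators.mlengine_operator import MLEngineVersionOperator
from airflow.operators.python_operator import PythonOperator


def get_version(**context):
    # Get the list of versions from previous operator
    versions = context['task_instance'].xcom_pull(task_ids='list_versions')
    # Sort the version list by createTime and obtain the name of the second most recent version
    full_name = sorted(versions, key=lambda x: x['createTime'], reverse=True)[1]['name']
    # The name is in format "projects/PROJECT/models/MODEL/versions/VERSION", so we'll use only VERSION
    return full_name.split('/')[-1]


# These tasks belong inside the same `with DAG(...) as dag:` block as the training tasks
list_model_versions = MLEngineVersionOperator(
    task_id="list_versions",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    operation="list",
)
get_version_task = PythonOperator(
    task_id='get_version_task',
    python_callable=get_version,
    provide_context=True,
)
delete_other_version = MLEngineVersionOperator(
    task_id="delete_precedent_version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    # Templated field: rendered with the string returned by get_version_task
    version_name="{{task_instance.xcom_pull(task_ids='get_version_task')}}",
    operation="delete",
)
# To clean up right after deploying, chain this after the question's tasks,
# e.g. set_version_default_op >> list_model_versions
list_model_versions >> get_version_task >> delete_other_version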
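A few caveats about get_version: indexing with [1] raises an IndexError when the model has only one version, it selects at most one old version per run, and comparing createTime as plain strings can misorder timestamps if the API returns mixed fractional-second precision (for example "...00.5Z" sorts before "...00Z"). The stdlib-only sketch below is a more defensive selection step. parse_create_time and versions_to_delete are hypothetical helpers; the sketch assumes each version dict has a full resource name, an RFC 3339 UTC createTime ending in "Z", and optionally the isDefault flag of the AI Platform version resource. It never selects the default version, since AI Platform does not let you delete a model's default version while other versions exist.

```python
from datetime import datetime, timezone
from typing import Dict, List


def parse_create_time(ts: str) -> datetime:
    """Parse an RFC 3339 UTC timestamp such as '2020-01-01T12:00:00.123456789Z'.

    Assumes a trailing 'Z'. Fractional seconds (any number of digits) are
    padded or truncated to the 6 digits that strptime's %f accepts.
    """
    body = ts[:-1] if ts.endswith("Z") else ts
    whole, _, frac = body.partition(".")
    frac = frac[:6].ljust(6, "0")
    parsed = datetime.strptime("{}.{}".format(whole, frac), "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


def versions_to_delete(versions: List[Dict], keep: int = 1) -> List[str]:
    """Return short names of versions older than the `keep` newest ones, newest first.

    Versions flagged isDefault are never returned; an empty list means nothing to delete.
    """
    newest_first = sorted(versions, key=lambda v: parse_create_time(v["createTime"]),
                          reverse=True)
    return [v["name"].split("/")[-1]
            for v in newest_first[keep:]
            if not v.get("isDefault", False)]


versions = [
    {"name": "projects/p/models/m/versions/v1", "createTime": "2020-01-01T00:00:00Z"},
    {"name": "projects/p/models/m/versions/v3", "createTime": "2020-03-01T00:00:00.5Z",
     "isDefault": True},
    {"name": "projects/p/models/m/versions/v2", "createTime": "2020-02-01T00:00:00Z"},
]
print(versions_to_delete(versions))          # ['v2', 'v1']
print(versions_to_delete(versions, keep=2))  # ['v1']
print(versions_to_delete(versions[:1]))      # []
```

Because each MLEngineVersionOperator task deletes a single version, get_version could return the first name from this list (and the delete could be skipped when the list is empty); removing every old version in one run would need one delete call per returned name.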