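Google Cloud Vertex AI Operators

Google Cloud Vertex AI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while training in AI Platform lets you run custom training code. With Vertex AI, both AutoML training and custom training are available options. Whichever option you choose for training, you can save models, deploy models, and request predictions with Vertex AI.

Creating Datasets

To create a Google Vertex AI dataset, use CreateDatasetOperator. The operator returns the dataset ID in XCom under the dataset_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]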

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating a dataset, you can import data into it with ImportDataOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)

To export a dataset, use ExportDataOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)

To delete a dataset, use DeleteDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a dataset, use GetDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)

To get a list of datasets, use ListDatasetsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

To update a dataset, use UpdateDatasetOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py[source]

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)

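Creating Training Jobs

To create a Google Vertex AI training job, you have three operators: CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, and CreateCustomTrainingJobOperator. Each of them waits for the operation to complete. The result of each operator is the model trained by that job.

Preparation step

For each operator, you must first prepare and create a dataset. Then pass the dataset ID to the operator's dataset_id parameter.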

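How to run a custom container training job CreateCustomContainerTrainingJobOperator

Before you run this job, you should create a Docker image that contains your training script. Documentation on how to create the image is available at https://cloud.google.com/vertex-ai/docs/training/create-custom-container. Then pass the link to the image in the container_uri parameter. You can also use the command parameter to set the command that is executed in the container created from this image.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]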

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
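The three *_fraction_split arguments above divide the dataset into training, validation, and test portions. As a minimal sketch, assuming the fractions are expected to lie in [0, 1] and add up to 1 (the source does not state this rule, and the helper name and the constant values below are hypothetical), a DAG file could sanity-check them before the operator is created:

```python
import math


def check_fraction_splits(training: float, validation: float, test: float) -> None:
    """Raise ValueError if the three dataset split fractions are inconsistent."""
    fractions = {"training": training, "validation": validation, "test": test}
    for name, value in fractions.items():
        if not 0.0 <= value <= 1.0:
            raise ValueError(f"{name} fraction {value} is outside [0, 1]")
    total = math.fsum(fractions.values())
    # Tolerate floating-point rounding, e.g. 0.8 + 0.1 + 0.1.
    if not math.isclose(total, 1.0, abs_tol=1e-9):
        raise ValueError(f"fractions sum to {total}, expected 1.0")


# Hypothetical values standing in for the constants used in the example.
TRAINING_FRACTION_SPLIT = 0.7
VALIDATION_FRACTION_SPLIT = 0.15
TEST_FRACTION_SPLIT = 0.15

check_fraction_splits(
    TRAINING_FRACTION_SPLIT, VALIDATION_FRACTION_SPLIT, TEST_FRACTION_SPLIT
)
```

Running the check at DAG parse time surfaces a mistyped split before any training job is submitted.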

CreateCustomContainerTrainingJobOperator also provides a deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py[source]

create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=f"{CONTAINER_DISPLAY_NAME}-def",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

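How to run a Python package training job CreateCustomPythonPackageTrainingJobOperator

Before you run this job, you should create a Python package that contains your training script. Documentation on how to create it is available at https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container. Then pass the link to the package in the python_package_gcs_uri parameter, and set the python_module_name parameter to the name of the script that runs your training task.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]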

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateCustomPythonPackageTrainingJobOperator also provides a deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py[source]

create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=f"{PACKAGE_DISPLAY_NAME}-def",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

How to run a custom training job CreateCustomTrainingJobOperator

To create and run a custom training job, pass the path to your local training script in the script_path parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

model_id_v1 = create_custom_training_job.output["model_id"]

The same operation can be performed in deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]

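Additionally, you can create a new version of an existing custom training job. The result is a new version of the existing model rather than a new model in the Model Registry. To do this, specify the parent_model parameter when running the custom training job.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]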

create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

The same operation can be performed in deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_deferrable_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

You can get a list of training jobs with ListCustomTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py[source]

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete a custom training job, use DeleteCustomTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py[source]

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

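Creating AutoML Training Jobs

To create a Google Vertex AI AutoML training job, you have five operators: CreateAutoMLForecastingTrainingJobOperator, CreateAutoMLImageTrainingJobOperator, CreateAutoMLTabularTrainingJobOperator, SupervisedFineTuningTrainOperator, and CreateAutoMLVideoTrainingJobOperator. Each of them waits for the operation to complete. The result of each operator is the model trained by that job.

How to run an AutoML forecasting training job CreateAutoMLForecastingTrainingJobOperator

Before you run this job, you must prepare and create a TimeSeries dataset. Then pass the dataset ID to the operator's dataset_id parameter.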

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)
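The forecast_horizon and context_window arguments above are plain integers, so their real-world length depends on data_granularity_unit and data_granularity_count. The sketch below assumes both values are counted in granularity steps (an assumption not spelled out in this guide) and converts them to an elapsed time span; the helper name and the supported unit subset are hypothetical.

```python
from datetime import timedelta

# Granularity units mapped to timedelta keyword arguments. This subset is an
# assumption for the sketch; units such as "month" have no fixed length.
_UNIT_TO_KWARG = {"minute": "minutes", "hour": "hours", "day": "days", "week": "weeks"}


def steps_to_timedelta(steps: int, unit: str, count: int = 1) -> timedelta:
    """Convert a number of granularity steps into an elapsed time span."""
    if unit not in _UNIT_TO_KWARG:
        raise ValueError(f"unsupported granularity unit: {unit!r}")
    return timedelta(**{_UNIT_TO_KWARG[unit]: steps * count})


# Values taken from the example above: 30 steps of 1 day each.
forecast_span = steps_to_timedelta(30, unit="day", count=1)
context_span = steps_to_timedelta(30, unit="day", count=1)
```

Under that assumption, the example asks the model to look back 30 days and predict 30 days ahead.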

How to run an AutoML image training job CreateAutoMLImageTrainingJobOperator

Before you run this job, you must prepare and create an Image dataset. Then pass the dataset ID to the operator's dataset_id parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py[source]

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
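The budget_milli_node_hours argument is expressed in thousandths of a node hour, so the value 8000 above corresponds to 8 node hours. A minimal conversion sketch (the helper name is hypothetical and is not part of the provider package):

```python
def milli_node_hours_to_node_hours(budget_milli_node_hours: int) -> float:
    """Convert a budget in milli node hours to node hours (1000 milli = 1)."""
    if budget_milli_node_hours < 0:
        raise ValueError("budget must be non-negative")
    return budget_milli_node_hours / 1000


# Budget used in the image training example.
image_budget = milli_node_hours_to_node_hours(8000)
# Budget used in the forecasting training example.
forecasting_budget = milli_node_hours_to_node_hours(1000)
```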

How to run an AutoML tabular training job CreateAutoMLTabularTrainingJobOperator

Before you run this job, you must prepare and create a Tabular dataset. Then pass the dataset ID to the operator's dataset_id parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py[source]

create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run an AutoML video training job CreateAutoMLVideoTrainingJobOperator

Before you run this job, you must prepare and create a Video dataset. Then pass the dataset ID to the operator's dataset_id parameter.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[source]

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]

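Additionally, you can create a new version of an existing AutoML video training job. In this case, the result is a new version of the existing model rather than a new model in the Model Registry. To do this, specify the parent_model parameter when running the AutoML video training job.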

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py[source]

create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)

You can get a list of AutoML training jobs with ListAutoMLTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py[source]

list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete an AutoML training job, use DeleteAutoMLTrainingJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py[source]

delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)
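In the snippet above, the training_pipeline_id value is written as two adjacent string literals on separate lines. Python joins adjacent literals into a single string, so the operator still receives one Jinja template rather than two arguments. The short sketch below illustrates this with the same template text; the variable names are only for illustration.

```python
# Two adjacent literals inside parentheses are concatenated by Python.
template = (
    "{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}"
)

# The same template written as a single literal.
single_line = "{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', key='training_id') }}"

# True: both spellings produce an identical template string.
same_template = template == single_line
```

Splitting the literal this way only keeps the line length manageable; it does not change what Airflow renders at run time.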

Creating Batch Prediction Jobs

To create a Google Vertex AI batch prediction job, use CreateBatchPredictionJobOperator. The operator returns the batch prediction job ID in XCom under the batch_prediction_job_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateBatchPredictionJobOperator also provides a deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_def",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

To delete a batch prediction job, use DeleteBatchPredictionJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a list of batch prediction jobs, use ListBatchPredictionJobsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py[source]

list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating an Endpoint Service

To create a Google Vertex AI endpoint, use CreateEndpointOperator. The operator returns the endpoint ID in XCom under the endpoint_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating an endpoint, you can deploy a model to it with DeployModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)
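The traffic_split argument above maps deployed model IDs to traffic percentages. The sketch below assumes the Vertex AI API convention that the key "0" refers to the model being deployed by the current request and that the percentages must add up to 100; this guide does not state either rule, and the helper name and the second model ID are hypothetical.

```python
def check_traffic_split(traffic_split: dict) -> None:
    """Raise ValueError unless the percentages are valid and total 100."""
    if not traffic_split:
        raise ValueError("traffic_split must not be empty")
    for deployed_model_id, percent in traffic_split.items():
        if not isinstance(percent, int) or not 0 <= percent <= 100:
            raise ValueError(f"invalid percentage for {deployed_model_id!r}: {percent!r}")
    total = sum(traffic_split.values())
    if total != 100:
        raise ValueError(f"percentages sum to {total}, expected 100")


# The split from the example: all traffic goes to the newly deployed model.
check_traffic_split({"0": 100})

# A hypothetical gradual rollout; "1234567890" is a made-up ID of a model
# that is already deployed to the endpoint.
check_traffic_split({"0": 10, "1234567890": 90})
```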

To undeploy a model, use UndeployModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To delete an endpoint, use DeleteEndpointOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a list of endpoints, use ListEndpointsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py[source]

list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)

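Creating Hyperparameter Tuning Jobs

To create a Google Vertex AI hyperparameter tuning job, use CreateHyperparameterTuningJobOperator. The operator returns the hyperparameter tuning job ID in XCom under the hyperparameter_tuning_job_id key.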

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)
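With max_trial_count=15 and parallel_trial_count=3, the tuning job runs at most 15 trials, up to 3 at a time. As a rough estimate only, assuming trials are scheduled in full batches (real scheduling may start a new trial as soon as one finishes), the number of sequential rounds can be computed as follows; the helper name is hypothetical.

```python
import math


def trial_rounds(max_trial_count: int, parallel_trial_count: int) -> int:
    """Estimate how many sequential batches are needed to run all trials."""
    if max_trial_count < 1 or parallel_trial_count < 1:
        raise ValueError("both counts must be at least 1")
    return math.ceil(max_trial_count / parallel_trial_count)


# Values from the example above: 15 trials, 3 in parallel.
rounds = trial_rounds(15, 3)
```

A higher parallel_trial_count shortens wall-clock time, but each trial then has fewer completed results from earlier trials to learn from.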

CreateHyperparameterTuningJobOperator also supports deferrable mode.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)

To delete a hyperparameter tuning job, use DeleteHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a hyperparameter tuning job, use GetHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)

To get a list of hyperparameter tuning jobs, use ListHyperparameterTuningJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py[source]

list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating a Model Service

To upload a Google Vertex AI model, use UploadModelOperator. The operator returns the model ID in XCom under the model_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)
upload_model_v1 = upload_model.output["model_id"]

To export a model, use ExportModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)

To delete a model, use DeleteModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a list of models, use ListModelsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)

To retrieve a model by its ID, use GetModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

To list all versions of a model, use ListModelVersionsOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

To set a specific version of a model as the default version, use SetDefaultVersionOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)

To add aliases to a specific version of a model, use AddVersionAliasesOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)

To delete aliases from a specific version of a model, use DeleteVersionAliasesOnModelOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)

To delete a specific version of a model, use DeleteModelVersionOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py[source]

delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)

Running Pipeline Jobs

To run a Google Vertex AI pipeline job, use RunPipelineJobOperator. The operator returns the pipeline job ID in XCom under the pipeline_job_id key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)

To delete a pipeline job, use DeletePipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a pipeline job, use GetPipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)

To get a list of pipeline jobs, use ListPipelineJobOperator.

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py[source]

list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)

Interacting with Generative AI

To generate text embeddings, use TextEmbeddingModelGetEmbeddingsOperator. The operator returns the model's response in XCom under the model_response key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=TEXT_EMBEDDING_MODEL,
)

To generate content with a generative model, use GenerativeModelGenerateContentOperator. The operator returns the model's response in XCom under the model_response key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_content_task = GenerativeModelGenerateContentOperator(
    task_id="generate_content_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    tools=TOOLS,
    location=REGION,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_MODEL,
)

To run a supervised fine-tuning job, use SupervisedFineTuningTrainOperator. The operator returns the tuned model's endpoint name in XCom under the tuned_model_endpoint_name key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py[source]

sft_train_task = SupervisedFineTuningTrainOperator(
    task_id="sft_train_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    train_dataset=TRAIN_DATASET,
    tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
)

To count the number of input tokens before sending a request to the Gemini API, use CountTokensOperator. The operator returns the total token count in XCom under the total_tokens key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

count_tokens_task = CountTokensOperator(
    task_id="count_tokens_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
)

To evaluate a model, use RunEvaluationOperator. The operator returns the evaluation summary metrics in XCom under the summary_metrics key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

run_evaluation_task = RunEvaluationOperator(
    task_id="run_evaluation_task",
    project_id=PROJECT_ID,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
    eval_dataset=EVAL_DATASET,
    metrics=METRICS,
    experiment_name=EXPERIMENT_NAME,
    experiment_run_name=EXPERIMENT_RUN_NAME,
    prompt_template=PROMPT_TEMPLATE,
)

To create cached content, use CreateCachedContentOperator. The operator returns the cached content resource name in XCom under the return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

create_cached_content_task = CreateCachedContentOperator(
    task_id="create_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    model_name=CACHED_MODEL,
    system_instruction=CACHED_SYSTEM_INSTRUCTION,
    contents=CACHED_CONTENTS,
    ttl_hours=1,
    display_name="example-cache",
)

To generate a response from cached content, use GenerateFromCachedContentOperator. The operator returns the cached content response in XCom under the return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py[source]

generate_from_cached_content_task = GenerateFromCachedContentOperator(
    task_id="generate_from_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    cached_content_name="{{ task_instance.xcom_pull(task_ids='create_cached_content_task', key='return_value') }}",
    contents=["What are the papers about?"],
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
)

Interacting with Vertex AI Feature Store

To get a feature view sync job, use GetFeatureViewSyncOperator. The operator returns the sync job results in XCom under the return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

get_task = GetFeatureViewSyncOperator(
    task_id="get_task",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)

To sync a feature view, use SyncFeatureViewOperator. The operator returns the sync job name in XCom under the return_value key.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

sync_task = SyncFeatureViewOperator(
    task_id="sync_task",
    project_id=PROJECT_ID,
    location=REGION,
    feature_online_store_id=FEATURE_ONLINE_STORE_ID,
    feature_view_id=FEATURE_VIEW_ID,
)

To check whether a feature view sync succeeded, use FeatureViewSyncSensor.

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py[source]

wait_for_sync = FeatureViewSyncSensor(
    task_id="wait_for_sync",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
    poke_interval=60,  # Check every minute
    timeout=600,  # Timeout after 10 minutes
    mode="reschedule",
)
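The poke_interval and timeout values above together bound how many times the sensor checks the sync status. The sketch below is a rough upper bound only, assuming one check at the start and one after each full interval until the timeout elapses; it ignores scheduler latency, which matters in reschedule mode. The helper name is hypothetical.

```python
def max_pokes(timeout: float, poke_interval: float) -> int:
    """Upper bound on checks: one immediate check plus one per full interval."""
    if timeout < 0 or poke_interval <= 0:
        raise ValueError("timeout must be >= 0 and poke_interval must be > 0")
    return int(timeout // poke_interval) + 1


# Values from the example above: check every 60 seconds for up to 600 seconds.
checks = max_pokes(timeout=600, poke_interval=60)
```

If a sync normally takes longer than the timeout, raise timeout rather than lowering poke_interval, since more frequent checks do not make the sync finish sooner.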

Reference

For more information, see
