Amazon SageMaker

Amazon SageMaker 是一项完全托管的机器学习服务。借助 Amazon SageMaker,数据科学家和开发人员可以快速构建和训练机器学习模型,然后将其部署到生产就绪型托管环境中。

Airflow 提供了用于创建和与 SageMaker Jobs 和 Pipelines 交互的 Operator。

前提任务

要使用这些 Operator,您必须执行以下几项操作:

Operators

创建 Amazon SageMaker 处理任务

要创建用于清理数据集的 Amazon Sagemaker 处理任务,可以使用 SageMakerProcessingOperator

tests/system/amazon/aws/example_sagemaker.py

preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)

# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False

创建 Amazon SageMaker 训练任务

要创建 Amazon Sagemaker 训练任务,可以使用 SageMakerTrainingOperator

tests/system/amazon/aws/example_sagemaker.py

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)

创建 Amazon SageMaker 模型

要创建 Amazon Sagemaker 模型,可以使用 SageMakerModelOperator

tests/system/amazon/aws/example_sagemaker.py

create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)

启动超参数调优任务

要为 Amazon Sagemaker 模型启动超参数调优任务,可以使用 SageMakerTuningOperator

tests/system/amazon/aws/example_sagemaker.py

tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)

删除 Amazon SageMaker 模型

要删除 Amazon Sagemaker 模型,可以使用 SageMakerDeleteModelOperator

tests/system/amazon/aws/example_sagemaker.py

delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)

创建 Amazon SageMaker 转换任务

要创建 Amazon Sagemaker 转换任务,可以使用 SageMakerTransformOperator

tests/system/amazon/aws/example_sagemaker.py

test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)

创建 Amazon SageMaker 端点配置任务

要创建 Amazon Sagemaker 端点配置任务,可以使用 SageMakerEndpointConfigOperator

tests/system/amazon/aws/example_sagemaker_endpoint.py

configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)

创建 Amazon SageMaker 端点任务

要创建 Amazon Sagemaker 端点,可以使用 SageMakerEndpointOperator

tests/system/amazon/aws/example_sagemaker_endpoint.py

deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)

启动 Amazon SageMaker pipeline 执行

要触发已定义的 Amazon Sagemaker pipeline 的执行运行,可以使用 SageMakerStartPipelineOperator

tests/system/amazon/aws/example_sagemaker_pipeline.py

start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=pipeline_name,
)

停止 Amazon SageMaker pipeline 执行

要停止当前正在运行的 Amazon Sagemaker pipeline 执行,可以使用 SageMakerStopPipelineOperator

tests/system/amazon/aws/example_sagemaker_pipeline.py

stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)

注册 SageMaker 模型版本

要注册模型版本,可以使用 SageMakerRegisterModelVersionOperator。执行此 Operator 的结果是一个模型包 (model package)。模型包是可重用的模型 artifact 抽象,它打包了推理所需的所有元素。它包含一个推理规范 (inference specification),该规范定义了要使用的推理镜像 (inference image) 以及模型权重位置 (model weights location)。模型包组 (model package group) 是模型包的集合。您可以使用此 Operator 为每个 DAG 运行向组中添加新版本和模型包。

tests/system/amazon/aws/example_sagemaker.py

register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)

启动 AutoML 实验

要启动 AutoML 实验(也称为 SageMaker Autopilot),可以使用 SageMakerAutoMLOperator。AutoML 实验将接收 CSV 格式的输入数据以及要学习预测的列,并在无人监督的情况下对其训练模型。输出将放置在 S3 存储桶中,如果已配置,则会自动部署。

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

创建供以后使用的实验

要创建 SageMaker 实验,可以使用 SageMakerCreateExperimentOperator。这将创建一个实验,使其准备好与处理、训练和转换任务关联。

tests/system/amazon/aws/example_sagemaker.py

create_experiment = SageMakerCreateExperimentOperator(
    task_id="create_experiment", name=test_setup["experiment_name"]
)

创建 SageMaker Notebook 实例

要创建 SageMaker Notebook 实例,可以使用 SageMakerCreateNotebookOperator。这将创建一个准备好运行 Jupyter notebook 的 SageMaker Notebook 实例。

tests/system/amazon/aws/example_sagemaker_notebook.py

instance = SageMakerCreateNotebookOperator(
    task_id="create_instance",
    instance_name=instance_name,
    instance_type="ml.t3.medium",
    role_arn=role_arn,
    wait_for_completion=True,
)

停止 SageMaker Notebook 实例

要终止 SageMaker Notebook 实例,可以使用 SageMakerStopNotebookOperator。这将终止 ML 计算实例并断开 ML 存储卷的连接。

tests/system/amazon/aws/example_sagemaker_notebook.py

stop_instance = SageMakerStopNotebookOperator(
    task_id="stop_instance",
    instance_name=instance_name,
)

启动 SageMaker Notebook 实例

要启动 SageMaker Notebook 实例并重新连接 ML 存储卷,可以使用 SageMakerStartNotebookOperator。这将启动一个新的 ML 计算实例,其中包含最新版本的库并连接您的 ML 存储卷。

tests/system/amazon/aws/example_sagemaker_notebook.py

start_instance = SageMakerStartNoteBookOperator(
    task_id="start_instance",
    instance_name=instance_name,
)

删除 SageMaker Notebook 实例

要删除 SageMaker Notebook 实例,可以使用 SageMakerDeleteNotebookOperator。这将终止实例并删除与实例关联的 ML 存储卷和网络接口。实例必须先停止才能删除。

tests/system/amazon/aws/example_sagemaker_notebook.py

delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)

Sensors

等待 Amazon SageMaker 训练任务状态

要检查 Amazon Sagemaker 训练任务的状态直到其达到终端状态,可以使用 SageMakerTrainingSensor

tests/system/amazon/aws/example_sagemaker.py

await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)

等待 Amazon SageMaker 转换任务状态

要检查 Amazon Sagemaker 转换任务的状态直到其达到终端状态,可以使用 SageMakerTransformOperator

tests/system/amazon/aws/example_sagemaker.py

await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)

等待 Amazon SageMaker 调优任务状态

要检查 Amazon Sagemaker 超参数调优任务的状态直到其达到终端状态,可以使用 SageMakerTuningSensor

tests/system/amazon/aws/example_sagemaker.py

await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)

等待 Amazon SageMaker 端点状态

要检查 Amazon Sagemaker 端点的状态直到其达到终端状态,可以使用 SageMakerEndpointSensor

tests/system/amazon/aws/example_sagemaker_endpoint.py

await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)

等待 Amazon SageMaker pipeline 执行状态

要检查 Amazon Sagemaker pipeline 执行的状态直到其达到终端状态,可以使用 SageMakerPipelineSensor

tests/system/amazon/aws/example_sagemaker_pipeline.py

await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)

等待 Amazon SageMaker AutoML 实验状态

要检查 Amazon Sagemaker AutoML 任务的状态直到其达到终端状态,可以使用 SageMakerAutoMLSensor

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

等待 Amazon SageMaker 处理任务状态

要检查 Amazon Sagemaker 处理任务的状态直到其达到终端状态,可以使用 SageMakerProcessingSensor

tests/system/amazon/aws/example_sagemaker.py

await_preprocess = SageMakerProcessingSensor(
    task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)

参考

此条目有帮助吗?