Amazon SageMaker

Amazon SageMaker 是一项完全托管的机器学习服务。借助 Amazon SageMaker,数据科学家和开发人员可以快速构建和训练机器学习模型,然后将它们部署到可用于生产的托管环境中。

Airflow 提供操作符来创建和与 SageMaker 作业和管道交互。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

操作符

创建 Amazon SageMaker 处理作业

要创建 Amazon Sagemaker 处理作业来清理您的数据集,可以使用 SageMakerProcessingOperator

tests/system/amazon/aws/example_sagemaker.py

preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)

# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False

创建 Amazon SageMaker 训练作业

要创建 Amazon Sagemaker 训练作业,可以使用 SageMakerTrainingOperator

tests/system/amazon/aws/example_sagemaker.py

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)

创建 Amazon SageMaker 模型

要创建 Amazon Sagemaker 模型,可以使用 SageMakerModelOperator

tests/system/amazon/aws/example_sagemaker.py

create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)

启动超参数调整作业

要为 Amazon Sagemaker 模型启动超参数调整作业,可以使用 SageMakerTuningOperator

tests/system/amazon/aws/example_sagemaker.py

tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)

删除 Amazon SageMaker 模型

要删除 Amazon Sagemaker 模型,可以使用 SageMakerDeleteModelOperator

tests/system/amazon/aws/example_sagemaker.py

delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)

创建 Amazon SageMaker 转换作业

要创建 Amazon Sagemaker 转换作业,可以使用 SageMakerTransformOperator

tests/system/amazon/aws/example_sagemaker.py

test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)

创建 Amazon SageMaker 端点配置作业

要创建 Amazon Sagemaker 端点配置作业,可以使用 SageMakerEndpointConfigOperator

tests/system/amazon/aws/example_sagemaker_endpoint.py

configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)

创建 Amazon SageMaker 端点作业

要创建 Amazon Sagemaker 端点,可以使用 SageMakerEndpointOperator

tests/system/amazon/aws/example_sagemaker_endpoint.py

deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)

启动 Amazon SageMaker 管道执行

要触发已定义的 Amazon Sagemaker 管道的执行运行,可以使用 SageMakerStartPipelineOperator

tests/system/amazon/aws/example_sagemaker_pipeline.py

start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=pipeline_name,
)

停止 Amazon SageMaker 管道执行

要停止当前正在运行的 Amazon Sagemaker 管道执行,可以使用 SageMakerStopPipelineOperator

tests/system/amazon/aws/example_sagemaker_pipeline.py

stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)

注册 Sagemaker 模型版本

要注册模型版本,可以使用 SageMakerRegisterModelVersionOperator。执行此操作符的结果是一个模型包。模型包是一种可重用的模型工件抽象,它打包了推理所需的所有要素。它由一个推理规范组成,该规范定义了要使用的推理映像以及模型权重的位置。模型包组是模型包的集合。您可以使用此操作符为每个 DAG 运行向组添加新版本和模型包。

tests/system/amazon/aws/example_sagemaker.py

register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)

启动 AutoML 实验

要启动 AutoML 实验,也就是 SageMaker Autopilot,您可以使用 SageMakerAutoMLOperator。AutoML 实验将接收 CSV 格式的输入数据和它应该学习预测的列,并在不需要人工监督的情况下在其上训练模型。输出将放置在 S3 存储桶中,如果配置了,则会自动部署。

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

创建一个供以后使用的实验

要创建 SageMaker 实验,可以使用 SageMakerCreateExperimentOperator。这将创建一个实验,以便它准备好与处理、训练和转换作业关联。

tests/system/amazon/aws/example_sagemaker.py

create_experiment = SageMakerCreateExperimentOperator(
    task_id="create_experiment", name=test_setup["experiment_name"]
)

创建 SageMaker Notebook 实例

要创建 SageMaker Notebook 实例,可以使用 SageMakerCreateNotebookOperator。这将创建一个 SageMaker Notebook 实例,准备好运行 Jupyter 笔记本。

tests/system/amazon/aws/example_sagemaker_notebook.py

instance = SageMakerCreateNotebookOperator(
    task_id="create_instance",
    instance_name=instance_name,
    instance_type="ml.t3.medium",
    role_arn=role_arn,
    wait_for_completion=True,
)

停止 SageMaker Notebook 实例

要终止 SageMaker Notebook 实例,可以使用 SageMakerStopNotebookOperator。这将终止 ML 计算实例并断开 ML 存储卷的连接。

tests/system/amazon/aws/example_sagemaker_notebook.py

stop_instance = SageMakerStopNotebookOperator(
    task_id="stop_instance",
    instance_name=instance_name,
)

启动 SageMaker Notebook 实例

要启动 SageMaker Notebook 实例并重新连接 ML 存储卷,可以使用 SageMakerStartNotebookOperator。这将启动一个新的 ML 计算实例,其中包含最新版本的库,并附加您的 ML 存储卷。

tests/system/amazon/aws/example_sagemaker_notebook.py

start_instance = SageMakerStartNoteBookOperator(
    task_id="start_instance",
    instance_name=instance_name,
)

删除 SageMaker Notebook 实例

要删除 SageMaker Notebook 实例,可以使用 SageMakerDeleteNotebookOperator。这将终止实例并删除与实例关联的 ML 存储卷和网络接口。必须先停止实例才能删除。

tests/system/amazon/aws/example_sagemaker_notebook.py

delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)

传感器

等待 Amazon SageMaker 训练作业状态

要检查 Amazon Sagemaker 训练作业的状态,直到它达到终端状态,您可以使用 SageMakerTrainingSensor

tests/system/amazon/aws/example_sagemaker.py

await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)

等待 Amazon SageMaker 转换作业状态

要检查 Amazon Sagemaker 转换作业的状态,直到它达到终端状态,您可以使用 SageMakerTransformOperator

tests/system/amazon/aws/example_sagemaker.py

await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)

等待 Amazon SageMaker 调整作业状态

要检查 Amazon Sagemaker 超参数调整作业的状态,直到它达到终端状态,您可以使用 SageMakerTuningSensor

tests/system/amazon/aws/example_sagemaker.py

await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)

等待 Amazon SageMaker 端点状态

要检查 Amazon Sagemaker 端点的状态,直到它达到最终状态,您可以使用 SageMakerEndpointSensor

tests/system/amazon/aws/example_sagemaker_endpoint.py

await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)

等待 Amazon SageMaker 管道执行状态

要检查 Amazon Sagemaker 管道执行的状态,直到它达到最终状态,您可以使用 SageMakerPipelineSensor

tests/system/amazon/aws/example_sagemaker_pipeline.py

await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)

等待 Amazon SageMaker AutoML 实验状态

要检查 Amazon Sagemaker AutoML 作业的状态,直到它达到最终状态,您可以使用 SageMakerAutoMLSensor

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

等待 Amazon SageMaker 处理作业状态

要检查 Amazon Sagemaker 处理作业的状态,直到它达到最终状态,您可以使用 SageMakerProcessingSensor

tests/system/amazon/aws/example_sagemaker.py

await_preprocess = SageMakerProcessingSensor(
    task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)

此条目是否有帮助?