Amazon SageMaker

Amazon SageMaker 是一项完全托管的机器学习服务。借助 Amazon SageMaker,数据科学家和开发人员可以快速构建和训练机器学习模型,然后将其部署到可投入生产的托管环境中。

Airflow 提供了用于创建 SageMaker 作业和管道并与其进行交互的操作器。

先决条件任务

要使用这些操作器,您必须执行以下操作

操作器

创建 Amazon SageMaker 处理作业

要创建 Amazon Sagemaker 处理作业以清理您的数据集,您可以使用 SageMakerProcessingOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)

创建 Amazon SageMaker 训练作业

要创建 Amazon Sagemaker 训练作业,您可以使用 SageMakerTrainingOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)

创建 Amazon SageMaker 模型

要创建 Amazon Sagemaker 模型,您可以使用 SageMakerModelOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)

启动超参数调整作业

要为 Amazon Sagemaker 模型启动超参数调整作业,您可以使用 SageMakerTuningOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)

删除 Amazon SageMaker 模型

要删除 Amazon Sagemaker 模型,您可以使用 SageMakerDeleteModelOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)

创建 Amazon SageMaker 转换作业

要创建 Amazon Sagemaker 转换作业,您可以使用 SageMakerTransformOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)

创建 Amazon SageMaker 端点配置作业

要创建 Amazon Sagemaker 端点配置作业,您可以使用 SageMakerEndpointConfigOperator

tests/system/providers/amazon/aws/example_sagemaker_endpoint.py[源代码]

configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)

创建 Amazon SageMaker 端点作业

要创建 Amazon Sagemaker 端点,您可以使用 SageMakerEndpointOperator

tests/system/providers/amazon/aws/example_sagemaker_endpoint.py[源代码]

deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)

启动 Amazon SageMaker 管道执行

要为已定义的 Amazon Sagemaker 管道触发执行运行,您可以使用 SageMakerStartPipelineOperator

tests/system/providers/amazon/aws/example_sagemaker_pipeline.py[源代码]

start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=pipeline_name,
)

停止 Amazon SageMaker 管道执行

要停止当前正在运行的 Amazon Sagemaker 管道执行,您可以使用 SageMakerStopPipelineOperator

tests/system/providers/amazon/aws/example_sagemaker_pipeline.py[源代码]

stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)

注册 Sagemaker 模型版本

要注册模型版本,您可以使用 SageMakerRegisterModelVersionOperator。执行此操作器的结果是一个模型包。模型包是一个可重用的模型工件抽象,它打包了推理所需的所有成分。它包含一个推理规范,该规范定义了要使用的推理镜像以及模型权重位置。模型包组是模型包的集合。您可以使用此操作器为每次 DAG 运行向组中添加新版本和模型包。

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)

启动 AutoML 实验

要启动 AutoML 实验(也称为 SageMaker Autopilot),您可以使用 SageMakerAutoMLOperator。AutoML 实验将获取 CSV 格式的一些输入数据以及它应该学习预测的列,并在无需人工监督的情况下对其进行模型训练。输出结果将放置在 S3 存储桶中,并在配置后自动部署。

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

创建实验以供以后使用

要创建 SageMaker 实验,您可以使用 SageMakerCreateExperimentOperator。这将创建一个实验,以便它可以与处理、训练和转换作业相关联。

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

create_experiment = SageMakerCreateExperimentOperator(
    task_id="create_experiment", name=test_setup["experiment_name"]
)

创建 SageMaker 笔记本实例

要创建 SageMaker 笔记本实例,您可以使用 SageMakerCreateNotebookOperator。这将创建一个 SageMaker 笔记本实例,准备运行 Jupyter 笔记本。

tests/system/providers/amazon/aws/example_sagemaker_notebook.py[源代码]

instance = SageMakerCreateNotebookOperator(
    task_id="create_instance",
    instance_name=instance_name,
    instance_type="ml.t3.medium",
    role_arn=role_arn,
    wait_for_completion=True,
)

停止 SageMaker 笔记本实例

要终止 SageMaker 笔记本实例,您可以使用 SageMakerStopNotebookOperator。这将终止 ML 计算实例并断开 ML 存储卷的连接。

tests/system/providers/amazon/aws/example_sagemaker_notebook.py[源代码]

stop_instance = SageMakerStopNotebookOperator(
    task_id="stop_instance",
    instance_name=instance_name,
)

启动 SageMaker 笔记本实例

要启动 SageMaker 笔记本实例并重新连接 ML 存储卷,您可以使用 SageMakerStartNotebookOperator。这将使用最新版本的库启动新的 ML 计算实例,并连接您的 ML 存储卷。

tests/system/providers/amazon/aws/example_sagemaker_notebook.py[源代码]

start_instance = SageMakerStartNoteBookOperator(
    task_id="start_instance",
    instance_name=instance_name,
)

删除 SageMaker 笔记本实例

要删除 SageMaker 笔记本实例,您可以使用 SageMakerDeleteNotebookOperator。这将终止实例并删除与该实例关联的 ML 存储卷和网络接口。必须先停止实例,然后才能将其删除。

tests/system/providers/amazon/aws/example_sagemaker_notebook.py[源代码]

delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)

传感器

等待 Amazon SageMaker 训练作业状态

要检查 Amazon Sagemaker 训练作业的状态,直到其达到最终状态,您可以使用 SageMakerTrainingSensor

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)

等待 Amazon SageMaker 转换作业状态

要检查 Amazon SageMaker 转换作业的状态,直到其达到最终状态,您可以使用 SageMakerTransformOperator

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)

等待 Amazon SageMaker 调整作业状态

要检查 Amazon SageMaker 超参数调整作业的状态,直到其达到最终状态,您可以使用 SageMakerTuningSensor

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)

等待 Amazon SageMaker 端点状态

要检查 Amazon SageMaker 端点的状态,直到其达到最终状态,您可以使用 SageMakerEndpointSensor

tests/system/providers/amazon/aws/example_sagemaker_endpoint.py[源代码]

await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)

等待 Amazon SageMaker 管道执行状态

要检查 Amazon SageMaker 管道执行的状态,直到其达到最终状态,您可以使用 SageMakerPipelineSensor

tests/system/providers/amazon/aws/example_sagemaker_pipeline.py[源代码]

await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)

等待 Amazon SageMaker AutoML 实验状态

要检查 Amazon SageMaker AutoML 作业的状态,直到其达到最终状态,您可以使用 SageMakerAutoMLSensor

tests/system/providers/amazon/aws/example_sagemaker.py[源代码]

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

此条目有帮助吗?