Amazon SageMaker¶
Amazon SageMaker 是一项完全托管的机器学习服务。借助 Amazon SageMaker,数据科学家和开发人员可以快速构建和训练机器学习模型,然后将它们部署到可用于生产的托管环境中。
Airflow 提供操作符来创建和与 SageMaker 作业和管道交互。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考 Airflow® 的安装
设置连接.
操作符¶
创建 Amazon SageMaker 处理作业¶
要创建 Amazon Sagemaker 处理作业来清理您的数据集,可以使用 SageMakerProcessingOperator
。
tests/system/amazon/aws/example_sagemaker.py
preprocess_raw_data = SageMakerProcessingOperator(
task_id="preprocess_raw_data",
config=test_setup["processing_config"],
)
# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False
创建 Amazon SageMaker 训练作业¶
要创建 Amazon Sagemaker 训练作业,可以使用 SageMakerTrainingOperator
。
tests/system/amazon/aws/example_sagemaker.py
train_model = SageMakerTrainingOperator(
task_id="train_model",
config=test_setup["training_config"],
)
创建 Amazon SageMaker 模型¶
要创建 Amazon Sagemaker 模型,可以使用 SageMakerModelOperator
。
tests/system/amazon/aws/example_sagemaker.py
create_model = SageMakerModelOperator(
task_id="create_model",
config=test_setup["model_config"],
)
启动超参数调整作业¶
要为 Amazon Sagemaker 模型启动超参数调整作业,可以使用 SageMakerTuningOperator
。
tests/system/amazon/aws/example_sagemaker.py
tune_model = SageMakerTuningOperator(
task_id="tune_model",
config=test_setup["tuning_config"],
)
删除 Amazon SageMaker 模型¶
要删除 Amazon Sagemaker 模型,可以使用 SageMakerDeleteModelOperator
。
tests/system/amazon/aws/example_sagemaker.py
delete_model = SageMakerDeleteModelOperator(
task_id="delete_model",
config={"ModelName": test_setup["model_name"]},
)
创建 Amazon SageMaker 转换作业¶
要创建 Amazon Sagemaker 转换作业,可以使用 SageMakerTransformOperator
。
tests/system/amazon/aws/example_sagemaker.py
test_model = SageMakerTransformOperator(
task_id="test_model",
config=test_setup["transform_config"],
)
创建 Amazon SageMaker 端点配置作业¶
要创建 Amazon Sagemaker 端点配置作业,可以使用 SageMakerEndpointConfigOperator
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
configure_endpoint = SageMakerEndpointConfigOperator(
task_id="configure_endpoint",
config=test_setup["endpoint_config_config"],
)
创建 Amazon SageMaker 端点作业¶
要创建 Amazon Sagemaker 端点,可以使用 SageMakerEndpointOperator
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
deploy_endpoint = SageMakerEndpointOperator(
task_id="deploy_endpoint",
config=test_setup["deploy_endpoint_config"],
)
启动 Amazon SageMaker 管道执行¶
要触发已定义的 Amazon Sagemaker 管道的执行运行,可以使用 SageMakerStartPipelineOperator
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
start_pipeline1 = SageMakerStartPipelineOperator(
task_id="start_pipeline1",
pipeline_name=pipeline_name,
)
停止 Amazon SageMaker 管道执行¶
要停止当前正在运行的 Amazon Sagemaker 管道执行,可以使用 SageMakerStopPipelineOperator
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
stop_pipeline1 = SageMakerStopPipelineOperator(
task_id="stop_pipeline1",
pipeline_exec_arn=start_pipeline1.output,
)
注册 Sagemaker 模型版本¶
要注册模型版本,可以使用 SageMakerRegisterModelVersionOperator
。执行此操作符的结果是一个模型包。模型包是一种可重用的模型工件抽象,它打包了推理所需的所有要素。它由一个推理规范组成,该规范定义了要使用的推理映像以及模型权重的位置。模型包组是模型包的集合。您可以使用此操作符为每个 DAG 运行向组添加新版本和模型包。
tests/system/amazon/aws/example_sagemaker.py
register_model = SageMakerRegisterModelVersionOperator(
task_id="register_model",
image_uri=test_setup["inference_code_image"],
model_url=test_setup["model_trained_weights"],
package_group_name=test_setup["model_package_group_name"],
)
启动 AutoML 实验¶
要启动 AutoML 实验,也就是 SageMaker Autopilot,您可以使用 SageMakerAutoMLOperator
。AutoML 实验将接收 CSV 格式的输入数据和它应该学习预测的列,并在不需要人工监督的情况下在其上训练模型。输出将放置在 S3 存储桶中,如果配置了,则会自动部署。
tests/system/amazon/aws/example_sagemaker.py
automl = SageMakerAutoMLOperator(
task_id="auto_ML",
job_name=test_setup["auto_ml_job_name"],
s3_input=test_setup["input_data_uri"],
target_attribute="class",
s3_output=test_setup["output_data_uri"],
role_arn=test_context[ROLE_ARN_KEY],
time_limit=30, # will stop the job before it can do anything, but it's not the point here
)
创建一个供以后使用的实验¶
要创建 SageMaker 实验,可以使用 SageMakerCreateExperimentOperator
。这将创建一个实验,以便它准备好与处理、训练和转换作业关联。
tests/system/amazon/aws/example_sagemaker.py
create_experiment = SageMakerCreateExperimentOperator(
task_id="create_experiment", name=test_setup["experiment_name"]
)
创建 SageMaker Notebook 实例¶
要创建 SageMaker Notebook 实例,可以使用 SageMakerCreateNotebookOperator
。这将创建一个 SageMaker Notebook 实例,准备好运行 Jupyter 笔记本。
tests/system/amazon/aws/example_sagemaker_notebook.py
instance = SageMakerCreateNotebookOperator(
task_id="create_instance",
instance_name=instance_name,
instance_type="ml.t3.medium",
role_arn=role_arn,
wait_for_completion=True,
)
停止 SageMaker Notebook 实例¶
要终止 SageMaker Notebook 实例,可以使用 SageMakerStopNotebookOperator
。这将终止 ML 计算实例并断开 ML 存储卷的连接。
tests/system/amazon/aws/example_sagemaker_notebook.py
stop_instance = SageMakerStopNotebookOperator(
task_id="stop_instance",
instance_name=instance_name,
)
启动 SageMaker Notebook 实例¶
要启动 SageMaker Notebook 实例并重新连接 ML 存储卷,可以使用 SageMakerStartNotebookOperator
。这将启动一个新的 ML 计算实例,其中包含最新版本的库,并附加您的 ML 存储卷。
tests/system/amazon/aws/example_sagemaker_notebook.py
start_instance = SageMakerStartNoteBookOperator(
task_id="start_instance",
instance_name=instance_name,
)
删除 SageMaker Notebook 实例¶
要删除 SageMaker Notebook 实例,可以使用 SageMakerDeleteNotebookOperator
。这将终止实例并删除与实例关联的 ML 存储卷和网络接口。必须先停止实例才能删除。
tests/system/amazon/aws/example_sagemaker_notebook.py
delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)
传感器¶
等待 Amazon SageMaker 训练作业状态¶
要检查 Amazon Sagemaker 训练作业的状态,直到它达到终端状态,您可以使用 SageMakerTrainingSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_training = SageMakerTrainingSensor(
task_id="await_training",
job_name=test_setup["training_job_name"],
)
等待 Amazon SageMaker 转换作业状态¶
要检查 Amazon Sagemaker 转换作业的状态,直到它达到终端状态,您可以使用 SageMakerTransformOperator
。
tests/system/amazon/aws/example_sagemaker.py
await_transform = SageMakerTransformSensor(
task_id="await_transform",
job_name=test_setup["transform_job_name"],
)
等待 Amazon SageMaker 调整作业状态¶
要检查 Amazon Sagemaker 超参数调整作业的状态,直到它达到终端状态,您可以使用 SageMakerTuningSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_tuning = SageMakerTuningSensor(
task_id="await_tuning",
job_name=test_setup["tuning_job_name"],
)
等待 Amazon SageMaker 端点状态¶
要检查 Amazon Sagemaker 端点的状态,直到它达到最终状态,您可以使用 SageMakerEndpointSensor
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
await_endpoint = SageMakerEndpointSensor(
task_id="await_endpoint",
endpoint_name=test_setup["endpoint_name"],
)
等待 Amazon SageMaker 管道执行状态¶
要检查 Amazon Sagemaker 管道执行的状态,直到它达到最终状态,您可以使用 SageMakerPipelineSensor
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
await_pipeline2 = SageMakerPipelineSensor(
task_id="await_pipeline2",
pipeline_exec_arn=start_pipeline2.output,
)
等待 Amazon SageMaker AutoML 实验状态¶
要检查 Amazon Sagemaker AutoML 作业的状态,直到它达到最终状态,您可以使用 SageMakerAutoMLSensor
。
tests/system/amazon/aws/example_sagemaker.py
automl = SageMakerAutoMLOperator(
task_id="auto_ML",
job_name=test_setup["auto_ml_job_name"],
s3_input=test_setup["input_data_uri"],
target_attribute="class",
s3_output=test_setup["output_data_uri"],
role_arn=test_context[ROLE_ARN_KEY],
time_limit=30, # will stop the job before it can do anything, but it's not the point here
)
等待 Amazon SageMaker 处理作业状态¶
要检查 Amazon Sagemaker 处理作业的状态,直到它达到最终状态,您可以使用 SageMakerProcessingSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_preprocess = SageMakerProcessingSensor(
task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)