Amazon SageMaker¶
Amazon SageMaker 是一项完全托管的机器学习服务。借助 Amazon SageMaker,数据科学家和开发人员可以快速构建和训练机器学习模型,然后将其部署到生产就绪型托管环境中。
Airflow 提供了用于创建和与 SageMaker Jobs 和 Pipelines 交互的 Operator。
前提任务¶
要使用这些 Operator,您必须执行以下几项操作:
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
Operators¶
创建 Amazon SageMaker 处理任务¶
要创建用于清理数据集的 Amazon Sagemaker 处理任务,可以使用 SageMakerProcessingOperator
。
tests/system/amazon/aws/example_sagemaker.py
preprocess_raw_data = SageMakerProcessingOperator(
task_id="preprocess_raw_data",
config=test_setup["processing_config"],
)
# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False
创建 Amazon SageMaker 训练任务¶
要创建 Amazon Sagemaker 训练任务,可以使用 SageMakerTrainingOperator
。
tests/system/amazon/aws/example_sagemaker.py
train_model = SageMakerTrainingOperator(
task_id="train_model",
config=test_setup["training_config"],
)
创建 Amazon SageMaker 模型¶
要创建 Amazon Sagemaker 模型,可以使用 SageMakerModelOperator
。
tests/system/amazon/aws/example_sagemaker.py
create_model = SageMakerModelOperator(
task_id="create_model",
config=test_setup["model_config"],
)
启动超参数调优任务¶
要为 Amazon Sagemaker 模型启动超参数调优任务,可以使用 SageMakerTuningOperator
。
tests/system/amazon/aws/example_sagemaker.py
tune_model = SageMakerTuningOperator(
task_id="tune_model",
config=test_setup["tuning_config"],
)
删除 Amazon SageMaker 模型¶
要删除 Amazon Sagemaker 模型,可以使用 SageMakerDeleteModelOperator
。
tests/system/amazon/aws/example_sagemaker.py
delete_model = SageMakerDeleteModelOperator(
task_id="delete_model",
config={"ModelName": test_setup["model_name"]},
)
创建 Amazon SageMaker 转换任务¶
要创建 Amazon Sagemaker 转换任务,可以使用 SageMakerTransformOperator
。
tests/system/amazon/aws/example_sagemaker.py
test_model = SageMakerTransformOperator(
task_id="test_model",
config=test_setup["transform_config"],
)
创建 Amazon SageMaker 端点配置任务¶
要创建 Amazon Sagemaker 端点配置任务,可以使用 SageMakerEndpointConfigOperator
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
configure_endpoint = SageMakerEndpointConfigOperator(
task_id="configure_endpoint",
config=test_setup["endpoint_config_config"],
)
创建 Amazon SageMaker 端点任务¶
要创建 Amazon Sagemaker 端点,可以使用 SageMakerEndpointOperator
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
deploy_endpoint = SageMakerEndpointOperator(
task_id="deploy_endpoint",
config=test_setup["deploy_endpoint_config"],
)
启动 Amazon SageMaker pipeline 执行¶
要触发已定义的 Amazon Sagemaker pipeline 的执行运行,可以使用 SageMakerStartPipelineOperator
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
start_pipeline1 = SageMakerStartPipelineOperator(
task_id="start_pipeline1",
pipeline_name=pipeline_name,
)
停止 Amazon SageMaker pipeline 执行¶
要停止当前正在运行的 Amazon Sagemaker pipeline 执行,可以使用 SageMakerStopPipelineOperator
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
stop_pipeline1 = SageMakerStopPipelineOperator(
task_id="stop_pipeline1",
pipeline_exec_arn=start_pipeline1.output,
)
注册 SageMaker 模型版本¶
要注册模型版本,可以使用 SageMakerRegisterModelVersionOperator
。执行此 Operator 的结果是一个模型包 (model package)。模型包是可重用的模型 artifact 抽象,它打包了推理所需的所有元素。它包含一个推理规范 (inference specification),该规范定义了要使用的推理镜像 (inference image) 以及模型权重位置 (model weights location)。模型包组 (model package group) 是模型包的集合。您可以使用此 Operator 为每个 DAG 运行向组中添加新版本和模型包。
tests/system/amazon/aws/example_sagemaker.py
register_model = SageMakerRegisterModelVersionOperator(
task_id="register_model",
image_uri=test_setup["inference_code_image"],
model_url=test_setup["model_trained_weights"],
package_group_name=test_setup["model_package_group_name"],
)
启动 AutoML 实验¶
要启动 AutoML 实验(也称为 SageMaker Autopilot),可以使用 SageMakerAutoMLOperator
。AutoML 实验将接收 CSV 格式的输入数据以及要学习预测的列,并在无人监督的情况下对其训练模型。输出将放置在 S3 存储桶中,如果已配置,则会自动部署。
tests/system/amazon/aws/example_sagemaker.py
automl = SageMakerAutoMLOperator(
task_id="auto_ML",
job_name=test_setup["auto_ml_job_name"],
s3_input=test_setup["input_data_uri"],
target_attribute="class",
s3_output=test_setup["output_data_uri"],
role_arn=test_context[ROLE_ARN_KEY],
time_limit=30, # will stop the job before it can do anything, but it's not the point here
)
创建供以后使用的实验¶
要创建 SageMaker 实验,可以使用 SageMakerCreateExperimentOperator
。这将创建一个实验,使其准备好与处理、训练和转换任务关联。
tests/system/amazon/aws/example_sagemaker.py
create_experiment = SageMakerCreateExperimentOperator(
task_id="create_experiment", name=test_setup["experiment_name"]
)
创建 SageMaker Notebook 实例¶
要创建 SageMaker Notebook 实例,可以使用 SageMakerCreateNotebookOperator
。这将创建一个准备好运行 Jupyter notebook 的 SageMaker Notebook 实例。
tests/system/amazon/aws/example_sagemaker_notebook.py
instance = SageMakerCreateNotebookOperator(
task_id="create_instance",
instance_name=instance_name,
instance_type="ml.t3.medium",
role_arn=role_arn,
wait_for_completion=True,
)
停止 SageMaker Notebook 实例¶
要终止 SageMaker Notebook 实例,可以使用 SageMakerStopNotebookOperator
。这将终止 ML 计算实例并断开 ML 存储卷的连接。
tests/system/amazon/aws/example_sagemaker_notebook.py
stop_instance = SageMakerStopNotebookOperator(
task_id="stop_instance",
instance_name=instance_name,
)
启动 SageMaker Notebook 实例¶
要启动 SageMaker Notebook 实例并重新连接 ML 存储卷,可以使用 SageMakerStartNotebookOperator。这将启动一个新的 ML 计算实例,其中包含最新版本的库并连接您的 ML 存储卷。
tests/system/amazon/aws/example_sagemaker_notebook.py
start_instance = SageMakerStartNoteBookOperator(
task_id="start_instance",
instance_name=instance_name,
)
删除 SageMaker Notebook 实例¶
要删除 SageMaker Notebook 实例,可以使用 SageMakerDeleteNotebookOperator
。这将终止实例并删除与实例关联的 ML 存储卷和网络接口。实例必须先停止才能删除。
tests/system/amazon/aws/example_sagemaker_notebook.py
delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)
Sensors¶
等待 Amazon SageMaker 训练任务状态¶
要检查 Amazon Sagemaker 训练任务的状态直到其达到终端状态,可以使用 SageMakerTrainingSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_training = SageMakerTrainingSensor(
task_id="await_training",
job_name=test_setup["training_job_name"],
)
等待 Amazon SageMaker 转换任务状态¶
要检查 Amazon Sagemaker 转换任务的状态直到其达到终端状态,可以使用 SageMakerTransformOperator
。
tests/system/amazon/aws/example_sagemaker.py
await_transform = SageMakerTransformSensor(
task_id="await_transform",
job_name=test_setup["transform_job_name"],
)
等待 Amazon SageMaker 调优任务状态¶
要检查 Amazon Sagemaker 超参数调优任务的状态直到其达到终端状态,可以使用 SageMakerTuningSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_tuning = SageMakerTuningSensor(
task_id="await_tuning",
job_name=test_setup["tuning_job_name"],
)
等待 Amazon SageMaker 端点状态¶
要检查 Amazon Sagemaker 端点的状态直到其达到终端状态,可以使用 SageMakerEndpointSensor
。
tests/system/amazon/aws/example_sagemaker_endpoint.py
await_endpoint = SageMakerEndpointSensor(
task_id="await_endpoint",
endpoint_name=test_setup["endpoint_name"],
)
等待 Amazon SageMaker pipeline 执行状态¶
要检查 Amazon Sagemaker pipeline 执行的状态直到其达到终端状态,可以使用 SageMakerPipelineSensor
。
tests/system/amazon/aws/example_sagemaker_pipeline.py
await_pipeline2 = SageMakerPipelineSensor(
task_id="await_pipeline2",
pipeline_exec_arn=start_pipeline2.output,
)
等待 Amazon SageMaker AutoML 实验状态¶
要检查 Amazon Sagemaker AutoML 任务的状态直到其达到终端状态,可以使用 SageMakerAutoMLSensor
。
tests/system/amazon/aws/example_sagemaker.py
automl = SageMakerAutoMLOperator(
task_id="auto_ML",
job_name=test_setup["auto_ml_job_name"],
s3_input=test_setup["input_data_uri"],
target_attribute="class",
s3_output=test_setup["output_data_uri"],
role_arn=test_context[ROLE_ARN_KEY],
time_limit=30, # will stop the job before it can do anything, but it's not the point here
)
等待 Amazon SageMaker 处理任务状态¶
要检查 Amazon Sagemaker 处理任务的状态直到其达到终端状态,可以使用 SageMakerProcessingSensor
。
tests/system/amazon/aws/example_sagemaker.py
await_preprocess = SageMakerProcessingSensor(
task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)