airflow.providers.amazon.aws.hooks.sagemaker

模块内容

LogState

保存 CloudWatch 日志流的所有可能状态的枚举风格类。

SageMakerHook

与 Amazon SageMaker 交互。

函数

argmin(arr, f)

给定可调用对象 f,查找 arr 中使 f(arr[i]) 最小化的索引。

secondary_training_status_changed(...)

检查训练作业的二级状态消息是否已更改。

secondary_training_status_message(job_description, ...)

格式化包含开始时间和二级训练作业状态消息的字符串。

属性

Position

class airflow.providers.amazon.aws.hooks.sagemaker.LogState[源代码]

保存 CloudWatch 日志流的所有可能状态的枚举风格类。

https://sagemaker.readthedocs.io/en/stable/session.html#sagemaker.session.LogState

STARTING = 1[源代码]
WAIT_IN_PROGRESS = 2[源代码]
TAILING = 3[源代码]
JOB_COMPLETE = 4[源代码]
COMPLETE = 5[源代码]
airflow.providers.amazon.aws.hooks.sagemaker.Position[源代码]
airflow.providers.amazon.aws.hooks.sagemaker.argmin(arr, f)[源代码]

给定可调用对象 f,查找 arr 中使 f(arr[i]) 最小化的索引。

如果 arr 为空,则返回 None。

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_changed(current_job_description, prev_job_description)[源代码]

检查训练作业的二级状态消息是否已更改。

参数
  • current_job_description (dict) – 当前作业描述,从 DescribeTrainingJob 调用返回。

  • prev_job_description (dict) – 上一个作业描述,从 DescribeTrainingJob 调用返回。

返回

训练作业的二级状态消息是否更改。

返回类型

bool

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_message(job_description, prev_description)[源代码]

格式化包含开始时间和二级训练作业状态消息的字符串。

参数
  • job_description (dict[str, list[Any]]) – 从 DescribeTrainingJob 调用返回的响应

  • prev_description (dict | None) – 来自 DescribeTrainingJob 调用的上一个作业描述

返回

要打印的作业状态字符串。

返回类型

str

class airflow.providers.amazon.aws.hooks.sagemaker.SageMakerHook(*args, **kwargs)[源代码]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon SageMaker 交互。

提供 boto3.client("sagemaker") 的厚包装器。

可以指定其他参数(例如 aws_conn_id),并将它们传递给底层的 AwsBaseHook。

non_terminal_states[源代码]
endpoint_non_terminal_states[源代码]
pipeline_non_terminal_states[源代码]
processing_job_non_terminal_states[源代码]
failed_states[源代码]
processing_job_failed_states[源代码]
training_failed_states[源代码]
tar_and_s3_upload(path, key, bucket)[源代码]

将本地文件或目录打包成 tar 文件并上传到 S3。

参数
  • path (str) – 本地文件或目录

  • key (str) – S3 键

  • bucket (str) – S3 存储桶

configure_s3_resources(config)[源代码]

从配置中提取 S3 操作并执行它们。

参数

config (dict) – SageMaker 操作的配置

check_s3_url(s3url)[源代码]

检查 S3 URL 是否存在。

参数

s3url (str) – S3 URL

check_training_config(training_config)[源代码]

检查训练配置是否有效。

参数

training_config (dict) – 训练配置

check_tuning_config(tuning_config)[源代码]

检查调优配置是否有效。

参数

tuning_config (dict) – 调优配置

multi_stream_iter(log_group, streams, positions=None)[源代码]

迭代可用的事件。

来自单个日志组中的一组日志流的事件,交错来自每个流的事件,以便它们按时间戳顺序产生。

参数
  • log_group (str) – 日志组的名称。

  • streams (list) – 日志流名称的列表。此列表中流的位置是流号。

  • positions – (时间戳, skip) 对的列表,表示从每个流读取的最后一条记录。

返回

一个 (流号, cloudwatch 日志事件) 的元组。

返回类型

collections.abc.Generator

create_training_job(config, wait_for_completion=True, print_log=True, check_interval=30, max_ingestion_time=None)[源代码]

启动模型训练作业。

训练完成后,Amazon SageMaker 会将生成的模型工件保存到您指定的 Amazon S3 位置。

参数
  • config (dict) – 训练的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

对训练作业创建的响应

create_tuning_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[源代码]

启动超参数调优作业。

超参数调优作业通过在您的数据集上使用您选择的算法和您指定的范围内的超参数值运行多个训练作业来找到模型的最佳版本。然后,它选择超参数值,这些值通过您选择的目标指标来衡量,从而获得性能最佳的模型。

参数
  • config (dict) – 调优的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

对调优作业创建的响应

create_transform_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[源代码]

启动转换作业。

转换作业使用训练好的模型来获取数据集的推论,并将这些结果保存到您指定的 Amazon S3 位置。

参数
  • config (dict) – 转换作业的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

对转换作业创建的响应

create_processing_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[源代码]

使用 Amazon SageMaker Processing 来分析数据和评估模型。

借助 Processing,您可以在 SageMaker 上使用简化的托管体验来运行数据处理工作负载,例如特征工程、数据验证、模型评估和模型解释。

参数
  • config (dict) – 处理作业的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

对转换作业创建的响应

create_model(config)[源代码]

在 Amazon SageMaker 中创建模型。

在请求中,您需要指定模型名称并描述一个主要容器。对于主要容器,您需要指定包含推理代码、工件(来自先前的训练)以及推理代码在部署模型进行预测时使用的自定义环境映射的 Docker 镜像。

参数

config (dict) – 模型的配置

返回

模型创建的响应

create_endpoint_config(config)[source]

创建端点配置以部署模型。

在配置中,您需要识别一个或多个使用 CreateModel API 创建的要部署的模型,以及您希望 Amazon SageMaker 配置的资源。

参数

config (dict) – 端点配置的配置

返回

端点配置创建的响应

create_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

从配置创建端点。

当您创建无服务器端点时,SageMaker 会为您配置和管理计算资源。然后,您可以向端点发出推理请求并接收模型预测作为响应。SageMaker 会根据需要向上或向下扩展计算资源以处理您的请求流量。

参数
  • config (dict) – 端点的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

端点创建的响应

update_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

部署请求中的配置并切换到使用新端点。

使用之前的 EndpointConfig 为端点配置的资源将被删除(不会出现可用性损失)。

参数
  • config (dict) – 端点的配置

  • wait_for_completion (bool) – 如果程序应保持运行直到作业完成

  • check_interval (int) – 运算符将检查任何 SageMaker 作业状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

返回

端点更新的响应

describe_training_job(name)[source]

获取与名称关联的训练作业信息。

参数

name (str) – 训练作业的名称

返回

一个包含所有训练作业信息的字典

describe_training_job_with_log(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source]

获取相关的训练作业信息并打印 CloudWatch 日志。

describe_tuning_job(name)[source]

获取与名称关联的调优作业信息。

参数

name (str) – 调优作业的名称

返回

一个包含所有调优作业信息的字典

返回类型

字典

describe_model(name)[source]

获取与名称关联的 SageMaker 模型信息。

参数

name (str) – SageMaker 模型的名称

返回

一个包含所有模型信息的字典

返回类型

字典

describe_transform_job(name)[source]

获取与名称关联的转换作业信息。

参数

name (str) – 转换作业的名称

返回

一个包含所有转换作业信息的字典

返回类型

字典

describe_processing_job(name)[source]

获取与名称关联的处理作业信息。

参数

name (str) – 处理作业的名称

返回

一个包含所有处理作业信息的字典

返回类型

字典

describe_endpoint_config(name)[source]

获取与名称关联的端点配置信息。

参数

name (str) – 端点配置的名称

返回

一个包含所有端点配置信息的字典

返回类型

字典

describe_endpoint(name)[source]

获取端点的描述信息。

参数

name (str) – 端点的名称

返回

一个包含所有端点信息的字典

返回类型

字典

check_status(job_name, key, describe_function, check_interval, max_ingestion_time=None, non_terminal_states=None)[source]

检查 SageMaker 资源的状态。

参数
  • job_name (str) – 要检查状态的资源的名称,可以是作业,也可以是管道等。

  • key (str) – 响应字典中指向状态的键

  • describe_function (Callable) – 用于检索状态的函数

  • args – 函数的参数

  • check_interval (int) – 运算符检查任何 SageMaker 资源状态的时间间隔(以秒为单位)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 资源都将失败。将其设置为 None 表示任何 SageMaker 资源都没有超时。

  • non_terminal_states (set | None) – 非终端状态的集合

返回

资源完成后描述调用的响应

返回类型

字典

check_training_status_with_log(job_name, non_terminal_states, failed_states, wait_for_completion, check_interval, max_ingestion_time=None)[源代码]

显示给定训练作业的日志。

可选择跟踪它们直到作业完成。

参数
  • job_name (str) – 要检查状态并显示日志的训练作业的名称

  • non_terminal_states (set) – 非终端状态的集合

  • failed_states (set) – 失败状态的集合

  • wait_for_completion (bool) – 是否继续查找新的日志条目直到作业完成

  • check_interval (int) – 轮询新日志条目和作业完成之间的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(以秒为单位)。任何运行时间超过此时间的 SageMaker 作业都将失败。将其设置为 None 表示任何 SageMaker 作业都没有超时。

list_training_jobs(name_contains=None, max_results=None, **kwargs)[源代码]

调用 boto3 的 list_training_jobs

训练作业名称和最大结果可以通过参数配置。其他参数不是,应通过 kwargs 提供。请注意,boto3 期望这些参数为驼峰式命名,例如

list_training_jobs(name_contains="myjob", StatusEquals="Failed")
参数
  • name_contains (str | None) – (可选)要匹配的部分名称

  • max_results (int | None) – (可选)要返回的最大结果数。None 返回无限结果

  • kwargs – (可选) 传递给 boto3 的 list_training_jobs 方法的 kwargs

返回

list_training_jobs 请求的结果

返回类型

list[dict]

list_transform_jobs(name_contains=None, max_results=None, **kwargs)[源代码]

调用 boto3 的 list_transform_jobs

转换作业名称和最大结果可以通过参数配置。其他参数不是,应通过 kwargs 提供。请注意,boto3 期望这些参数为驼峰式命名,例如

list_transform_jobs(name_contains="myjob", StatusEquals="Failed")
参数
  • name_contains (str | None) – (可选) 要匹配的部分名称。

  • max_results (int | None) – (可选)要返回的最大结果数。None 返回无限结果。

  • kwargs – (可选) 传递给 boto3 的 list_transform_jobs 方法的 kwargs。

返回

list_transform_jobs 请求的结果。

返回类型

list[dict]

list_processing_jobs(**kwargs)[源代码]

调用 boto3 的 list_processing_jobs

所有参数应通过 kwargs 提供。请注意,boto3 期望这些参数为驼峰式命名,例如

list_processing_jobs(NameContains="myjob", StatusEquals="Failed")
参数

kwargs – (可选) 传递给 boto3 的 list_training_jobs 方法的 kwargs

返回

list_processing_jobs 请求的结果

返回类型

list[dict]

count_processing_jobs_by_name(processing_job_name, job_name_suffix=None, throttle_retry_delay=2, retries=3)[源代码]

获取以提供的名称前缀找到的处理作业的数量。

参数
  • processing_job_name (str) – 要查找的前缀。

  • job_name_suffix (str | None) – 可选的后缀,可以附加到现有作业名称以进行重复数据删除。

  • throttle_retry_delay (int) – 如果遇到 ThrottlingException,则等待的秒数。

  • retries (int) – 最大重试次数。

返回

以提供的前缀开头的处理作业的数量。

返回类型

int

delete_model(model_name)[源代码]

删除 SageMaker 模型。

参数

model_name (str) – 模型的名称

describe_pipeline_exec(pipeline_exec_arn, verbose=False)[源代码]

获取有关 SageMaker 管道执行的信息。

参数
  • pipeline_exec_arn (str) – 管道执行的 arn

  • verbose (bool) – 是否记录有关管道执行中步骤状态的详细信息

start_pipeline(pipeline_name, display_name='airflow-triggered-execution', pipeline_params=None)[源代码]

为 SageMaker 管道启动新的执行。

参数
  • pipeline_name (str) – 要启动的管道的名称(这不是 ARN)。

  • display_name (str) – 此管道执行将在 UI 中显示的名称。不需要是唯一的。

  • pipeline_params (dict | None) – 管道的可选参数。提供的所有参数都需要已存在于管道定义中。

返回

启动的管道执行的 ARN。

返回类型

str

stop_pipeline(pipeline_exec_arn, fail_if_not_running=False)[源代码]

停止 SageMaker 管道执行。

参数
  • pipeline_exec_arn (str) – 管道执行的 Amazon 资源名称 (ARN)。它是管道本身的 ARN,后跟“/execution/”和一个 ID。

  • fail_if_not_running (bool) – 如果尝试停止的管道在发送调用时未处于“执行中”状态(这意味着管道已在停止或已停止),则此方法将引发异常。请注意,如果管道在停止之前已成功完成,则将此项设置为 True 会引发错误。

返回

操作后管道执行的状态。为 ‘Executing’|’Stopping’|’Stopped’|’Failed’|’Succeeded’ 其中之一。

返回类型

str

create_model_package_group(package_group_name, package_group_desc='')[源代码]

如果模型包组尚不存在,则创建模型包组。

参数
  • package_group_name (str) – 要创建的模型包组的名称(如果尚不存在)。

  • package_group_desc (str) – 如果要创建模型包组的描述(可选)。

返回

如果创建了模型包组,则为 True,如果已存在,则为 False。

返回类型

bool

create_auto_ml_job(job_name, s3_input, target_attribute, s3_output, role_arn, compressed_input=False, time_limit=None, autodeploy_endpoint_name=None, extras=None, wait_for_completion=True, check_interval=30)[源代码]

创建自动 ML 作业以预测给定的列。

学习输入基于通过 S3 提供的数据,输出写入指定的 S3 位置。

参数
  • job_name (str) – 要创建的作业的名称,在帐户中必须是唯一的。

  • s3_input (str) – 从中获取数据的 S3 位置(文件夹或文件)。默认情况下,它期望带有标题的 csv。

  • target_attribute (str) – 包含要预测的值的列的名称。

  • s3_output (str) – 写入模型构件的 S3 文件夹。必须为 128 个字符或更少。

  • role_arn (str) – 与 S3 交互时使用的 IAM 角色的 ARN。必须具有对输入的读取权限和对输出文件夹的写入权限。

  • compressed_input (bool) – 如果输入已压缩为 gzip,则设置为 True。

  • time_limit (int | None) – 用于训练模型的最长时间(以秒为单位)。

  • autodeploy_endpoint_name (str | None) – 如果指定,最佳模型将部署到具有该名称的端点。否则不进行部署。

  • extras (dict | None) – 使用此字典来设置未通过此函数的参数提供的作业创建的任何可变输入变量。格式在:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_auto_ml_job 中描述。

  • wait_for_completion (bool) – 是否等待作业完成再返回。默认为 True。

  • check_interval (int) – 等待完成时两次状态检查之间的间隔(以秒为单位)。

返回

仅在等待完成时,才会返回一个详细描述最佳模型的字典。该结构是:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_auto_ml_job 中的 “BestCandidate” 键的结构。

返回类型

dict | None

static count_billable_seconds(training_start_time, training_end_time, instance_count)[源代码]
async describe_training_job_async(job_name)[源代码]

返回与该名称关联的训练作业信息。

参数

job_name (str) – 训练作业的名称

async describe_training_job_with_log_async(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[源代码]

返回与 job_name 关联的训练作业信息并打印 CloudWatch 日志。

参数
  • job_name (str) – 要检查状态的作业的名称

  • positions (dict[str, Any]) – (时间戳,跳过) 对的列表,表示从每个流读取的最后一条记录。

  • stream_names (list[str]) – 日志流名称列表。此列表中流的位置是流编号。

  • instance_count (int) – 最初为作业创建的实例计数

  • state (int) – 日志状态

  • last_description (dict[str, Any]) – 训练作业的最新描述

  • last_describe_job_call (float) – 上次调用作业的时间

async get_multi_stream(log_group, streams, positions)[source]

迭代可用的事件,并交错来自每个流的事件,以便它们按时间戳顺序生成。

参数
  • log_group (str) – 日志组的名称。

  • streams (list[str]) – 日志流名称的列表。此列表中流的位置是流编号。

  • positions (dict[str, Any]) – (时间戳,跳过) 对的列表,表示从每个流读取的最后一条记录。

此条目是否有帮助?