airflow.providers.amazon.aws.hooks.sagemaker

LogState

一种枚举风格的类,包含 CloudWatch 日志流的所有可能状态。

Position

SageMakerHook

与 Amazon SageMaker 交互。

函数

argmin(arr, f)

给定可调用对象 f,在 arr 中找到使 f(arr[i]) 最小化的索引。

secondary_training_status_changed(...)

检查训练作业的二级状态消息是否已更改。

secondary_training_status_message(job_description, ...)

格式化包含开始时间和训练作业二级状态消息的字符串。

模块内容

class airflow.providers.amazon.aws.hooks.sagemaker.LogState[source]

一种枚举风格的类,包含 CloudWatch 日志流的所有可能状态。

https://sagemaker.readthedocs.io/en/stable/session.html#sagemaker.session.LogState

STARTING = 1[source]
WAIT_IN_PROGRESS = 2[source]
TAILING = 3[source]
JOB_COMPLETE = 4[source]
COMPLETE = 5[source]
class airflow.providers.amazon.aws.hooks.sagemaker.Position[source]

基类: tuple

timestamp[source]
skip[source]
airflow.providers.amazon.aws.hooks.sagemaker.argmin(arr, f)[source]

给定可调用对象 f,在 arr 中找到使 f(arr[i]) 最小化的索引。

如果 arr 为空,则返回 None。

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_changed(current_job_description, prev_job_description)[source]

检查训练作业的二级状态消息是否已更改。

参数:
  • current_job_description (dict) – 当前作业描述,来自 DescribeTrainingJob 调用返回。

  • prev_job_description (dict) – 之前的作业描述,来自 DescribeTrainingJob 调用返回。

返回:

训练作业的二级状态消息是否已更改。

返回类型:

bool

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_message(job_description, prev_description)[source]

格式化包含开始时间和训练作业二级状态消息的字符串。

参数:
  • job_description (dict[str, list[Any]]) – DescribeTrainingJob 调用返回的响应

  • prev_description (dict | None) – DescribeTrainingJob 调用返回的之前的作业描述

返回:

要打印的作业状态字符串。

返回类型:

str

class airflow.providers.amazon.aws.hooks.sagemaker.SageMakerHook(*args, **kwargs)[source]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon SageMaker 交互。

提供对 boto3.client("sagemaker") 的厚封装。

可以指定其他参数(例如 aws_conn_id),这些参数会传递给底层的 AwsBaseHook。

non_terminal_states[source]
endpoint_non_terminal_states[source]
pipeline_non_terminal_states[source]
processing_job_non_terminal_states[source]
failed_states[source]
processing_job_failed_states[source]
training_failed_states[source]
s3_hook[source]
logs_hook[source]
tar_and_s3_upload(path, key, bucket)[source]

将本地文件或目录打包成 tar 文件并上传到 S3。

参数:
  • path (str) – 本地文件或目录

  • key (str) – S3 键

  • bucket (str) – S3 存储桶

configure_s3_resources(config)[source]

从配置中提取 S3 操作并执行它们。

参数:

config (dict) – SageMaker 操作的配置

check_s3_url(s3url)[source]

检查 S3 URL 是否存在。

参数:

s3url (str) – S3 URL

check_training_config(training_config)[source]

检查训练配置是否有效。

参数:

training_config (dict) – 训练配置

check_tuning_config(tuning_config)[source]

检查调优配置是否有效。

参数:

tuning_config (dict) – 调优配置

multi_stream_iter(log_group, streams, positions=None)[source]

遍历可用事件。

从单个日志组中的一组日志流中提取事件,并将来自每个流的事件交织在一起,以便按时间戳顺序生成。

参数:
  • log_group (str) – 日志组的名称。

  • streams (list) – 日志流名称列表。流在此列表中的位置即为流编号。

  • positions – 一个由 (timestamp, skip) 对组成的列表,表示从每个流读取的最后一条记录。

返回:

一个 (流编号, CloudWatch 日志事件) 元组。

返回类型:

collections.abc.Generator

create_training_job(config, wait_for_completion=True, print_log=True, check_interval=30, max_ingestion_time=None)[source]

启动一个模型训练作业。

训练完成后,Amazon SageMaker 会将生成的模型制品保存到您指定的 Amazon S3 位置。

参数:
  • config (dict) – 训练配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

训练作业创建的响应

create_tuning_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

启动一个超参数调优作业。

超参数调优作业通过使用您选择的算法和在指定范围内超参数值,在您的数据集上运行许多训练作业来找到模型的最佳版本。然后,它会选择能够产生最佳性能模型的超参数值(以您选择的目标指标衡量)。

参数:
  • config (dict) – 调优配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

调优作业创建的响应

create_transform_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

启动一个转换作业。

转换作业使用训练好的模型在数据集上进行推理,并将结果保存到您指定的 Amazon S3 位置。

参数:
  • config (dict) – 转换作业配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

转换作业创建的响应

create_processing_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

使用 Amazon SageMaker Processing 分析数据和评估模型。

通过 Processing,您可以在 SageMaker 上使用简化的托管体验来运行数据处理工作负载,例如特征工程、数据验证、模型评估和模型解释。

参数:
  • config (dict) – 处理作业配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

转换作业创建的响应

create_model(config)[source]

在 Amazon SageMaker 中创建模型。

在请求中,您命名模型并描述一个主容器。对于主容器,您指定包含推理代码、制品(来自之前的训练)以及部署模型进行预测时推理代码使用的自定义环境变量映射的 Docker 镜像。

参数:

config (dict) – 模型配置

返回:

模型创建的响应

create_endpoint_config(config)[source]

创建用于部署模型的端点配置。

在配置中,您确定要部署的一个或多个模型(使用 CreateModel API 创建)以及您希望 Amazon SageMaker 预置的资源。

参数:

config (dict) – 端点配置

返回:

端点配置创建的响应

create_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

从配置创建端点。

创建无服务器端点时,SageMaker 会为您预置和管理计算资源。然后,您可以向端点发送推理请求并接收模型预测作为响应。SageMaker 会根据请求流量需要扩展或缩减计算资源。

参数:
  • config (dict) – 端点配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

端点创建的响应

update_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

部署请求中的配置并切换到使用新端点。

使用之前的 EndpointConfig 为端点预置的资源将被删除(不会丢失可用性)。

参数:
  • config (dict) – 端点配置

  • wait_for_completion (bool) – 如果程序应持续运行直到作业完成

  • check_interval (int) – Operator 检查任何 SageMaker 作业状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

返回:

端点更新的响应

describe_training_job(name)[source]

获取与名称关联的训练作业信息。

参数:

name (str) – 训练作业的名称

返回:

包含所有训练作业信息的字典

describe_training_job_with_log(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source]

获取关联的训练作业信息并打印 CloudWatch 日志。

describe_tuning_job(name)[source]

获取与名称关联的调优作业信息。

参数:

name (str) – 调优作业的名称

返回:

一个字典包含所有微调作业信息

返回类型:

字典

describe_model(name)[source]

获取与名称关联的 SageMaker 模型信息。

参数:

name (str) – SageMaker 模型的名称

返回:

一个字典包含所有模型信息

返回类型:

字典

describe_transform_job(name)[source]

获取与名称关联的转换作业信息。

参数:

name (str) – 转换作业的名称

返回:

一个字典包含所有转换作业信息

返回类型:

字典

describe_processing_job(name)[source]

获取与名称关联的处理作业信息。

参数:

name (str) – 处理作业的名称

返回:

一个字典包含所有处理作业信息

返回类型:

字典

describe_endpoint_config(name)[source]

获取与名称关联的终端节点配置信息。

参数:

name (str) – 终端节点配置的名称

返回:

一个字典包含所有终端节点配置信息

返回类型:

字典

describe_endpoint(name)[source]

获取终端节点的描述。

参数:

name (str) – 终端节点的名称

返回:

一个字典包含所有终端节点信息

返回类型:

字典

check_status(job_name, key, describe_function, check_interval, max_ingestion_time=None, non_terminal_states=None)[source]

检查 SageMaker 资源的状态。

参数:
  • job_name (str) – 要检查状态的资源的名称,可以是作业,例如也可以是管道。

  • key (str) – 响应字典中指向状态的键

  • describe_function (Callable) – 用于检索状态的函数

  • args – 函数的参数

  • check_interval (int) – 操作符检查任何 SageMaker 资源状态的时间间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 资源都将失败。将其设置为 None 表示对任何 SageMaker 资源都没有超时限制。

  • non_terminal_states (set | None) – 非终止状态的集合

返回:

资源完成后 describe 调用的响应

返回类型:

字典

check_training_status_with_log(job_name, non_terminal_states, failed_states, wait_for_completion, check_interval, max_ingestion_time=None)[source]

显示给定训练作业的日志。

可选地跟踪它们,直到作业完成。

参数:
  • job_name (str) – 要检查状态和显示日志的训练作业的名称

  • non_terminal_states (set) – 非终止状态的集合

  • failed_states (set) – 失败状态的集合

  • wait_for_completion (bool) – 是否持续查找新的日志条目,直到作业完成

  • check_interval (int) – 轮询新日志条目和作业完成之间的间隔(秒)

  • max_ingestion_time (int | None) – 最大摄取时间(秒)。任何运行时间超过此值的 SageMaker 作业将失败。将其设置为 None 表示对任何 SageMaker 作业都没有超时限制。

list_training_jobs(name_contains=None, max_results=None, **kwargs)[source]

调用 boto3 的 list_training_jobs

训练作业名称和最大结果数可通过参数配置。其他参数不可通过参数配置,应通过 kwargs 提供。请注意,boto3 期望这些参数采用驼峰命名法(CamelCase),例如

list_training_jobs(name_contains="myjob", StatusEquals="Failed")
参数:
  • name_contains (str | None) – (可选)要匹配的部分名称

  • max_results (int | None) – (可选)要返回的最大结果数。None 表示返回无限结果

  • kwargs – (可选)传递给 boto3 的 list_training_jobs 方法的 kwargs 参数

返回:

list_training_jobs 请求的结果

返回类型:

list[dict]

list_transform_jobs(name_contains=None, max_results=None, **kwargs)[source]

调用 boto3 的 list_transform_jobs

转换作业名称和最大结果数可通过参数配置。其他参数不可通过参数配置,应通过 kwargs 提供。请注意,boto3 期望这些参数采用驼峰命名法(CamelCase),例如

list_transform_jobs(name_contains="myjob", StatusEquals="Failed")
参数:
  • name_contains (str | None) – (可选)要匹配的部分名称。

  • max_results (int | None) – (可选)要返回的最大结果数。None 表示返回无限结果。

  • kwargs – (可选)传递给 boto3 的 list_transform_jobs 方法的 kwargs 参数。

返回:

list_transform_jobs 请求的结果。

返回类型:

list[dict]

list_processing_jobs(**kwargs)[source]

调用 boto3 的 list_processing_jobs

所有参数都应通过 kwargs 提供。请注意,boto3 期望这些参数采用驼峰命名法(CamelCase),例如

list_processing_jobs(NameContains="myjob", StatusEquals="Failed")
参数:

kwargs – (可选)传递给 boto3 的 list_training_jobs 方法的 kwargs 参数

返回:

list_processing_jobs 请求的结果

返回类型:

list[dict]

count_processing_jobs_by_name(processing_job_name, job_name_suffix=None, throttle_retry_delay=2, retries=3)[source]

获取找到的具有指定名称前缀的处理作业数量。

参数:
  • processing_job_name (str) – 要查找的前缀。

  • job_name_suffix (str | None) – 可选的后缀,可附加用于消除现有作业名称的重复。

  • throttle_retry_delay (int) – 遇到 ThrottlingException 时等待的秒数。

  • retries (int) – 最大重试次数。

返回:

以指定前缀开头的处理作业数量。

返回类型:

int

delete_model(model_name)[source]

删除 SageMaker 模型。

参数:

model_name (str) – 模型的名称

describe_pipeline_exec(pipeline_exec_arn, verbose=False)[source]

获取有关 SageMaker 管道执行的信息。

参数:
  • pipeline_exec_arn (str) – 管道执行的 ARN

  • verbose (bool) – 是否记录管道执行中步骤状态的详细信息

start_pipeline(pipeline_name, display_name='airflow-triggered-execution', pipeline_params=None)[source]

启动 SageMaker 管道的新执行。

参数:
  • pipeline_name (str) – 要启动的管道的名称(这不是 ARN)。

  • display_name (str) – 此管道执行将在 UI 中显示的名称。无需唯一。

  • pipeline_params (dict | None) – 管道的可选参数。提供的所有参数都需要已存在于管道定义中。

返回:

启动的管道执行的 ARN。

返回类型:

str

stop_pipeline(pipeline_exec_arn, fail_if_not_running=False)[source]

停止 SageMaker 管道执行。

参数:
  • pipeline_exec_arn (str) – 管道执行的 Amazon Resource Name (ARN)。它是管道本身的 ARN 加上“/execution/”和一个 id。

  • fail_if_not_running (bool) – 如果在我们尝试停止管道时,该管道在调用发送时未处于“Executing”(执行中)状态(这意味着管道已经处于停止中或已停止),此方法将引发异常。请注意,将其设置为 True 将在管道在停止前已成功完成时引发错误。

返回:

操作后管道执行的状态。是 ‘Executing’|’Stopping’|’Stopped’|’Failed’|’Succeeded’ 中的一个。

返回类型:

str

create_model_package_group(package_group_name, package_group_desc='')[source]

如果模型包组尚不存在,则创建它。

参数:
  • package_group_name (str) – 如果模型包组尚不存在,要创建的模型包组的名称。

  • package_group_desc (str) – 模型包组的描述,如果它将被创建(可选)。

返回:

如果创建了模型包组,则为 True;如果它已存在,则为 False。

返回类型:

bool

create_auto_ml_job(job_name, s3_input, target_attribute, s3_output, role_arn, compressed_input=False, time_limit=None, autodeploy_endpoint_name=None, extras=None, wait_for_completion=True, check_interval=30)[source]

创建一个 AutoML 作业来预测给定列。

学习输入基于通过 S3 提供的数据,输出写入指定的 S3 位置。

参数:
  • job_name (str) – 要创建的作业名称,在账户内必须唯一。

  • s3_input (str) – 用于获取数据的 S3 位置(文件夹或文件)。默认情况下,它期望带标题行的 csv。

  • target_attribute (str) – 包含要预测的值的列的名称。

  • s3_output (str) – 写入模型工件的 S3 文件夹。必须少于或等于 128 个字符。

  • role_arn (str) – 与 S3 交互时使用的 ARN 或 IAM 角色。必须对输入文件夹有读访问权限,对输出文件夹有写访问权限。

  • compressed_input (bool) – 如果输入是 gzipped 格式,则设置为 True。

  • time_limit (int | None) – 用于训练模型的最长时间(秒)。

  • autodeploy_endpoint_name (str | None) – 如果指定,最佳模型将部署到具有该名称的终端节点。否则不进行部署。

  • extras (dict | None) – 使用此字典设置此函数参数未提供的任何可变输入参数,用于作业创建。格式说明见:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_auto_ml_job

  • wait_for_completion (bool) – 是否等待作业完成后再返回。默认为 True。

  • check_interval (int) – 等待完成时两次状态检查之间的间隔(秒)。

返回:

仅当等待完成时,返回一个详细说明最佳模型的字典。其结构与以下链接中“BestCandidate”键的结构相同:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_auto_ml_job

返回类型:

dict | None

static count_billable_seconds(training_start_time, training_end_time, instance_count)[source]
async describe_training_job_async(job_name)[source]

返回与名称关联的训练作业信息。

参数:

job_name (str) – 训练作业的名称

async describe_training_job_with_log_async(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source]

返回与 job_name 关联的训练作业信息并打印 CloudWatch 日志。

参数:
  • job_name (str) – 要检查状态的作业名称

  • positions (dict[str, Any]) – 一个由 (timestamp, skip) 对组成的列表,表示从每个流读取的最后一条记录。

  • stream_names (list[str]) – 日志流名称的列表。流在此列表中的位置是流编号。

  • instance_count (int) – 作业最初创建的实例数量

  • state (int) – 日志状态

  • last_description (dict[str, Any]) – 训练作业的最新描述

  • last_describe_job_call (float) – 上次调用作业的时间

async get_multi_stream(log_group, streams, positions)[source]

迭代处理可用的事件,并将来自每个流的事件交错排列,以便它们按时间戳顺序产生。

参数:
  • log_group (str) – 日志组的名称。

  • streams (list[str]) – 日志流名称的列表。流在此列表中的位置是流编号。

  • positions (dict[str, Any]) – 一个由 (timestamp, skip) 对组成的列表,表示从每个流读取的最后一条记录。

此条目是否有帮助?