airflow.providers.amazon.aws.hooks.batch_waiters

AWS Batch 服务等待器。

模块内容

BatchWaitersHook

一个用于管理 AWS Batch 服务等待器的实用工具。

class airflow.providers.amazon.aws.hooks.batch_waiters.BatchWaitersHook(*args, waiter_config=None, **kwargs)[源代码]

基类: airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook

一个用于管理 AWS Batch 服务等待器的实用工具。

import random
from airflow.providers.amazon.aws.operators.batch_waiters import BatchWaiters

# to inspect default waiters
waiters = BatchWaiters()
config = waiters.default_config  # type: Dict
waiter_names = waiters.list_waiters()  # -> ["JobComplete", "JobExists", "JobRunning"]

# The default_config is a useful stepping stone to creating custom waiters, e.g.
custom_config = waiters.default_config  # this is a deepcopy
# modify custom_config['waiters'] as necessary and get a new instance:
waiters = BatchWaiters(waiter_config=custom_config)
waiters.waiter_config  # check the custom configuration (this is a deepcopy)
waiters.list_waiters()  # names of custom waiters

# During the init for BatchWaiters, the waiter_config is used to build a waiter_model;
# and note that this only occurs during the class init, to avoid any accidental mutations
# of waiter_config leaking into the waiter_model.
waiters.waiter_model  # -> botocore.waiter.WaiterModel object

# The waiter_model is combined with the waiters.client to get a specific waiter
# and the details of the config on that waiter can be further modified without any
# accidental impact on the generation of new waiters from the defined waiter_model, e.g.
waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

# To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])
参数
property default_config: dict[源代码]

一个不可变的默认等待器配置。

返回

AWS Batch 服务的等待器配置

返回类型

dict

property waiter_config: dict[源代码]

此实例的不可变等待器配置;此属性返回 deepcopy

在 BatchWaiters 的初始化期间,waiter_config 用于构建 waiter_model,这仅在类初始化期间发生,以避免 waiter_config 的任何意外突变泄漏到 waiter_model 中。

返回

AWS Batch 服务的等待器配置

返回类型

dict

property waiter_model: botocore.waiter.WaiterModel[源代码]

一个配置的等待器模型,用于在 AWS Batch 服务上生成等待器。

返回

AWS Batch 服务的等待器模型

返回类型

botocore.waiter.WaiterModel

get_waiter(waiter_name, _=None, deferrable=False, client=None)[源代码]

使用配置的 .waiter_model 获取 AWS Batch 服务等待器。

.waiter_model.client 结合使用以获取特定的等待器,并且可以修改该等待器的属性,而不会对从 .waiter_model 生成新等待器产生任何意外影响,例如。

waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

要使用特定的等待器,请更新配置并调用 jobId 的 wait() 方法,例如。

import random

waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])
参数
  • waiter_name (str) – 等待器的名称。 该名称应与等待器模型文件中键名称的名称(包括大小写)匹配;请参阅 .list_waiters

  • _ (dict[str, str] | None) – 未使用,仅在此处以匹配 base_aws 中的方法签名

返回

指定名称的 AWS Batch 服务的等待器对象

返回类型

botocore.waiter.Waiter

list_waiters()[源代码]

列出 AWS Batch 服务的等待器配置中的等待器。

返回

AWS Batch 服务的等待器名称

返回类型

list[str]

wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[源代码]

等待 Batch 作业完成。

这假定 .waiter_model 是使用 .default_config 的某种变体配置的,以便它可以生成具有以下名称的等待器:“JobExists”、“JobRunning”和“JobComplete”。

参数
引发

AirflowException

注意

此方法向 delay 添加一个小的随机抖动 (+/- 2 秒,>= 1 秒)。 当许多并发任务请求作业描述时,使用随机间隔有助于避免 AWS API 限制。

它还会修改 max_attempts 以使用 sys.maxsize,这允许 Airflow 管理等待超时。

此条目是否有帮助?