airflow.providers.amazon.aws.hooks.batch_client

用于 AWS Batch 服务的客户端。

模块内容

BatchProtocol

boto3.client('batch') -> botocore.client.Batch 的结构化协议。

BatchClientHook

与 AWS Batch 交互。

class airflow.providers.amazon.aws.hooks.batch_client.BatchProtocol[源代码]

基类:airflow.typing_compat.Protocol

boto3.client('batch') -> botocore.client.Batch 的结构化协议。

这用于 BatchClient.client() 的类型提示;它仅涵盖所需客户端方法的子集。

describe_jobs(jobs)[源代码]

从 AWS Batch 获取作业描述。

参数

jobs (list[str]) – 要描述的 JobId 列表

返回

描述作业的 API 响应

返回类型

dict

get_waiter(waiterName)[源代码]

获取 AWS Batch 服务等待器。

参数

waiterName (str) – 等待器的名称。该名称应与等待器模型文件中的键名称(通常是 CamelCasing)的名称(包括大小写)匹配。

返回

命名 AWS Batch 服务的等待器对象

返回类型

botocore.waiter.Waiter

注意

AWS Batch 可能没有任何等待器(直到 botocore PR-1307 发布)。

import boto3

boto3.client("batch").waiter_names == []
submit_job(jobName, jobQueue, jobDefinition, arrayProperties, parameters, containerOverrides, ecsPropertiesOverride, eksPropertiesOverride, tags)[源代码]

提交一个 Batch 作业。

参数
  • jobName (str) – AWS Batch 作业的名称

  • jobQueue (str) – AWS Batch 上的队列名称

  • jobDefinition (str) – AWS Batch 上的作业定义名称

  • arrayProperties (dict) – boto3 将接收的相同参数

  • parameters (dict) – boto3 将接收的相同参数

  • containerOverrides (dict) – boto3 将接收的相同参数

  • ecsPropertiesOverride (dict) – boto3 将接收的相同参数

  • eksPropertiesOverride (dict) – boto3 将接收的相同参数

  • tags (dict) – boto3 将接收的相同参数

返回

API 响应

返回类型

dict

terminate_job(jobId, reason)[源代码]

终止一个 Batch 作业。

参数
  • jobId (str) – 要终止的作业 ID

  • reason (str) – 终止作业 ID 的原因

返回

API 响应

返回类型

dict

class airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook(*args, max_retries=None, status_retries=None, **kwargs)[源代码]

基类:airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 AWS Batch 交互。

提供对 boto3.client("batch") 的厚封装。

参数
  • max_retries (int | None) – 指数退避重试次数,4200 = 48 小时;仅当等待器为 None 时才使用轮询

  • status_retries (int | None) – 获取作业状态的 HTTP 重试次数,10;仅当等待器为 None 时才使用轮询

注意

一些方法使用默认的随机延迟来检查或轮询作业状态,例如 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)。当许多并发任务请求作业描述时,使用随机间隔有助于避免 AWS API 限制。

要修改用于检查 Batch 作业状态时使用的随机延迟范围的全局默认值,请修改这些默认值,例如:.. code-block

BatchClient.DEFAULT_DELAY_MIN = 0
BatchClient.DEFAULT_DELAY_MAX = 5

当使用显式延迟值时,会将 1 秒的随机抖动应用于延迟(例如,0 秒的延迟将是 random.uniform(0, 1) 延迟)。通常建议将随机抖动添加到 API 请求中。为此提供了一个方便的方法,例如,要获得 10 秒 +/- 5 秒的随机延迟:delay = BatchClient.add_jitter(10, width=5, minima=0)

可以指定其他参数(例如 aws_conn_id),并将这些参数传递给底层的 AwsBaseHook。

property client: BatchProtocol | botocore.client.BaseClient[源代码]

用于 Batch 服务的 AWS API 客户端。

返回

用于 .region_name 的 boto3 “batch” 客户端

返回类型

BatchProtocol | botocore.client.BaseClient

MAX_RETRIES = 4200[source]
STATUS_RETRIES = 10[source]
DEFAULT_DELAY_MIN = 1[source]
DEFAULT_DELAY_MAX = 10[source]
FAILURE_STATE = 'FAILED'[source]
SUCCESS_STATE = 'SUCCEEDED'[source]
RUNNING_STATE = 'RUNNING'[source]
INTERMEDIATE_STATES = ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING')[source]
COMPUTE_ENVIRONMENT_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
COMPUTE_ENVIRONMENT_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
JOB_QUEUE_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
JOB_QUEUE_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
terminate_job(job_id, reason)[source]

终止一个 Batch 作业。

参数
  • job_id (str) – 要终止的作业 ID

  • reason (str) – 终止作业 ID 的原因

返回

API 响应

返回类型

dict

check_job_success(job_id)[source]

检查 Batch 作业的最终状态。

如果作业“SUCCEEDED”,则返回 True,否则引发 AirflowException。

参数

job_id (str) – Batch 作业 ID

Raises

AirflowException

wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]

等待 Batch 作业完成。

参数
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

:param get_batch_log_fetcher : 返回 batch_log_fetcher 的方法

Raises

AirflowException

poll_for_job_running(job_id, delay=None)[source]

轮询作业运行状态。

指示作业正在运行或已完成的状态为:“RUNNING”|“SUCCEEDED”|“FAILED”。

因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”|“SUCCEEDED”|“FAILED”

包括已完成状态选项,以应对状态变化太快而轮询无法检测到从“STARTING”快速移动到“RUNNING”再到完成(通常是失败)的“RUNNING”状态的情况。

参数
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

Raises

AirflowException

poll_for_job_complete(job_id, delay=None)[source]

轮询作业完成状态。

指示作业完成的状态为:“SUCCEEDED”|“FAILED”。

因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”>“SUCCEEDED”|“FAILED”

参数
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

Raises

AirflowException

poll_job_status(job_id, match_status)[source]

使用指数退避策略(具有 max_retries)轮询作业状态。

参数
  • job_id (str) – Batch 作业 ID

  • match_status (list[str]) – 要匹配的作业状态列表;Batch 作业状态包括:“SUBMITTED”|“PENDING”|“RUNNABLE”|“STARTING”|“RUNNING”|“SUCCEEDED”|“FAILED”

Raises

AirflowException

get_job_description(job_id)[source]

获取作业描述(使用 status_retries)。

参数

job_id (str) – Batch 作业 ID

返回

describe jobs 的 API 响应

Raises

AirflowException

返回类型

dict

static parse_job_description(job_id, response)[source]

解析作业描述以提取 job_id 的描述。

参数
  • job_id (str) – Batch 作业 ID

  • response (dict) – describe jobs 的 API 响应

返回

describe job_id 的 API 响应

Raises

AirflowException

返回类型

dict

get_job_awslogs_info(job_id)[source]
get_job_all_awslogs_info(job_id)[source]

解析作业描述以提取 AWS CloudWatch 信息。

参数

job_id (str) – AWS Batch 作业 ID

static add_jitter(delay, width=1, minima=0)[source]

使用延迟 +/- 宽度来产生随机抖动。

在状态轮询中添加抖动可以帮助避免在 Airflow 任务中高并发监控 Batch 作业时触发 AWS Batch API 限制。

参数
  • delay (int | float) – 暂停的秒数;假设延迟为正数。

  • width (int | float) – 用于随机抖动的延迟 +/- 宽度;假设宽度为正数。

  • minima (int | float) – 允许的最小延迟;假设最小延迟为非负数。

返回

均匀分布 (delay - width, delay + width) 的抖动,且为非负数。

返回类型

浮点数

static delay(delay=None)[源代码]

暂停执行 delay 秒。

参数

delay (int | float | None) – 使用 time.sleep(delay) 暂停执行的延迟时间;延迟会应用 1 秒的随机抖动。

注意

此方法使用默认的随机延迟,即 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);使用随机间隔有助于在许多并发任务请求作业描述时避免 AWS API 限制。

static exponential_delay(tries)[源代码]

应用带有随机抖动的指数退避延迟。

最大间隔为 10 分钟(随机抖动在 3 到 10 分钟之间)。这在 poll_for_job_status() 方法中使用。

行为示例

def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(delay / 3, delay)


for tries in range(10):
    exp(tries)

#  0.33  1.0
#  0.45  1.35
#  0.81  2.44
#  1.41  4.23
#  2.25  6.76
#  3.33 10.00
#  4.65 13.95
#  6.21 18.64
#  8.01 24.04
# 10.05 30.15
参数

tries (int) – 尝试次数

此条目是否有帮助?