airflow.providers.amazon.aws.hooks.batch_client

一个用于 AWS Batch 服务的客户端。

BatchProtocol

一个结构化协议,用于 boto3.client('batch') -> botocore.client.Batch

BatchClientHook

与 AWS Batch 交互。

模块内容

class airflow.providers.amazon.aws.hooks.batch_client.BatchProtocol[source]

基类: Protocol

一个结构化协议,用于 boto3.client('batch') -> botocore.client.Batch

这用于 BatchClient.client() 的类型提示;它只包含所需客户端方法的一个子集。

describe_jobs(jobs)[source]

从 AWS Batch 获取作业描述。

参数:

jobs (list[str]) – 要描述的 JobId 列表

返回:

描述作业的 API 响应

返回类型:

dict

get_waiter(waiterName)[source]

获取 AWS Batch 服务等待器。

参数:

waiterName (str) – 等待器的名称。名称应与等待器模型文件中键名称(包括大小写)匹配(通常为 CamelCasing)。

返回:

指定名称的 AWS Batch 服务的等待器对象

返回类型:

botocore.waiter.Waiter

注意

AWS Batch 可能没有任何等待器(直到 botocore PR-1307 发布)。

import boto3

boto3.client("batch").waiter_names == []
submit_job(jobName, jobQueue, jobDefinition, arrayProperties, parameters, containerOverrides, ecsPropertiesOverride, eksPropertiesOverride, tags)[source]

提交一个 Batch 作业。

参数:
  • jobName (str) – AWS Batch 作业的名称

  • jobQueue (str) – AWS Batch 上的队列名称

  • jobDefinition (str) – AWS Batch 上的作业定义名称

  • arrayProperties (dict) – boto3 将接收的相同参数

  • parameters (dict) – boto3 将接收的相同参数

  • containerOverrides (dict) – boto3 将接收的相同参数

  • ecsPropertiesOverride (dict) – boto3 将接收的相同参数

  • eksPropertiesOverride (dict) – boto3 将接收的相同参数

  • tags (dict) – boto3 将接收的相同参数

返回:

一个 API 响应

返回类型:

dict

terminate_job(jobId, reason)[source]

终止一个 Batch 作业。

参数:
  • jobId (str) – 要终止的作业 ID

  • reason (str) – 终止作业 ID 的原因

返回:

一个 API 响应

返回类型:

dict

create_compute_environment(**kwargs)[source]

创建一个 AWS Batch 计算环境。

参数:

kwargs – boto3 create_compute_environment 的参数

class airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook(*args, max_retries=None, status_retries=None, **kwargs)[source]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 AWS Batch 交互。

提供 `boto3.client("batch")` 的厚封装。

参数:
  • max_retries (int | None) – 指数退避重试,4200 = 48 小时;仅当 waiters 为 None 时使用轮询

  • status_retries (int | None) – 获取作业状态的 HTTP 重试次数,10 次;仅当 waiters 为 None 时使用轮询

注意

一些方法使用默认的随机延迟来检查或轮询作业状态,例如 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX) 使用随机间隔有助于在许多并发任务请求作业描述时避免 AWS API 限制。

要修改在使用随机延迟检查 Batch 作业状态时允许的抖动范围的全局默认值,请修改这些默认值,例如:.. code-block

BatchClient.DEFAULT_DELAY_MIN = 0
BatchClient.DEFAULT_DELAY_MAX = 5

使用显式延迟值时,会将 1 秒的随机抖动应用于延迟(例如,延迟 0 秒将是 random.uniform(0, 1) 的延迟)。通常建议将随机抖动添加到 API 请求中。为此提供了一个方便的方法,例如,要获得 10 秒 +/- 5 秒的随机延迟:delay = BatchClient.add_jitter(10, width=5, minima=0)

可以指定其他参数(例如 aws_conn_id),这些参数将传递给底层的 AwsBaseHook。

MAX_RETRIES = 4200[source]
STATUS_RETRIES = 10[source]
DEFAULT_DELAY_MIN = 1[source]
DEFAULT_DELAY_MAX = 10[source]
FAILURE_STATE = 'FAILED'[source]
SUCCESS_STATE = 'SUCCEEDED'[source]
RUNNING_STATE = 'RUNNING'[source]
INTERMEDIATE_STATES[source]
COMPUTE_ENVIRONMENT_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
COMPUTE_ENVIRONMENT_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
JOB_QUEUE_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
JOB_QUEUE_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
max_retries = 4200[source]
status_retries = 10[source]
property client: BatchProtocol | botocore.client.BaseClient[source]

一个用于 Batch 服务的 AWS API 客户端。

返回:

针对 .region_name 的 boto3 'batch' 客户端

返回类型:

BatchProtocol | botocore.client.BaseClient

terminate_job(job_id, reason)[source]

终止一个 Batch 作业。

参数:
  • job_id (str) – 要终止的作业 ID

  • reason (str) – 终止作业 ID 的原因

返回:

一个 API 响应

返回类型:

dict

check_job_success(job_id)[source]

检查 Batch 作业的最终状态。

如果作业状态为“SUCCEEDED”,则返回 True,否则抛出 AirflowException。

参数:

job_id (str) – Batch 作业 ID

抛出:

AirflowException

wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]

等待 Batch 作业完成。

参数:
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

:param get_batch_log_fetcher : 返回 batch_log_fetcher 的方法

抛出:

AirflowException

poll_for_job_running(job_id, delay=None)[source]

轮询作业运行状态。

指示作业正在运行或已完成的状态有:“RUNNING”|“SUCCEEDED”|“FAILED”。

因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”|“SUCCEEDED”|“FAILED”。

包含已完成状态选项是为了处理状态变化过快,导致轮询无法检测到从 STARTING 快速变为 RUNNING 然后完成(通常是失败)的情况。

参数:
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

抛出:

AirflowException

poll_for_job_complete(job_id, delay=None)[source]

轮询作业完成状态。

指示作业完成的状态有:“SUCCEEDED”|“FAILED”。

因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”>“SUCCEEDED”|“FAILED”。

参数:
  • job_id (str) – Batch 作业 ID

  • delay (int | float | None) – 轮询作业状态前的延迟

抛出:

AirflowException

poll_job_status(job_id, match_status)[source]

使用指数退避策略(带有 max_retries)轮询作业状态。

参数:
  • job_id (str) – Batch 作业 ID

  • match_status (list[str]) – 要匹配的作业状态列表;Batch 作业状态包括:“SUBMITTED”|“PENDING”|“RUNNABLE”|“STARTING”|“RUNNING”|“SUCCEEDED”|“FAILED”。

抛出:

AirflowException

get_job_description(job_id)[source]

获取作业描述(使用 status_retries)。

参数:

job_id (str) – Batch 作业 ID

返回:

一个描述作业的 API 响应

抛出:

AirflowException

返回类型:

dict

static parse_job_description(job_id, response)[source]

解析作业描述以提取 job_id 的描述。

参数:
  • job_id (str) – Batch 作业 ID

  • response (dict) – 描述作业的 API 响应

返回:

描述 job_id 的 API 响应

抛出:

AirflowException

返回类型:

dict

get_job_awslogs_info(job_id)[source]
get_job_all_awslogs_info(job_id)[source]

解析作业描述以提取 AWS CloudWatch 信息。

参数:

job_id (str) – AWS Batch 作业 ID

static add_jitter(delay, width=1, minima=0)[source]

使用 delay +/- width 实现随机抖动。

在状态轮询中添加抖动有助于在使用 Airflow 任务高并发监控 Batch 作业时避免 AWS Batch API 限制。

参数:
  • delay (int | float) – 暂停的秒数;delay 假定为正数

  • width (int | float) – 随机抖动的 delay +/- width;width 假定为正数

  • minima (int | float) – 允许的最小延迟;minima 假定为非负数

返回:

uniform(delay - width, delay + width) 抖动,且为非负数

返回类型:

float

static delay(delay=None)[source]

暂停执行 delay 秒。

参数:

delay (int | float | None) – 使用 time.sleep(delay) 暂停执行的延迟;延迟将应用小的 1 秒抖动。

注意

此方法使用默认的随机延迟,例如 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);使用随机间隔有助于在许多并发任务请求作业描述时避免 AWS API 限制。

static exponential_delay(tries)[source]

应用带有随机抖动的指数退避延迟。

最大间隔为 10 分钟(随机抖动范围在 3 到 10 分钟之间)。这用于 poll_for_job_status() 方法。

行为示例

def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(delay / 3, delay)


for tries in range(10):
    exp(tries)

#  0.33  1.0
#  0.45  1.35
#  0.81  2.44
#  1.41  4.23
#  2.25  6.76
#  3.33 10.00
#  4.65 13.95
#  6.21 18.64
#  8.01 24.04
# 10.05 30.15
参数:

tries (int) – 尝试次数

此条目是否有帮助?