airflow.providers.amazon.aws.operators.batch

AWS Batch 服务。

BatchOperator

在 AWS Batch 上执行作业。

BatchCreateComputeEnvironmentOperator

创建 AWS Batch 计算环境。

模块内容

class airflow.providers.amazon.aws.operators.batch.BatchOperator(*, job_name, job_definition, job_queue, container_overrides=None, array_properties=None, ecs_properties_override=None, eks_properties_override=None, node_overrides=None, share_identifier=None, scheduling_priority_override=None, parameters=None, retry_strategy=None, job_id=None, waiters=None, max_retries=4200, status_retries=None, tags=None, wait_for_completion=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=30, awslogs_enabled=False, awslogs_fetch_interval=timedelta(seconds=30), submit_job_timeout=None, **kwargs)[source]

基类: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

在 AWS Batch 上执行作业。

另请参阅

如需了解如何使用此算子,请参阅指南:提交新的 AWS Batch 作业

参数:
  • job_name (str) – 将在 AWS Batch 上运行的作业名称(支持模板化)

  • job_definition (str) – AWS Batch 上的作业定义名称

  • job_queue (str) – AWS Batch 上的队列名称

  • container_overrides (dict | None) – boto3 的 containerOverrides 参数(支持模板化)

  • ecs_properties_override (dict | None) – boto3 的 ecsPropertiesOverride 参数(支持模板化)

  • eks_properties_override (dict | None) – boto3 的 eksPropertiesOverride 参数(支持模板化)

  • node_overrides (dict | None) – boto3 的 nodeOverrides 参数(支持模板化)

  • share_identifier (str | None) – 作业的共享标识符。如果作业队列没有调度策略,则不要指定此参数。

  • scheduling_priority_override (int | None) – 作业的调度优先级。优先级更高的作业会在优先级更低的作业之前调度。此参数会覆盖作业定义中的任何调度优先级。

  • array_properties (dict | None) – boto3 的 arrayProperties 参数

  • parameters (dict | None) – boto3 的 parameters(支持模板化)

  • job_id (str | None) – 作业 ID,通常在 submit_job 操作获取到 AWS Batch 定义的 jobId 之前为 None

  • waiters (Any | None) – BatchWaiters 对象(见下文说明);若为 None,则使用 max_retries 和 status_retries 进行轮询。

  • max_retries (int) – 指数回退重试次数,4200 ≈ 48 小时;仅当 waiters 为 None 时使用轮询。

  • status_retries (int | None) – 获取作业状态的 HTTP 重试次数,默认 10;仅当 waiters 为 None 时使用轮询。

  • aws_conn_id – Airflow 用于 AWS 凭证的连接 ID。如果该值为 None 或为空,则使用默认的 boto3 行为。若在分布式方式运行 Airflow 且 aws_conn_idNone 或空,则会使用默认的 boto3 配置(并且需在每个工作节点上维护该配置)。

  • region_name – AWS 区域名称(region_name)。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。参见: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • tags (dict | None) – 提交 AWS Batch 作业时要应用的标签集合;若为 None,则不提交标签。

  • deferrable (bool) – 以可延迟模式运行操作符。

  • awslogs_enabled (bool) – 是否打印 CloudWatch 日志,默认 False。若为数组作业,仅打印第一个任务的日志。

  • awslogs_fetch_interval (datetime.timedelta) – 拉取 CloudWatch 日志的时间间隔,默认 30 秒。

  • poll_interval (int) – (仅在可延迟模式下)两次轮询之间的等待时间(秒)。

  • submit_job_timeout (int | None) – 提交的 Batch 作业的执行超时时间(秒)。

注意

任何自定义 waiters 必须返回下面这些调用对应的 waiter: .. code-block:: python

waiter = waiters.get_waiter(“JobExists”) waiter = waiters.get_waiter(“JobRunning”) waiter = waiters.get_waiter(“JobComplete”)

aws_hook_class[source]
ui_color = '#c3dae0'[source]
arn: str | None = None[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
job_id = None[source]
job_name[source]
job_definition[source]
job_queue[source]
container_overrides = None[source]
ecs_properties_override = None[source]
eks_properties_override = None[source]
node_overrides = None[source]
share_identifier = None[source]
scheduling_priority_override = None[source]
array_properties = None[source]
parameters[source]
retry_strategy = None[source]
waiters = None[source]
tags[source]
wait_for_completion = True[source]
deferrable[source]
poll_interval = 30[source]
awslogs_enabled = False[source]
awslogs_fetch_interval[source]
submit_job_timeout = None[source]
max_retries = 4200[source]
status_retries = None[source]
execute(context)[source]

提交并监控一个 AWS Batch 作业。

抛出:

AirflowException

execute_complete(context, event=None)[source]
on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

在算子内部使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。

submit_job(context)[source]

提交一个 AWS Batch 作业。

抛出:

AirflowException

monitor_job(context)[source]

监控一个 AWS Batch 作业。

如果任务使用了 execution_timeout,此方法可能会抛出异常或 AirflowTaskTimeout。

class airflow.providers.amazon.aws.operators.batch.BatchCreateComputeEnvironmentOperator(compute_environment_name, environment_type, state, compute_resources, unmanaged_v_cpus=None, service_role=None, tags=None, poll_interval=30, max_retries=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

创建 AWS Batch 计算环境。

另请参阅

如需了解如何使用此算子,请参阅指南:创建 AWS Batch 计算环境

参数:
  • compute_environment_name (str) – AWS Batch 计算环境的名称(支持模板化)。

  • environment_type (str) – 计算环境的类型。

  • state (str) – 计算环境的状态。

  • compute_resources (dict) – 计算环境管理的资源详细信息(支持模板化)。更多细节请参见:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_compute_environment

  • unmanaged_v_cpus (int | None) – 对于非托管计算环境的最大 vCPU 数。仅当 type 参数设置为 UNMANAGED 时支持此参数。

  • service_role (str | None) – 允许 Batch 代表您调用其他 AWS 服务的 IAM 角色(支持模板化)。

  • tags (dict | None) – 您为计算环境应用的标签,用于帮助对资源进行分类和组织。

  • poll_interval (int) – 环境状态两次轮询之间的等待时间(秒)。仅在 deferrable 为 True 时有意义。

  • max_retries (int | None) – 轮询环境状态的最大次数。仅在 deferrable 为 True 时有意义。

  • aws_conn_id – Airflow 用于 AWS 凭证的连接 ID。如果该值为 None 或为空,则使用默认的 boto3 行为。若在分布式方式运行 Airflow 且 aws_conn_idNone 或空,则会使用默认的 boto3 配置(并且需在每个工作节点上维护该配置)。

  • region_name – AWS 区域名称(region_name)。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。参见: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • deferrable (bool) – 若为 True,算子将异步等待环境创建完成。该模式需要安装 aiobotocore 模块。(默认:False)

aws_hook_class[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
compute_environment_name[source]
environment_type[source]
state[source]
unmanaged_v_cpus = None[source]
compute_resources[source]
service_role = None[source]
tags[source]
poll_interval = 30[source]
max_retries = 120[source]
deferrable[source]
execute(context)[source]

创建一个 AWS Batch 计算环境。

execute_complete(context, event=None)[source]

此条目是否有帮助?