airflow.providers.amazon.aws.operators.batch

AWS Batch 服务。

BatchOperator

在 AWS Batch 上执行作业。

BatchCreateComputeEnvironmentOperator

创建 AWS Batch 计算环境。

模块内容

class airflow.providers.amazon.aws.operators.batch.BatchOperator(*, job_name, job_definition, job_queue, container_overrides=None, array_properties=None, ecs_properties_override=None, eks_properties_override=None, node_overrides=None, share_identifier=None, scheduling_priority_override=None, parameters=None, retry_strategy=None, job_id=None, waiters=None, max_retries=4200, status_retries=None, tags=None, wait_for_completion=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=30, awslogs_enabled=False, awslogs_fetch_interval=timedelta(seconds=30), submit_job_timeout=None, **kwargs)[source]

基类: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

在 AWS Batch 上执行作业。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:提交新的 AWS Batch 作业

参数:
  • job_name (str) – 将在 AWS Batch 上运行的作业名称(模板化)

  • job_definition (str) – AWS Batch 上的作业定义名称

  • job_queue (str) – AWS Batch 上的队列名称

  • container_overrides (dict | None) – boto3 的 containerOverrides 参数(模板化)

  • ecs_properties_override (dict | None) – boto3 的 ecsPropertiesOverride 参数(模板化)

  • eks_properties_override (dict | None) – boto3 的 eksPropertiesOverride 参数(模板化)

  • node_overrides (dict | None) – boto3 的 nodeOverrides 参数(模板化)

  • share_identifier (str | None) – 作业的共享标识符。如果作业队列没有调度策略,请勿指定此参数。

  • scheduling_priority_override (int | None) – 作业的调度优先级。调度优先级较高的作业将在调度优先级较低的作业之前调度。这将覆盖作业定义中的任何调度优先级。

  • array_properties (dict | None) – boto3 的 arrayProperties 参数

  • parameters (dict | None) – boto3 的 parameters 参数(模板化)

  • job_id (str | None) – 作业 ID,通常在 submit_job 操作获取 AWS Batch 定义的 jobId 之前未知(None)

  • waiters (Any | None) – 一个 BatchWaiters 对象(参见下面的说明);如果为 None,则使用 max_retries 和 status_retries 进行轮询。

  • max_retries (int) – 指数回退重试,4200 = 48 小时;只有在 waiters 为 None 时才使用轮询。

  • status_retries (int | None) – 获取作业状态的 HTTP 重试次数,10 次;只有在 waiters 为 None 时才使用轮询。

  • aws_conn_id – 用于 AWS 凭证的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果以分布式方式运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护此配置)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。参见:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • tags (dict | None) – 应用于 AWS Batch 作业提交的标签集合;如果为 None,则不提交任何标签。

  • deferrable (bool) – 以可延迟模式运行操作符。

  • awslogs_enabled (bool) – 指定是否打印来自 CloudWatch 的日志,默认为 False。如果是数组作业,将只打印第一个任务的日志。

  • awslogs_fetch_interval (datetime.timedelta) – 获取 CloudWatch 日志的间隔,30 秒。

  • poll_interval (int) – (仅限可延迟模式)两次轮询之间等待的时间间隔(秒)。

  • submit_job_timeout (int | None) – 提交的批处理作业的执行超时时间(秒)。

注意

任何自定义等待器必须为这些调用返回一个等待器:.. code-block:: python

waiter = waiters.get_waiter(“JobExists”) waiter = waiters.get_waiter(“JobRunning”) waiter = waiters.get_waiter(“JobComplete”)

aws_hook_class[source]
ui_color = '#c3dae0'[source]
arn: str | None = None[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
job_id = None[source]
job_name[source]
job_definition[source]
job_queue[source]
container_overrides = None[source]
ecs_properties_override = None[source]
eks_properties_override = None[source]
node_overrides = None[source]
share_identifier = None[source]
scheduling_priority_override = None[source]
array_properties = None[source]
parameters[source]
retry_strategy = None[source]
waiters = None[source]
tags[source]
wait_for_completion = True[source]
deferrable = True[source]
poll_interval = 30[source]
awslogs_enabled = False[source]
awslogs_fetch_interval[source]
submit_job_timeout = None[source]
max_retries = 4200[source]
status_retries = None[source]
execute(context)[source]

提交并监控 AWS Batch 作业。

引发异常:

AirflowException

execute_complete(context, event=None)[source]
on_kill()[source]

当任务实例被杀死时,覆盖此方法以清理子进程。

在操作符中使用 threading、subprocess 或 multiprocessing 模块的任何地方都需要清理,否则会留下僵尸进程。

submit_job(context)[source]

提交 AWS Batch 作业。

引发异常:

AirflowException

monitor_job(context)[source]

监控 AWS Batch 作业。

如果使用 execution_timeout 创建任务,这可能会引发异常或 AirflowTaskTimeout。

class airflow.providers.amazon.aws.operators.batch.BatchCreateComputeEnvironmentOperator(compute_environment_name, environment_type, state, compute_resources, unmanaged_v_cpus=None, service_role=None, tags=None, poll_interval=30, max_retries=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

创建 AWS Batch 计算环境。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:创建 AWS Batch 计算环境

参数:
  • compute_environment_name (str) – AWS Batch 计算环境的名称(模板化)。

  • environment_type (str) – 计算环境的类型。

  • state (str) – 计算环境的状态。

  • compute_resources (dict) – 关于计算环境管理的资源的详细信息(模板化)。更多详情:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_compute_environment

  • unmanaged_v_cpus (int | None) – 非托管计算环境的最大 vCPU 数量。仅当 type 参数设置为 UNMANAGED 时支持此参数。

  • service_role (str | None) – 允许 Batch 代表您调用其他 AWS 服务的 IAM 角色(模板化)。

  • tags (dict | None) – 应用于计算环境的标签,帮助您分类和组织资源。

  • poll_interval (int) – 两次轮询环境状态之间等待的时间间隔(秒)。仅在 deferrable 为 True 时有用。

  • max_retries (int | None) – 轮询环境状态的次数。仅在 deferrable 为 True 时有用。

  • aws_conn_id – 用于 AWS 凭证的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果以分布式方式运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护此配置)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。参见:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • deferrable (bool) – 如果为 True,操作符将异步等待环境创建完成。此模式需要安装 aiobotocore 模块。(默认为 False)

aws_hook_class[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
compute_environment_name[source]
environment_type[source]
state[source]
unmanaged_v_cpus = None[source]
compute_resources[source]
service_role = None[source]
tags[source]
poll_interval = 30[source]
max_retries = 120[source]
deferrable = True[source]
execute(context)[source]

创建一个 AWS Batch 计算环境。

execute_complete(context, event=None)[source]

此条目有帮助吗?