airflow.providers.amazon.aws.sensors.s3

S3KeySensor

等待一个或多个 Key(S3 上的文件状实例)存在于 S3 存储桶中。

S3KeysUnchangedSensor

如果在 inactivity_period 过去后匹配前缀的对象数量没有增加,则返回 True。

模块内容

airflow.providers.amazon.aws.sensors.s3.S3KeySensor(*, bucket_key, bucket_name=None, wildcard_match=False, check_fn=None, verify=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), use_regex=False, metadata_keys=None, **kwargs)[源代码]

基类: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.s3.S3Hook]

等待一个或多个 Key(S3 上的文件状实例)存在于 S3 存储桶中。

路径只是给定 S3 路径资源的键/值指针。注意:S3 不直接支持文件夹,只提供键/值对。

另请参阅

有关如何使用此传感器的更多信息,请参阅指南:等待 Amazon S3 键

参数:
  • bucket_key (str | list[str]) – 等待的 Key(s)。支持完整的 s3:// 风格 URL 或根级别的相对路径。当指定为完整的 s3:// URL 时,请将 bucket_name 留空 None

  • bucket_name (str | None) – S3 存储桶的名称。仅当 bucket_key 未提供为完整的 s3:// URL 时需要。指定时,传递给 bucket_key 的所有键都引用此存储桶

  • wildcard_match (bool) – bucket_key 是否应被解释为 Unix 通配符模式

  • check_fn (Callable[Ellipsis, bool] | None) –

    接收 S3 对象列表以及上下文值的函数,并返回一个布尔值: - True: 满足条件 - False: 不满足条件 示例:等待任何 S3 对象大小大于 1 兆字节

    def check_fn(files: List, **kwargs) -> bool:
        return any(f.get('Size', 0) > 1048576 for f in files)
    

  • deferrable (bool) – 以可延期模式运行操作符

  • use_regex (bool) – 是否使用正则表达式检查存储桶

  • metadata_keys (list[str] | None) – 要收集并发送到 check_fn 的 head_object 属性列表。可接受的值:s3.head_object 返回的任何顶级属性。指定 * 以返回所有可用属性。默认值:“Size”。如果请求的属性未找到,则仍包含该键,值为 None。

  • aws_conn_id – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果以分布式方式运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个 Worker 节点上维护)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify (str | bool | None) – 是否验证 SSL 证书。参见: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

template_fields: collections.abc.Sequence[str][源代码]
aws_hook_class[源代码]
bucket_name = None[源代码]
bucket_key[源代码]
wildcard_match = False[源代码]
check_fn = None[源代码]
verify = None[源代码]
deferrable = True[源代码]
use_regex = False[源代码]
metadata_keys = ['Size'][源代码]
poke(context)[源代码]

派生此类时重写。

execute(context)[源代码]

Airflow 在 Worker 上运行此方法并使用触发器进行延期。

execute_complete(context, event)[源代码]

当触发器触发时执行 - 立即返回。

依赖触发器抛出异常,否则假定执行成功。

airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor(*, bucket_name, prefix, verify=None, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.s3.S3Hook]

如果在 inactivity_period 过去后匹配前缀的对象数量没有增加,则返回 True。

注意,此传感器在 reschedule 模式下无法正常工作,因为 S3 存储桶中列出的对象状态将在 reschedule 调用之间丢失。

另请参阅

有关如何使用此传感器的更多信息,请参阅指南:等待 Amazon S3 前缀更改

参数:
  • bucket_name (str) – S3 存储桶的名称

  • prefix (str) – 正在等待的前缀。从存储桶根级别开始的相对路径。https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • inactivity_period (float) – 用于指定键未更改的总不活动秒数。注意,此机制不是实时的,此操作符可能在此周期过去且没有检测到附加对象之前的 poke_interval 后才会返回。

  • min_objects (int) – 键未更改传感器被视为有效所需的最小对象数。

  • previous_objects (set[str] | None) – 上次 poke 期间找到的对象 ID 集合。

  • allow_delete (bool) – 此传感器是否应将两次 poke 之间删除的对象视为有效行为。如果为 True,发生这种情况时将记录警告消息。如果为 False,将引发错误。

  • deferrable (bool) – 以可延期模式运行传感器

  • aws_conn_id – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果以分布式方式运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个 Worker 节点上维护)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify (bool | str | None) – 是否验证 SSL 证书。参见: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

template_fields: collections.abc.Sequence[str][源代码]
aws_hook_class[源代码]
bucket_name[源代码]
prefix[源代码]
inactivity_period = 3600[源代码]
min_objects = 1[源代码]
previous_objects[源代码]
inactivity_seconds = 0[源代码]
allow_delete = True[源代码]
deferrable = True[源代码]
verify = None[源代码]
last_activity_time: datetime.datetime | None = None[源代码]
is_keys_unchanged(current_objects)[源代码]

检查 inactivity_period 后是否有新对象,并相应地更新传感器状态。

参数:

current_objects (set[str]) – 上次 poke 期间存储桶中的对象 ID 集合。

poke(context)[源代码]

派生此类时重写。

execute(context)[源代码]

如果 deferrable 为 True,Airflow 在 Worker 上运行此方法并使用触发器进行延期。

execute_complete(context, event=None)[源代码]

当触发器触发时执行 - 立即返回。

依赖触发器抛出异常,否则假定执行成功。

此条目有帮助吗?