airflow.providers.amazon.aws.sensors.s3

模块内容

S3KeySensor

等待一个或多个键(S3 上的类文件实例)出现在 S3 存储桶中。

S3KeysUnchangedSensor

如果 inactivity_period 过去,且与前缀匹配的对象数量没有增加,则返回 True。

class airflow.providers.amazon.aws.sensors.s3.S3KeySensor(*, bucket_key, bucket_name=None, wildcard_match=False, check_fn=None, aws_conn_id='aws_default', verify=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), use_regex=False, metadata_keys=None, **kwargs)[源代码]

基类:airflow.sensors.base.BaseSensorOperator

等待一个或多个键(S3 上的类文件实例)出现在 S3 存储桶中。

该路径只是给定 S3 路径的资源的键/值指针。注意:S3 不直接支持文件夹,仅提供键/值对。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:等待 Amazon S3 键

参数
  • bucket_key (str | list[str]) – 要等待的键。支持完整的 s3:// 样式 URL 或从根级别的相对路径。当它被指定为完整的 s3:// URL 时,请将 bucket_name 保留为 None

  • bucket_name (str | None) – S3 存储桶的名称。仅当 bucket_key 未作为完整的 s3:// URL 提供时才需要。指定后,传递给 bucket_key 的所有键都指向此存储桶

  • wildcard_match (bool) – bucket_key 是否应被解释为 Unix 通配符模式

  • check_fn (Callable[Ellipsis, bool] | None) –

    接收具有上下文值的 S3 对象列表并返回布尔值的函数: - True:满足条件 - False:不满足条件 示例:等待任何 S3 对象大小超过 1 兆字节

    def check_fn(files: List, **kwargs) -> bool:
        return any(f.get('Size', 0) > 1048576 for f in files)
    

  • aws_conn_id (str | None) – 对 s3 连接的引用

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,SSL 证书已验证。您可以提供以下值

    • False:不验证 SSL 证书。仍将使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

  • deferrable (bool) – 在可延迟模式下运行操作符

  • use_regex (bool) – 是否使用正则表达式来检查存储桶

  • metadata_keys (list[str] | None) – 要收集并发送到 check_fn 的 head_object 属性列表。可接受的值:s3.head_object 返回的任何顶级属性。指定 * 以返回所有可用的属性。默认值:“Size”。如果未找到请求的属性,则该键仍会被包含,并且值为 None。

template_fields: collections.abc.Sequence[str] = ('bucket_key', 'bucket_name')[源代码]
poke(context)[源代码]

在派生此类时覆盖。

execute(context)[源代码]

Airflow 在 worker 上运行此方法,并使用触发器进行延迟。

execute_complete(context, event)[源代码]

当触发器触发时执行 - 立即返回。

依赖触发器抛出异常,否则它会假定执行成功。

hook()[源代码]
class airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor(*, bucket_name, prefix, aws_conn_id='aws_default', verify=None, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类:airflow.sensors.base.BaseSensorOperator

如果 inactivity_period 过去,且与前缀匹配的对象数量没有增加,则返回 True。

请注意,此传感器在重新调度模式下不会正常工作,因为在重新调度的调用之间,S3 存储桶中列出的对象的状态将会丢失。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:等待 Amazon S3 前缀更改

参数
  • bucket_name (str) – S3 存储桶的名称

  • prefix (str) – 等待的前缀。从存储桶根目录级别的相对路径。

  • aws_conn_id (str | None) – 对 s3 连接的引用

  • verify (bool | str | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,会验证 SSL 证书。您可以提供以下值:

    • False:不验证 SSL 证书。仍将使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

  • inactivity_period (float) – 指定键未更改的总非活动秒数。请注意,此机制不是实时的,并且此运算符可能要等到此时间段过后一个 poke_interval 后才返回,并且没有检测到其他对象。

  • min_objects (int) – 键未更改传感器被视为有效的所需最少对象数。

  • previous_objects (set[str] | None) – 上次 poke 期间找到的对象 ID 集合。

  • allow_delete (bool) – 此传感器是否应将 poke 之间被删除的对象视为有效行为。如果为 true,则发生这种情况时将记录一条警告消息。如果为 false,则将引发错误。

  • deferrable (bool) – 在可延迟模式下运行传感器

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'prefix')[源代码]
hook()[源代码]

返回 S3Hook。

is_keys_unchanged(current_objects)[源代码]

检查 inactivity_period 后是否有新对象,并相应地更新传感器状态。

参数

current_objects (set[str]) – 上次 poke 期间存储桶中的对象 ID 集合。

poke(context)[源代码]

在派生此类时覆盖。

execute(context)[源代码]

如果 deferrable 为 True,Airflow 将在 worker 上运行此方法并使用触发器进行延迟。

execute_complete(context, event=None)[源代码]

当触发器触发时执行 - 立即返回。

依赖触发器抛出异常,否则它会假定执行成功。

此条目是否有帮助?