Amazon S3

Amazon Simple Storage Service (Amazon S3)是互联网存储。您可以使用 Amazon S3 随时随地存储和检索任意数量的数据。

先决任务

要使用这些操作符,您必须执行以下操作

操作符

创建 Amazon S3 存储桶

要创建 Amazon S3 存储桶,可以使用 S3CreateBucketOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)

删除 Amazon S3 存储桶

要删除 Amazon S3 存储桶,可以使用 S3DeleteBucketOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)

设置 Amazon S3 存储桶的标签

要设置 Amazon S3 存储桶的标签,可以使用 S3PutBucketTaggingOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)

获取 Amazon S3 存储桶的标签

要获取与 Amazon S3 存储桶关联的标签集,可以使用 S3GetBucketTaggingOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)

删除 Amazon S3 存储桶的标签

要删除 Amazon S3 存储桶的标签,可以使用 S3DeleteBucketTaggingOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)

创建 Amazon S3 对象

要创建新的(或替换)Amazon S3 对象,可以使用 S3CreateObjectOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)

复制 Amazon S3 对象

要将 Amazon S3 对象从一个存储桶复制到另一个存储桶,可以使用 S3CopyObjectOperator。此处使用的 Amazon S3 连接需要能够访问源存储桶/密钥和目标存储桶/密钥。

tests/system/providers/amazon/aws/example_s3.py[源代码]

copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)

删除 Amazon S3 对象

要删除一个或多个 Amazon S3 对象,可以使用 S3DeleteObjectsOperator

tests/system/providers/amazon/aws/example_s3.py[源代码]

delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)

转换 Amazon S3 对象

要转换来自一个 Amazon S3 对象的数据并将其保存到另一个对象,可以使用 S3FileTransformOperator。您还可以应用一个可选的 Amazon S3 Select 表达式,以使用 select_expressionsource_s3_key 中选择您想要检索的数据。

tests/system/providers/amazon/aws/example_s3.py[源代码]

file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)

列出 Amazon S3 前缀

要列出 Amazon S3 存储桶中的所有 Amazon S3 前缀,可以使用 S3ListPrefixesOperator。请参阅 此处 了解更多有关 Amazon S3 前缀的信息。

tests/system/providers/amazon/aws/example_s3.py[源代码]

list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

列出 Amazon S3 对象

要列出 Amazon S3 存储桶中的所有 Amazon S3 对象,可以使用 S3ListOperator。您可以指定 prefix 来过滤名称以该前缀开头的对象。

tests/system/providers/amazon/aws/example_s3.py[源代码]

list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)

传感器

等待 Amazon S3 密钥

要等待一个或多个密钥出现在 Amazon S3 存储桶中,可以使用 S3KeySensor。对于每个密钥,它会调用 head_object API(如果 wildcard_matchTrue,则调用 list_objects_v2 API)来检查密钥是否存在。请记住,特别是当用于检查大量密钥时,它会针对每个密钥进行一次 API 调用。

检查一个文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)

检查多个文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)

使用正则表达式检查文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)

要使用其他自定义检查,可以定义一个函数,该函数接收一个匹配的 S3 对象属性列表并返回一个布尔值

  • True:满足某个条件

  • False:不满足条件

此函数针对 bucket_key 中作为参数传递的每个键调用。此函数的参数为对象列表的原因是,当 wildcard_matchTrue 时,多个文件可以匹配一个键。匹配的 S3 对象属性列表仅包含大小,格式如下

[{"Size": int}]

tests/system/providers/amazon/aws/example_s3.py[源代码]

def check_fn(files: list) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    """
    return all(f.get("Size", 0) > 20 for f in files)

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)

您还可以通过将参数 deferrable 设置为 True 来以可延迟模式运行此操作符。这将高效利用 Airflow 工作进程,因为轮询作业状态在触发器上异步进行。请注意,这需要触发器在您的 Airflow 部署中可用。

检查一个文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)

检查多个文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)

使用正则表达式检查文件

tests/system/providers/amazon/aws/example_s3.py[源代码]

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)

等待 Amazon S3 前缀更改

要检查 Amazon S3 存储桶中特定前缀的的对象数变化,并等待非活动期过去,且对象数没有增加,您可以使用 S3KeysUnchangedSensor。请注意,此传感器在重新调度模式下不会正常运行,因为 Amazon S3 存储桶中列出的对象状态将在重新调度的调用之间丢失。

tests/system/providers/amazon/aws/example_s3.py[源代码]

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)

您还可以通过将参数 deferrable 设置为 True 来以可延迟模式运行此传感器。这将高效利用 Airflow 工作进程,因为轮询作业状态在触发器上异步进行。请注意,这需要触发器在您的 Airflow 部署中可用。

此条目有帮助吗?