Amazon S3

Amazon Simple Storage Service (Amazon S3) 是互联网存储服务。您可以使用 Amazon S3 在任何时间、从任何位置存储和检索任意数量的数据。

前置任务

要使用这些操作符,您必须完成以下几项工作:

操作符

创建 Amazon S3 存储桶

要创建 Amazon S3 存储桶,您可以使用 S3CreateBucketOperator

tests/system/amazon/aws/example_s3.py

create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)

删除 Amazon S3 存储桶

要删除 Amazon S3 存储桶,您可以使用 S3DeleteBucketOperator

tests/system/amazon/aws/example_s3.py

delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)

设置 Amazon S3 存储桶的标签

要设置 Amazon S3 存储桶的标签,您可以使用 S3PutBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)

获取 Amazon S3 存储桶的标签

要获取与 Amazon S3 存储桶关联的标签集,您可以使用 S3GetBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)

删除 Amazon S3 存储桶的标签

要删除 Amazon S3 存储桶的标签,您可以使用 S3DeleteBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)

创建 Amazon S3 对象

要创建新(或替换现有)Amazon S3 对象,您可以使用 S3CreateObjectOperator

tests/system/amazon/aws/example_s3.py

create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)

复制 Amazon S3 对象

要将 Amazon S3 对象从一个存储桶复制到另一个存储桶,您可以使用 S3CopyObjectOperator。这里使用的 Amazon S3 连接需要同时拥有源存储桶/键和目标存储桶/键的访问权限。

tests/system/amazon/aws/example_s3.py

copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)

删除 Amazon S3 对象

要删除一个或多个 Amazon S3 对象,您可以使用 S3DeleteObjectsOperator

tests/system/amazon/aws/example_s3.py

delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)

转换 Amazon S3 对象

要转换一个 Amazon S3 对象的数据并将其保存到另一个对象,您可以使用 S3FileTransformOperator。您还可以应用可选的 Amazon S3 Select 表达式,使用 select_expressionsource_s3_key 中选择您想要检索的数据。

tests/system/amazon/aws/example_s3.py

file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)

列出 Amazon S3 前缀

要列出 Amazon S3 存储桶中的所有 Amazon S3 前缀,您可以使用 S3ListPrefixesOperator。有关 Amazon S3 前缀的更多信息,请参阅 此处

tests/system/amazon/aws/example_s3.py

list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

列出 Amazon S3 对象

要列出 Amazon S3 存储桶中的所有 Amazon S3 对象,您可以使用 S3ListOperator。您可以指定一个 prefix 来过滤名称以该前缀开头的对象。

tests/system/amazon/aws/example_s3.py

list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)

传感器

等待 Amazon S3 键

要等待一个或多个键出现在 Amazon S3 存储桶中,您可以使用 S3KeySensor。对于每个键,它都会调用 head_object API(如果 wildcard_matchTrue,则调用 list_objects_v2 API)来检查它是否存在。请注意,特别是当用于检查大量键时,每个键会进行一次 API 调用。

检查单个文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)

检查多个文件

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)

使用正则表达式检查文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)

要使用额外的自定义检查,您可以定义一个函数,该函数接收匹配的 S3 对象属性列表并返回布尔值

  • True: 满足特定条件

  • False: 不满足条件

对于作为 bucket_key 参数传入的每个键,都会调用此函数。该函数参数是对象列表的原因是,当 wildcard_matchTrue 时,多个文件可能匹配一个键。匹配的 S3 对象属性列表仅包含大小,格式如下:

[{"Size": int}]

tests/system/amazon/aws/example_s3.py

def check_fn(files: list, **kwargs) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    """
    return all(f.get("Size", 0) > 20 for f in files)

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)

您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此操作符。这将提高 Airflow worker 的利用效率,因为作业状态的轮询在 triggerer 上异步进行。请注意,这需要在您的 Airflow 部署中提供 triggerer。

检查单个文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)

检查多个文件

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)

使用正则表达式检查文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)

等待 Amazon S3 前缀更改

要检查 Amazon S3 存储桶中特定前缀处对象数量的变化,并等待直到不活动期过去且对象数量没有增加为止,您可以使用 S3KeysUnchangedSensor。请注意,此传感器在重新调度模式下无法正常工作,因为 Amazon S3 存储桶中列出的对象状态将在重新调度的调用之间丢失。

tests/system/amazon/aws/example_s3.py

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)

您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此传感器。这将提高 Airflow worker 的利用效率,因为作业状态的轮询在 triggerer 上异步进行。请注意,这需要在您的 Airflow 部署中提供 triggerer。

参考

此条目有帮助吗?