Amazon S3¶
Amazon Simple Storage Service (Amazon S3)是互联网存储。您可以使用 Amazon S3 随时随地存储和检索任意数量的数据。
先决任务¶
要使用这些操作符,您必须执行以下操作
通过pip安装 API 库。
pip install 'apache-airflow[amazon]'详细信息可在安装 Airflow™中找到
设置连接.
操作符¶
创建 Amazon S3 存储桶¶
要创建 Amazon S3 存储桶,可以使用 S3CreateBucketOperator
。
create_bucket = S3CreateBucketOperator(
task_id="create_bucket",
bucket_name=bucket_name,
)
删除 Amazon S3 存储桶¶
要删除 Amazon S3 存储桶,可以使用 S3DeleteBucketOperator
。
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
force_delete=True,
)
设置 Amazon S3 存储桶的标签¶
要设置 Amazon S3 存储桶的标签,可以使用 S3PutBucketTaggingOperator
。
put_tagging = S3PutBucketTaggingOperator(
task_id="put_tagging",
bucket_name=bucket_name,
key=TAG_KEY,
value=TAG_VALUE,
)
获取 Amazon S3 存储桶的标签¶
要获取与 Amazon S3 存储桶关联的标签集,可以使用 S3GetBucketTaggingOperator
。
get_tagging = S3GetBucketTaggingOperator(
task_id="get_tagging",
bucket_name=bucket_name,
)
删除 Amazon S3 存储桶的标签¶
要删除 Amazon S3 存储桶的标签,可以使用 S3DeleteBucketTaggingOperator
。
delete_tagging = S3DeleteBucketTaggingOperator(
task_id="delete_tagging",
bucket_name=bucket_name,
)
创建 Amazon S3 对象¶
要创建新的(或替换)Amazon S3 对象,可以使用 S3CreateObjectOperator
。
create_object = S3CreateObjectOperator(
task_id="create_object",
s3_bucket=bucket_name,
s3_key=key,
data=DATA,
replace=True,
)
复制 Amazon S3 对象¶
要将 Amazon S3 对象从一个存储桶复制到另一个存储桶,可以使用 S3CopyObjectOperator
。此处使用的 Amazon S3 连接需要能够访问源存储桶/密钥和目标存储桶/密钥。
copy_object = S3CopyObjectOperator(
task_id="copy_object",
source_bucket_name=bucket_name,
dest_bucket_name=bucket_name_2,
source_bucket_key=key,
dest_bucket_key=key_2,
)
删除 Amazon S3 对象¶
要删除一个或多个 Amazon S3 对象,可以使用 S3DeleteObjectsOperator
。
delete_objects = S3DeleteObjectsOperator(
task_id="delete_objects",
bucket=bucket_name_2,
keys=key_2,
)
转换 Amazon S3 对象¶
要转换来自一个 Amazon S3 对象的数据并将其保存到另一个对象,可以使用 S3FileTransformOperator
。您还可以应用一个可选的 Amazon S3 Select 表达式,以使用 select_expression
从 source_s3_key
中选择您想要检索的数据。
file_transform = S3FileTransformOperator(
task_id="file_transform",
source_s3_key=f"s3://{bucket_name}/{key}",
dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
# Use `cp` command as transform script as an example
transform_script="cp",
replace=True,
)
列出 Amazon S3 前缀¶
要列出 Amazon S3 存储桶中的所有 Amazon S3 前缀,可以使用 S3ListPrefixesOperator
。请参阅 此处 了解更多有关 Amazon S3 前缀的信息。
list_prefixes = S3ListPrefixesOperator(
task_id="list_prefixes",
bucket=bucket_name,
prefix=PREFIX,
delimiter=DELIMITER,
)
列出 Amazon S3 对象¶
要列出 Amazon S3 存储桶中的所有 Amazon S3 对象,可以使用 S3ListOperator
。您可以指定 prefix
来过滤名称以该前缀开头的对象。
list_keys = S3ListOperator(
task_id="list_keys",
bucket=bucket_name,
prefix=PREFIX,
)
传感器¶
等待 Amazon S3 密钥¶
要等待一个或多个密钥出现在 Amazon S3 存储桶中,可以使用 S3KeySensor
。对于每个密钥,它会调用 head_object API(如果 wildcard_match
为 True
,则调用 list_objects_v2 API)来检查密钥是否存在。请记住,特别是当用于检查大量密钥时,它会针对每个密钥进行一次 API 调用。
检查一个文件
# Check if a file exists
sensor_one_key = S3KeySensor(
task_id="sensor_one_key",
bucket_name=bucket_name,
bucket_key=key,
)
检查多个文件
# Check if both files exist
sensor_two_keys = S3KeySensor(
task_id="sensor_two_keys",
bucket_name=bucket_name,
bucket_key=[key, key_2],
)
使用正则表达式检查文件
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)
要使用其他自定义检查,可以定义一个函数,该函数接收一个匹配的 S3 对象属性列表并返回一个布尔值
True
:满足某个条件False
:不满足条件
此函数针对 bucket_key
中作为参数传递的每个键调用。此函数的参数为对象列表的原因是,当 wildcard_match
为 True
时,多个文件可以匹配一个键。匹配的 S3 对象属性列表仅包含大小,格式如下
[{"Size": int}]
def check_fn(files: list) -> bool:
"""
Example of custom check: check if all files are bigger than ``20 bytes``
:param files: List of S3 object attributes.
:return: true if the criteria is met
"""
return all(f.get("Size", 0) > 20 for f in files)
# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
task_id="sensor_key_with_function",
bucket_name=bucket_name,
bucket_key=key,
check_fn=check_fn,
)
您还可以通过将参数 deferrable
设置为 True 来以可延迟模式运行此操作符。这将高效利用 Airflow 工作进程,因为轮询作业状态在触发器上异步进行。请注意,这需要触发器在您的 Airflow 部署中可用。
检查一个文件
# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
task_id="sensor_one_key_deferrable",
bucket_name=bucket_name,
bucket_key=key,
deferrable=True,
)
检查多个文件
# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
task_id="sensor_two_keys_deferrable",
bucket_name=bucket_name,
bucket_key=[key, key_2],
deferrable=True,
)
使用正则表达式检查文件
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
task_id="sensor_key_with_regex_deferrable",
bucket_name=bucket_name,
bucket_key=key_regex_pattern,
use_regex=True,
deferrable=True,
)
等待 Amazon S3 前缀更改¶
要检查 Amazon S3 存储桶中特定前缀的的对象数变化,并等待非活动期过去,且对象数没有增加,您可以使用 S3KeysUnchangedSensor
。请注意,此传感器在重新调度模式下不会正常运行,因为 Amazon S3 存储桶中列出的对象状态将在重新调度的调用之间丢失。
sensor_keys_unchanged = S3KeysUnchangedSensor(
task_id="sensor_keys_unchanged",
bucket_name=bucket_name_2,
prefix=PREFIX,
inactivity_period=10, # inactivity_period in seconds
)
您还可以通过将参数 deferrable
设置为 True 来以可延迟模式运行此传感器。这将高效利用 Airflow 工作进程,因为轮询作业状态在触发器上异步进行。请注意,这需要触发器在您的 Airflow 部署中可用。