airflow.providers.amazon.aws.hooks.s3¶
使用 boto3 库与 AWS S3 交互。
属性¶
类¶
与 Amazon Simple Storage Service (S3) 交互。 |
函数¶
|
如果函数未传入存储桶名称,则从连接中获取存储桶名称。 |
在未传入存储桶名称但至少传入了一个 key 的情况下,统一存储桶名称和 key。 |
模块内容¶
- airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]¶
在未传入存储桶名称但至少传入了一个 key 的情况下,统一存储桶名称和 key。
- class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 Amazon Simple Storage Service (S3) 交互。
为
boto3.client("s3")
和boto3.resource("s3")
提供厚封装。- 参数:
另请参阅
有关允许的上传额外参数,请参阅
boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
。有关允许的下载额外参数,请参阅
boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS
。
可以指定附加参数(例如
aws_conn_id
),这些参数将传递给底层的 AwsBaseHook。- static parse_s3_url(s3url)[source]¶
将 S3 URL 解析为存储桶名称和 key。
有关有效的 URL 格式,请参阅 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html。
- static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]¶
获取 S3 存储桶名称和 key。
来源可以是: - 存储桶名称和 key。检查 key 是相对路径后返回信息。 - key。必须是完整的 s3:// URL。
- get_bucket(bucket_name=None)[source]¶
返回一个
S3.Bucket
对象。- 参数:
bucket_name (str | None) – 存储桶名称
- 返回值:
与存储桶名称对应的存储桶对象。
- 返回类型:
mypy_boto3_s3.service_resource.Bucket
- list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出存储桶中在指定前缀下的前缀。
- async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出存储桶中在指定前缀下的前缀。
- async get_file_metadata_async(client, bucket_name, key=None)[source]¶
异步获取与通配符表达式匹配的 key 存在于存储桶中的文件列表。
- async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]¶
获取与通配符表达式匹配的 key 存在的文件列表或获取 head 对象。
如果 wildcard_match 为 True,则异步获取与通配符表达式匹配的 key 存在于存储桶中的文件列表并返回布尔值。如果 wildcard_match 为 False,则从存储桶获取 head 对象并返回布尔值。
- async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]¶
获取存储桶中的文件列表。
- async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]¶
检查是否上传了新对象以及指定周期是否已过;据此更新 sensor 状态。
- 参数:
client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端
bucket_name (str) – 存储桶名称
prefix (str) – key 前缀
inactivity_period (float) – 指定 key 未更改的总不活动秒数。注意,此机制不是实时的,此 operator 可能直到此周期过去且没有检测到其他对象后经过一个 poke_interval 才会返回。
min_objects (int) – 对于 key 未更改 sensor,视为有效所需的最小对象数。
previous_objects (set[str] | None) – 在上次 poke 期间找到的对象 ID 集合。
inactivity_seconds (int) – 不活动秒数
allow_delete (bool) – 此 sensor 是否应将两次 poke 之间删除对象视为有效行为。如果为 true,发生此情况时将记录警告消息。如果为 false,则将引发错误。
last_activity_time (datetime.datetime | None) – 最后活动时间。
- list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]¶
列出存储桶中在指定前缀下且不包含分隔符的 key。
- 参数:
bucket_name (str | None) – 存储桶名称
prefix (str | None) – key 前缀
delimiter (str | None) – 用于标记 key 层级结构的分隔符。
page_size (int | None) – 分页大小
max_items (int | None) – 最大返回条目数
start_after_key (str | None) – 仅返回大于此 key 的 key
from_datetime (datetime.datetime | None) – 仅返回 LastModified 属性大于或等于此 from_datetime 的 key
to_datetime (datetime.datetime | None) – 仅返回 LastModified 属性小于此 to_datetime 的 key
object_filter (Callable[Ellipsis, list] | None) – 接收 S3 对象列表、from_datetime 和 to_datetime 并返回匹配 key 列表的函数。
apply_wildcard (bool) – 是否将前缀中的 ‘*’ 视为通配符或普通符号。
- 示例:返回 LastModified 属性大于 from_datetime 的 S3 对象列表
且小于 to_datetime
def object_filter( keys: list, from_datetime: datetime | None = None, to_datetime: datetime | None = None, ) -> list: def _is_in_period(input_date: datetime) -> bool: if from_datetime is not None and input_date < from_datetime: return False if to_datetime is not None and input_date > to_datetime: return False return True return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
- 返回值:
匹配的 key 列表
- 返回类型:
- get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]¶
列出存储桶中在指定前缀下的元数据对象。
- select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]¶
使用 S3 Select 读取 key。
- 参数:
- 返回值:
由 S3 Select 检索到的原始数据子集
- 返回类型:
- get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[源码]¶
返回与通配符表达式匹配的 boto3.s3.Object 对象。
- load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[源码]¶
将本地文件加载到 S3。
- 参数:
filename (pathlib.Path | str) – 要加载的文件的路径。
key (str) – 指向文件的 S3 key
bucket_name (str | None) – 存储文件的存储桶名称
replace (bool) – 一个标志,用于决定是否覆盖已存在的键。如果 replace 为 False 且键已存在,将引发错误。
encrypt (bool) – 如果为 True,文件将在 S3 服务器端加密,并以加密形式存储在 S3 中。
gzip (bool) – 如果为 True,文件将在本地压缩。
acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预设 ACL 策略的字符串。
- load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[源码]¶
将字符串加载到 S3。
提供此功能是为了方便将字符串放入 S3。它使用 boto 基础架构将文件发送到 S3。
- 参数:
string_data (str) – 要设置为键内容的字符串。
key (str) – 指向文件的 S3 key
bucket_name (str | None) – 存储文件的存储桶名称
replace (bool) – 一个标志,用于决定是否覆盖已存在的键
encrypt (bool) – 如果为 True,文件将在 S3 服务器端加密,并以加密形式存储在 S3 中。
encoding (str | None) – 字符串到字节的编码方式
acl_policy (str | None) – 指定要上传的对象的预设 ACL 策略的字符串
compression (str | None) – 要使用的压缩类型,当前仅支持 gzip。
- load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[源码]¶
将字节数据加载到 S3。
提供此功能是为了方便将字节数据放入 S3。它使用 boto 基础架构将文件发送到 S3。
- load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[源码]¶
将文件对象加载到 S3。
- copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[源码]¶
创建已存储在 S3 中的对象的副本。
注意:此处使用的 S3 连接需要同时具有源存储桶/键和目标存储桶/键的访问权限。
- 参数:
source_bucket_key (str) –
源对象的键。
它可以是完整的 s3:// 风格 URL,也可以是相对于根级别的路径。
当指定为完整的 s3:// URL 时,请省略 source_bucket_name。
dest_bucket_key (str) –
要复制到的对象的键。
指定 dest_bucket_key 的约定与 source_bucket_key 相同。
source_bucket_name (str | None) –
源对象所在的 S3 存储桶的名称。
当 source_bucket_key 作为完整的 s3:// URL 提供时,应省略此参数。
dest_bucket_name (str | None) –
对象被复制到的 S3 存储桶的名称。
当 dest_bucket_key 作为完整的 s3:// URL 提供时,应省略此参数。
source_version_id (str | None) – 源对象的版本 ID(可选)
acl_policy (str | None) – 指定要复制的对象的预设 ACL 策略的字符串,默认为 private。
meta_data_directive (str | None) – 是从源对象 COPY 元数据,还是使用请求中提供的元数据来 REPLACE 元数据。
- delete_bucket(bucket_name, force_delete=False, max_retries=5)[源码]¶
要删除 S3 存储桶,需要先删除存储桶中的所有对象,然后才能删除存储桶。
- download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[源码]¶
将文件从 S3 位置下载到本地文件系统。
- 注意
此函数屏蔽了 S3 API 的 ‘download_file’ 方法,但它们并不相同。如果想使用 S3 API 的原始方法,请使用 ‘S3Hook.get_conn().download_file()’。
- 参数:
key (str) – S3 中的键路径。
bucket_name (str | None) – 要使用的特定存储桶。
local_path (str | None) – 下载文件的本地路径。如果未提供路径,则将使用系统的临时目录。
preserve_file_name (bool) – 如果希望下载的文件名与 S3 中的文件名相同,请将此参数设置为 True。设置为 False 时,将生成随机文件名。默认值:False。
use_autogenerated_subdir (bool) – 与 ‘preserve_file_name = True’ 搭配使用,将文件下载到 ‘local_path’ 内随机生成的文件夹中,有助于避免多个可能下载同名文件的任务之间的冲突。如果不需要此功能并希望获得可预测的路径,请将其设置为 ‘False’。默认值:True。
- 返回值:
文件名。
- 返回类型:
- generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[源码]¶
根据客户端、其方法和参数生成预签名 URL。