airflow.providers.amazon.aws.hooks.s3

使用 boto3 库与 AWS S3 交互。

属性

logger

NO_ACL

S3Hook

与 Amazon Simple Storage Service (S3) 交互。

函数

provide_bucket_name(func)

如果函数未传入存储桶名称,则从连接中获取存储桶名称。

unify_bucket_name_and_key(func)

在未传入存储桶名称但至少传入了一个 key 的情况下,统一存储桶名称和 key。

模块内容

airflow.providers.amazon.aws.hooks.s3.logger[source]
airflow.providers.amazon.aws.hooks.s3.NO_ACL = 'no-acl'[source]
airflow.providers.amazon.aws.hooks.s3.provide_bucket_name(func)[source]

如果函数未传入存储桶名称,则从连接中获取存储桶名称。

airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]

在未传入存储桶名称但至少传入了一个 key 的情况下,统一存储桶名称和 key。

class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon Simple Storage Service (S3) 交互。

boto3.client("s3")boto3.resource("s3") 提供厚封装。

参数:
  • transfer_config_args (dict | None) – 用于托管 S3 传输的配置对象。

  • extra_args (dict | None) – 可传递给下载/上传操作的额外参数。

另请参阅

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#s3-transfers

  • 有关允许的上传额外参数,请参阅 boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS

  • 有关允许的下载额外参数,请参阅 boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS

可以指定附加参数(例如 aws_conn_id),这些参数将传递给底层的 AwsBaseHook。

transfer_config[source]
property resource[source]
property extra_args[source]

返回 hook 的额外参数(不可变)。

static parse_s3_url(s3url)[source]

将 S3 URL 解析为存储桶名称和 key。

有关有效的 URL 格式,请参阅 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html

参数:

s3url (str) – 要解析的 S3 URL。

返回值:

解析后的存储桶名称和 key

返回类型:

tuple[str, str]

static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]

获取 S3 存储桶名称和 key。

来源可以是: - 存储桶名称和 key。检查 key 是相对路径后返回信息。 - key。必须是完整的 s3:// URL。

参数:
  • bucket (str | None) – S3 存储桶名称

  • key (str) – S3 key

  • bucket_param_name (str) – 包含存储桶名称的参数名称

  • key_param_name (str) – 包含 key 名称的参数名称

返回值:

解析后的存储桶名称和 key

返回类型:

tuple[str, str]

check_for_bucket(bucket_name=None)[source]

检查 bucket_name 是否存在。

参数:

bucket_name (str | None) – 存储桶名称

返回值:

如果存在则为 True,否则为 False。

返回类型:

bool

get_bucket(bucket_name=None)[source]

返回一个 S3.Bucket 对象。

参数:

bucket_name (str | None) – 存储桶名称

返回值:

与存储桶名称对应的存储桶对象。

返回类型:

mypy_boto3_s3.service_resource.Bucket

create_bucket(bucket_name=None, region_name=None)[source]

创建一个 Amazon S3 存储桶。

参数:
  • bucket_name (str | None) – 存储桶名称

  • region_name (str | None) – 创建存储桶所在的 AWS 区域名称。

check_for_prefix(prefix, delimiter, bucket_name=None)[source]

检查存储桶中是否存在某个前缀。

参数:
  • bucket_name (str | None) – 存储桶名称

  • prefix (str) – key 前缀

  • delimiter (str) – 用于标记 key 层级结构的分隔符。

返回值:

如果前缀在存储桶中不存在则为 False,如果存在则为 True。

返回类型:

bool

list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出存储桶中在指定前缀下的前缀。

参数:
  • bucket_name (str | None) – 存储桶名称

  • prefix (str | None) – key 前缀

  • delimiter (str | None) – 用于标记 key 层级结构的分隔符。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 最大返回条目数

返回值:

匹配的前缀列表

返回类型:

list

async get_head_object_async(client, key, bucket_name=None)[source]

检索对象的元数据。

参数:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str | None) – 存储文件的存储桶名称

  • key (str) – 指向文件的 S3 key

async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出存储桶中在指定前缀下的前缀。

参数:
  • client (aiobotocore.client.AioBaseClient) – ClientCreatorContext

  • bucket_name (str | None) – 存储桶名称

  • prefix (str | None) – key 前缀

  • delimiter (str | None) – 用于标记 key 层级结构的分隔符。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 最大返回条目数

返回值:

匹配的前缀列表

返回类型:

list[Any]

async get_file_metadata_async(client, bucket_name, key=None)[source]

异步获取与通配符表达式匹配的 key 存在于存储桶中的文件列表。

参数:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str) – 存储桶名称

  • key (str | None) – key 的路径

async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]

获取与通配符表达式匹配的 key 存在的文件列表或获取 head 对象。

如果 wildcard_match 为 True,则异步获取与通配符表达式匹配的 key 存在于存储桶中的文件列表并返回布尔值。如果 wildcard_match 为 False,则从存储桶获取 head 对象并返回布尔值。

参数:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket (str) – 存储桶名称

  • bucket_keys (str | list[str]) – 指向文件的 S3 key

  • wildcard_match (bool) – key 的路径

  • use_regex (bool) – 是否使用正则表达式检查存储桶

async check_for_prefix_async(client, prefix, delimiter, bucket_name=None)[source]

检查存储桶中是否存在某个前缀。

参数:
  • bucket_name (str | None) – 存储桶名称

  • prefix (str) – key 前缀

  • delimiter (str) – 用于标记 key 层级结构的分隔符。

返回值:

如果前缀在存储桶中不存在则为 False,如果存在则为 True。

返回类型:

bool

async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]

获取存储桶中的文件列表。

async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]

检查是否上传了新对象以及指定周期是否已过;据此更新 sensor 状态。

参数:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str) – 存储桶名称

  • prefix (str) – key 前缀

  • inactivity_period (float) – 指定 key 未更改的总不活动秒数。注意,此机制不是实时的,此 operator 可能直到此周期过去且没有检测到其他对象后经过一个 poke_interval 才会返回。

  • min_objects (int) – 对于 key 未更改 sensor,视为有效所需的最小对象数。

  • previous_objects (set[str] | None) – 在上次 poke 期间找到的对象 ID 集合。

  • inactivity_seconds (int) – 不活动秒数

  • allow_delete (bool) – 此 sensor 是否应将两次 poke 之间删除对象视为有效行为。如果为 true,发生此情况时将记录警告消息。如果为 false,则将引发错误。

  • last_activity_time (datetime.datetime | None) – 最后活动时间。

list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]

列出存储桶中在指定前缀下且不包含分隔符的 key。

参数:
  • bucket_name (str | None) – 存储桶名称

  • prefix (str | None) – key 前缀

  • delimiter (str | None) – 用于标记 key 层级结构的分隔符。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 最大返回条目数

  • start_after_key (str | None) – 仅返回大于此 key 的 key

  • from_datetime (datetime.datetime | None) – 仅返回 LastModified 属性大于或等于此 from_datetime 的 key

  • to_datetime (datetime.datetime | None) – 仅返回 LastModified 属性小于此 to_datetime 的 key

  • object_filter (Callable[Ellipsis, list] | None) – 接收 S3 对象列表、from_datetime 和 to_datetime 并返回匹配 key 列表的函数。

  • apply_wildcard (bool) – 是否将前缀中的 ‘*’ 视为通配符或普通符号。

示例:返回 LastModified 属性大于 from_datetime 的 S3 对象列表

且小于 to_datetime

def object_filter(
    keys: list,
    from_datetime: datetime | None = None,
    to_datetime: datetime | None = None,
) -> list:
    def _is_in_period(input_date: datetime) -> bool:
        if from_datetime is not None and input_date < from_datetime:
            return False

        if to_datetime is not None and input_date > to_datetime:
            return False
        return True

    return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
返回值:

匹配的 key 列表

返回类型:

list

get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]

列出存储桶中在指定前缀下的元数据对象。

参数:
  • prefix (str) – key 前缀

  • bucket_name (str | None) – 存储桶名称

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 最大返回条目数

返回值:

对象元数据列表

返回类型:

list

head_object(key, bucket_name=None)[source]

检索对象的元数据。

参数:
  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

返回值:

对象的元数据

返回类型:

dict | None

check_for_key(key, bucket_name=None)[source]

检查存储桶中是否存在某个 key。

参数:
  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

返回值:

如果 key 存在则为 True,否则为 False。

返回类型:

bool

get_key(key, bucket_name=None)[source]

返回一个 S3.Object

参数:
  • key (str) – key 的路径

  • bucket_name (str | None) – 存储桶名称

返回值:

来自存储桶的 key 对象

返回类型:

mypy_boto3_s3.service_resource.Object

read_key(key, bucket_name=None)[source]

从 S3 读取 key。

另请参阅

参数:
  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

返回值:

key 的内容

返回类型:

str

select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]

使用 S3 Select 读取 key。

参数:
  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

  • expression (str | None) – S3 Select 表达式

  • expression_type (str | None) – S3 Select 表达式类型

  • input_serialization (dict[str, Any] | None) – S3 Select 输入数据序列化格式

  • output_serialization (dict[str, Any] | None) – S3 Select 输出数据序列化格式

返回值:

由 S3 Select 检索到的原始数据子集

返回类型:

str

check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[源码]

检查存储桶中是否存在与通配符表达式匹配的键。

参数:
  • wildcard_key (str) – 键的路径

  • bucket_name (str | None) – 存储桶名称

  • delimiter (str) – 定界符标记键的层级结构

返回值:

如果键存在,则为 True;否则为 False。

返回类型:

bool

get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[源码]

返回与通配符表达式匹配的 boto3.s3.Object 对象。

参数:
  • wildcard_key (str) – 键的路径

  • bucket_name (str | None) – 存储桶名称

  • delimiter (str) – 定界符标记键的层级结构

返回值:

存储桶中的键对象,如果未找到则为 None。

返回类型:

mypy_boto3_s3.service_resource.Object | None

load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[源码]

将本地文件加载到 S3。

参数:
  • filename (pathlib.Path | str) – 要加载的文件的路径。

  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键。如果 replace 为 False 且键已存在,将引发错误。

  • encrypt (bool) – 如果为 True,文件将在 S3 服务器端加密,并以加密形式存储在 S3 中。

  • gzip (bool) – 如果为 True,文件将在本地压缩。

  • acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预设 ACL 策略的字符串。

load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[源码]

将字符串加载到 S3。

提供此功能是为了方便将字符串放入 S3。它使用 boto 基础架构将文件发送到 S3。

参数:
  • string_data (str) – 要设置为键内容的字符串。

  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键

  • encrypt (bool) – 如果为 True,文件将在 S3 服务器端加密,并以加密形式存储在 S3 中。

  • encoding (str | None) – 字符串到字节的编码方式

  • acl_policy (str | None) – 指定要上传的对象的预设 ACL 策略的字符串

  • compression (str | None) – 要使用的压缩类型,当前仅支持 gzip。

load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[源码]

将字节数据加载到 S3。

提供此功能是为了方便将字节数据放入 S3。它使用 boto 基础架构将文件发送到 S3。

参数:
  • bytes_data (bytes) – 要设置为键内容的字节数据。

  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键

  • encrypt (bool) – 如果为 True,文件将在 S3 服务器端加密,并以加密形式存储在 S3 中。

  • acl_policy (str | None) – 指定要上传的对象的预设 ACL 策略的字符串

load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[源码]

将文件对象加载到 S3。

参数:
  • file_obj (io.BytesIO) – 要设置为 S3 键内容的类文件对象。

  • key (str) – 指向文件的 S3 key

  • bucket_name (str | None) – 存储文件的存储桶名称

  • replace (bool) – 一个标志,指示是否覆盖已存在的键。

  • encrypt (bool) – 如果为 True,S3 会在服务器上加密文件,文件将以加密形式存储在 S3 中。

  • acl_policy (str | None) – 指定要上传的对象的预设 ACL 策略的字符串

copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[源码]

创建已存储在 S3 中的对象的副本。

注意:此处使用的 S3 连接需要同时具有源存储桶/键和目标存储桶/键的访问权限。

参数:
  • source_bucket_key (str) –

    源对象的键。

    它可以是完整的 s3:// 风格 URL,也可以是相对于根级别的路径。

    当指定为完整的 s3:// URL 时,请省略 source_bucket_name。

  • dest_bucket_key (str) –

    要复制到的对象的键。

    指定 dest_bucket_key 的约定与 source_bucket_key 相同。

  • source_bucket_name (str | None) –

    源对象所在的 S3 存储桶的名称。

    source_bucket_key 作为完整的 s3:// URL 提供时,应省略此参数。

  • dest_bucket_name (str | None) –

    对象被复制到的 S3 存储桶的名称。

    dest_bucket_key 作为完整的 s3:// URL 提供时,应省略此参数。

  • source_version_id (str | None) – 源对象的版本 ID(可选)

  • acl_policy (str | None) – 指定要复制的对象的预设 ACL 策略的字符串,默认为 private。

  • meta_data_directive (str | None) – 是从源对象 COPY 元数据,还是使用请求中提供的元数据来 REPLACE 元数据。

delete_bucket(bucket_name, force_delete=False, max_retries=5)[源码]

要删除 S3 存储桶,需要先删除存储桶中的所有对象,然后才能删除存储桶。

参数:
  • bucket_name (str) – 存储桶名称

  • force_delete (bool) – 启用此选项以即使存储桶不为空也删除它。

  • max_retries (int) – 要删除存储桶,它必须是空的。如果 force_delete 为 true,则重试可能有助于防止在删除存储桶中的对象和尝试删除存储桶之间发生竞态条件。

返回值:

None

返回类型:

None

delete_objects(bucket, keys)[源码]

从存储桶中删除键。

参数:
  • bucket (str) – 要从中删除对象(s)的存储桶名称

  • keys (str | list) –

    要从 S3 存储桶中删除的键。

    keys 是字符串时,它应为要删除的单个对象的键名。

    keys 是列表时,它应为要删除的键列表。

download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[源码]

将文件从 S3 位置下载到本地文件系统。

注意

此函数屏蔽了 S3 API 的 ‘download_file’ 方法,但它们并不相同。如果想使用 S3 API 的原始方法,请使用 ‘S3Hook.get_conn().download_file()’。

参数:
  • key (str) – S3 中的键路径。

  • bucket_name (str | None) – 要使用的特定存储桶。

  • local_path (str | None) – 下载文件的本地路径。如果未提供路径,则将使用系统的临时目录。

  • preserve_file_name (bool) – 如果希望下载的文件名与 S3 中的文件名相同,请将此参数设置为 True。设置为 False 时,将生成随机文件名。默认值:False。

  • use_autogenerated_subdir (bool) – 与 ‘preserve_file_name = True’ 搭配使用,将文件下载到 ‘local_path’ 内随机生成的文件夹中,有助于避免多个可能下载同名文件的任务之间的冲突。如果不需要此功能并希望获得可预测的路径,请将其设置为 ‘False’。默认值:True。

返回值:

文件名。

返回类型:

str

generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[源码]

根据客户端、其方法和参数生成预签名 URL。

参数:
  • client_method (str) – 要预签名的客户端方法。

  • params (dict | None) – 通常传递给 ClientMethod 的参数。

  • expires_in (int) – 预签名 URL 的有效秒数。默认情况下,它在一小时(3600 秒)后过期。

  • http_method (str | None) – 要在生成的 URL 上使用的 http 方法。默认情况下,http 方法是该方法模型中使用的任何方法。

返回值:

预签名 URL。

返回类型:

str | None

get_bucket_tagging(bucket_name=None)[源码]

从存储桶获取标签列表。

参数:

bucket_name (str | None) – 存储桶的名称。

返回值:

一个包含标签键值对的列表

返回类型:

list[dict[str, str]] | None

put_bucket_tagging(tag_set=None, key=None, value=None, bucket_name=None)[源码]

用提供的标签覆盖现有 TagSet;必须提供 TagSet、键值对或两者。

参数:
  • tag_set (dict[str, str] | list[dict[str, str]] | None) – 一个包含标签键值对的字典,或一个已按 API 格式化的列表。

  • key (str | None) – 新 TagSet 条目的键。

  • value (str | None) – 新 TagSet 条目的值。

  • bucket_name (str | None) – 存储桶的名称。

返回值:

None

返回类型:

None

delete_bucket_tagging(bucket_name=None)[源码]

删除存储桶中的所有标签。

参数:

bucket_name (str | None) – 存储桶的名称。

返回值:

None

返回类型:

None

此条目是否有用?