airflow.providers.google.cloud.hooks.gcs

此模块包含一个 Google Cloud Storage 钩子。

属性

RT

T

FParams

List

DEFAULT_TIMEOUT

PROVIDE_BUCKET

GCSHook

使用 Google Cloud 连接与 Google Cloud Storage 进行交互。

GCSAsyncHook

GCSAsyncHook 运行在触发器 worker 上,继承自 GoogleBaseAsyncHook。

函数

gcs_object_is_directory(bucket)

如果给定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是一个目录或空 bucket,则返回 True。

parse_json_from_gcs(gcp_conn_id, file_uri[, ...])

从 Google Cloud Storage 下载并解析 json 文件。

模块内容

airflow.providers.google.cloud.hooks.gcs.RT[source]
airflow.providers.google.cloud.hooks.gcs.T[source]
airflow.providers.google.cloud.hooks.gcs.FParams[source]
airflow.providers.google.cloud.hooks.gcs.List[source]
airflow.providers.google.cloud.hooks.gcs.DEFAULT_TIMEOUT = 60[source]
airflow.providers.google.cloud.hooks.gcs.PROVIDE_BUCKET: str = None[source]
class airflow.providers.google.cloud.hooks.gcs.GCSHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

使用 Google Cloud 连接与 Google Cloud Storage 进行交互。

get_conn()[source]

返回一个 Google Cloud Storage 服务对象。

copy(source_bucket, source_object, destination_bucket=None, destination_object=None)[source]

将一个对象从一个 bucket 复制到另一个 bucket,如果请求则重命名。

可以省略 destination_bucket 或 destination_object,在这种情况下使用 source bucket/object,但不能两者都省略。

参数:
  • source_bucket (str) – 要复制的对象的源 bucket。

  • source_object (str) – 要复制的对象。

  • destination_bucket (str | None) – 要复制到的对象的目的地。可以省略;如果省略,则使用相同的 bucket。

  • destination_object (str | None) – 如果给出,则为对象的(重命名后的)路径。可以省略;如果省略,则使用相同的名称。

rewrite(source_bucket, source_object, destination_bucket, destination_object=None)[source]

类似于 copy;支持超过 5 TB 的文件,以及在位置和/或存储类别之间复制。

可以省略 destination_object,在这种情况下使用 source_object。

参数:
  • source_bucket (str) – 要复制的对象的源 bucket。

  • source_object (str) – 要复制的对象。

  • destination_bucket (str) – 要复制到的对象的目的地。

  • destination_object (str | None) – 如果给出,则为对象的(重命名后的)路径。可以省略;如果省略,则使用相同的名称。

download(bucket_name: str, object_name: str, filename: None = None, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None =1, user_project: str | None =None) bytes[source]
download(bucket_name: str, object_name: str, filename: str, chunk_size: int | None = None, timeout: int | None =DEFAULT_TIMEOUT, num_max_attempts: int | None =1, user_project: str | None =None) str

从 Google Cloud Storage 下载文件。

如果未提供 filename,operator 将文件加载到内存并返回其内容。如果提供了 filename,它将文件写入指定位置并返回该位置。对于超出可用内存的文件大小,建议写入文件。

参数:
  • bucket_name – 要从中获取的 bucket。

  • object_name – 要获取的对象。

  • filename – 如果设置,则为文件应该写入的本地文件路径。

  • chunk_size – Blob 块大小。

  • timeout – 请求超时(秒)。

  • num_max_attempts – 下载文件的尝试次数。

  • user_project – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

download_as_byte_array(bucket_name, object_name, chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1)[source]

从 Google Cloud Storage 下载文件。

如果未提供 filename,operator 将文件加载到内存并返回其内容。如果提供了 filename,它将文件写入指定位置并返回该位置。对于超出可用内存的文件大小,建议写入文件。

参数:
  • bucket_name (str) – 要从中获取的 bucket。

  • object_name (str) – 要获取的对象。

  • chunk_size (int | None) – Blob 块大小。

  • timeout (int | None) – 请求超时(秒)。

  • num_max_attempts (int | None) – 下载文件的尝试次数。

provide_file(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, dir=None, user_project=None)[source]

将文件下载到临时目录并返回一个文件句柄。

您可以通过传递 bucket_name 和 object_name 参数或仅传递 object_url 参数来使用此方法。

参数:
  • bucket_name (str) – 要从中获取的 bucket。

  • object_name (str | None) – 要获取的对象。

  • object_url (str | None) – 文件引用 URL。必须以 "gs: //" 开头。

  • dir (str | None) – 下载文件的临时子目录。(传递给 NamedTemporaryFile)

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

返回:

文件句柄

返回类型:

collections.abc.Generator[IO[bytes], None, None]

provide_file_and_upload(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, user_project=None)[source]

创建临时文件,返回文件句柄,并在关闭时上传文件内容。

您可以通过传递 bucket_name 和 object_name 参数或仅传递 object_url 参数来使用此方法。

参数:
  • bucket_name (str) – 要从中获取的 bucket。

  • object_name (str | None) – 要获取的对象。

  • object_url (str | None) – 文件引用 URL。必须以 "gs: //" 开头。

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

返回:

文件句柄

返回类型:

collections.abc.Generator[IO[bytes], None, None]

upload(bucket_name, object_name, filename=None, data=None, mime_type=None, gzip=False, encoding='utf-8', chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1, metadata=None, cache_control=None, user_project=None)[source]

将本地文件或文件数据(字符串或字节)上传到 Google Cloud Storage。

参数:
  • bucket_name (str) – 要上传到的 bucket。

  • object_name (str) – 上传文件时要设置的对象名称。

  • filename (str | None) – 要上传文件的本地文件路径。

  • data (str | bytes | None) – 要上传的文件数据(字符串或字节)。

  • mime_type (str | None) – 上传文件时设置的文件 mime 类型。

  • gzip (bool) – 上传时压缩本地文件或文件数据的选项。

  • encoding (str) – 如果文件数据以字符串形式提供,则为字节编码。

  • chunk_size (int | None) – Blob 块大小。

  • timeout (int | None) – 请求超时(秒)。

  • num_max_attempts (int) – 尝试上传文件的次数。

  • metadata (dict | None) – 与文件一起上传的元数据。

  • cache_control (str | None) – Cache-Control 元数据字段。

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

exists(bucket_name, object_name, retry=DEFAULT_RETRY, user_project=None)[source]

检查 Google Cloud Storage 中是否存在文件。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中检查的 blob 名称。

  • retry (google.api_core.retry.Retry) – (可选) 如何重试 RPC。

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

get_blob_update_time(bucket_name, object_name)[source]

获取 Google Cloud Storage 中文件的更新时间。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要从 Google Cloud Storage bucket 获取更新时间的 blob 名称。

is_updated_after(bucket_name, object_name, ts)[source]

检查 Google Cloud Storage 中的 blob 是否已更新。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中检查的对象名称。

  • ts (datetime.datetime) – 要对照检查的时间戳。

is_updated_between(bucket_name, object_name, min_ts, max_ts)[source]

检查 Google Cloud Storage 中的 blob 是否已更新。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中检查的对象名称。

  • min_ts (datetime.datetime) – 要对照检查的最小时间戳。

  • max_ts (datetime.datetime) – 要对照检查的最大时间戳。

is_updated_before(bucket_name, object_name, ts)[source]

检查 Google Cloud Storage 中的 blob 是否在给定时间之前已更新。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中检查的对象名称。

  • ts (datetime.datetime) – 要对照检查的时间戳。

is_older_than(bucket_name, object_name, seconds)[source]

检查对象是否比给定时间更旧。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中检查的对象名称。

  • seconds (int) – 要对照检查的时间(秒)。

delete(bucket_name, object_name)[source]

从 bucket 中删除对象。

参数:
  • bucket_name (str) – 对象所在的 bucket 名称。

  • object_name (str) – 要删除的对象名称。

get_bucket(bucket_name)[source]

从 Google Cloud Storage 获取 bucket 对象。

参数:

bucket_name (str) – bucket 名称。

delete_bucket(bucket_name, force=False, user_project=None)[source]

从 Google Cloud Storage 删除 bucket 对象。

参数:
  • bucket_name (str) – 将要删除的 bucket 名称。

  • force (bool) – false 不允许删除非空 bucket,设置 force=True 允许删除非空 bucket。

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

list(bucket_name, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None, user_project=None)[source]

列出指定单个或多个前缀的存储桶中的所有对象。

参数:
  • bucket_name (str) – 存储桶名称

  • versions (bool | None) – 如果为 true,则列出对象的所有版本

  • max_results (int | None) – 单页响应中返回的最大项目数

  • prefix (str | List[str] | None) – 字符串或字符串列表,用于过滤名称以此字符串/列表中的字符串开头的对象

  • delimiter (str | None) – (已弃用) 基于分隔符(例如 ‘.csv’)过滤对象

  • match_glob (str | None) – (可选) 基于字符串给定的 glob 模式(例如 '**/*/.json')过滤对象。

  • user_project (str | None) – 要向其收取请求费用的 Google Cloud 项目标识符。Requester Pays buckets 需要此项。

返回:

与过滤条件匹配的对象名称流

list_by_timespan(bucket_name, timespan_start, timespan_end, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None)[source]

列出指定前缀且在指定时间范围内更新的存储桶中的所有对象。

参数:
  • bucket_name (str) – 存储桶名称

  • timespan_start (datetime.datetime) – 将返回在此日期时间(UTC)或之后更新的对象

  • timespan_end (datetime.datetime) – 将返回在此日期时间(UTC)之前更新的对象

  • versions (bool | None) – 如果为 true,则列出对象的所有版本

  • max_results (int | None) – 单页响应中返回的最大项目数

  • prefix (str | None) – 过滤名称以此前缀开头的对象的字符串前缀

  • delimiter (str | None) – (已弃用) 基于分隔符(例如 ‘.csv’)过滤对象

  • match_glob (str | None) – (可选) 基于字符串给定的 glob 模式(例如 '**/*/.json')过滤对象。

返回:

与过滤条件匹配的对象名称流

返回类型:

List[str]

get_size(bucket_name, object_name)[source]

获取 Google Cloud Storage 中文件的大小。

参数:
  • bucket_name (str) – 存储 blob_name 的 Google Cloud Storage 存储桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中检查的对象名称。

get_crc32c(bucket_name, object_name)[source]

获取 Google Cloud Storage 中对象的 CRC32c 校验和。

参数:
  • bucket_name (str) – 存储 blob_name 的 Google Cloud Storage 存储桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中检查的对象名称。

get_md5hash(bucket_name, object_name)[source]

获取 Google Cloud Storage 中对象的 MD5 哈希。

参数:
  • bucket_name (str) – 存储 blob_name 的 Google Cloud Storage 存储桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中检查的对象名称。

get_metadata(bucket_name, object_name)[source]

获取 Google Cloud Storage 中对象的元数据。

参数:
  • bucket_name (str) – 对象所在的 Google Cloud Storage 存储桶的名称。

  • object_name (str) – 包含所需元数据的对象的名称

返回:

与对象关联的元数据

返回类型:

dict | None

create_bucket(bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None)[source]

创建一个新存储桶。

Google Cloud Storage 使用扁平命名空间,因此无法创建名称已在使用中的存储桶。

另请参阅

有关更多信息,请参阅存储桶命名指南:https://cloud.google.com/storage/docs/bucketnaming.html#requirements

参数:
  • bucket_name (str) – 存储桶的名称。

  • resource (dict | None) – 用于创建存储桶的可选字典参数。有关可用参数的信息,请参阅 Cloud Storage API 文档:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    这定义了存储桶中对象的存储方式,并决定了 SLA 和存储成本。包括以下值:

    • MULTI_REGIONAL (多区域)

    • REGIONAL (区域)

    • STANDARD (标准)

    • NEARLINE (近线)

    • COLDLINE (冷线).

    如果在创建存储桶时未指定此值,则默认为 STANDARD。

  • location (str) –

    存储桶的位置。存储桶中对象的数据物理存储在该区域内。默认为 US。

  • project_id (str) – Google Cloud 项目的 ID。

  • labels (dict | None) – 用户提供的标签,以键/值对形式。

返回:

如果成功,返回存储桶的 id

返回类型:

str

insert_bucket_acl(bucket_name, entity, role, user_project=None)[source]

在指定的 bucket_name 上创建一个新的 ACL 条目。

参阅:https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

参数:
  • bucket_name (str) – 存储桶名称。

  • entity (str) – 拥有权限的实体,格式如下之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers。参阅:https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 实体的访问权限。可接受的值有:“OWNER”、“READER”、“WRITER”。

  • user_project (str | None) – (可选) 为此请求付费的项目。对于请求者付费存储桶是必需的。

insert_object_acl(bucket_name, object_name, entity, role, generation=None, user_project=None)[source]

在指定的对象上创建一个新的 ACL 条目。

参阅:https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

参数:
  • bucket_name (str) – 存储桶名称。

  • object_name (str) – 对象的名称。有关如何对对象名称进行 URL 编码以确保路径安全的信息,请参阅:https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 拥有权限的实体,格式如下之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers 参阅:https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 实体的访问权限。可接受的值有:“OWNER”、“READER”。

  • generation (int | None) – 可选。如果存在,选择此对象的特定修订版本。

  • user_project (str | None) – (可选) 为此请求付费的项目。对于请求者付费存储桶是必需的。

compose(bucket_name, source_objects, destination_object)[source]

将现有对象列表组合到同一存储桶中的一个新对象中。

目前仅支持在单个操作中连接最多 32 个对象

https://cloud.google.com/storage/docs/json_api/v1/objects/compose

参数:
  • bucket_name (str) – 包含源对象的存储桶的名称。这也是存储组合后的目标对象的存储桶。

  • source_objects (List[str]) – 将组合成单个对象的源对象列表。

  • destination_object (str) – 如果给定,则为对象的路径。

sync(source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, allow_overwrite=False, delete_extra_files=False)[source]

同步存储桶的内容。

参数 source_objectdestination_object 描述了根同步目录。如果未传递,则同步整个存储桶。如果传递,则应指向目录。

注意

不支持单个文件的同步。只能同步整个目录。

参数:
  • source_bucket (str) – 包含源对象的存储桶名称。

  • destination_bucket (str) – 包含目标对象的存储桶名称。

  • source_object (str | None) – 源存储桶中的根同步目录。

  • destination_object (str | None) – 目标存储桶中的根同步目录。

  • recursive (bool) – 如果为 True,将考虑子目录

  • recursive – 如果为 True,将考虑子目录

  • allow_overwrite (bool) – 如果为 True,在找到不匹配文件时将覆盖文件。默认情况下,不允许覆盖文件

  • delete_extra_files (bool) –

    如果为 True,删除源中存在而目标中不存在的附加文件。默认情况下,不会删除附加文件。

    注意

    如果您指定了错误的源/目标组合,此选项可能会快速删除数据。

返回:

none

返回类型:

None

airflow.providers.google.cloud.hooks.gcs.gcs_object_is_directory(bucket)[source]

如果给定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是一个目录或空 bucket,则返回 True。

airflow.providers.google.cloud.hooks.gcs.parse_json_from_gcs(gcp_conn_id, file_uri, impersonation_chain=None)[source]

从 Google Cloud Storage 下载并解析 json 文件。

参数:
  • gcp_conn_id (str) – Airflow Google Cloud 连接 ID。

  • file_uri (str) – json 文件的完整路径,例如:gs://test-bucket/dir1/dir2/file

class airflow.providers.google.cloud.hooks.gcs.GCSAsyncHook(**kwargs)[source]

基类:airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

GCSAsyncHook 运行在触发器 worker 上,继承自 GoogleBaseAsyncHook。

sync_hook_class[source]
async get_storage_client(session)[source]

返回一个 Google Cloud Storage 服务对象。

此条目是否有帮助?