airflow.providers.google.cloud.operators.gcs

此模块包含一个 Google Cloud Storage 桶 operator。

GCSCreateBucketOperator

创建一个新桶。

GCSListObjectsOperator

列出桶中所有对象,可按名称前缀、分隔符或 match_glob 进行过滤。

GCSDeleteObjectsOperator

从 Google Cloud Storage 桶中删除列表中的对象或所有匹配前缀的对象。

GCSBucketCreateAclEntryOperator

在指定的桶上创建一个新的 ACL 条目。

GCSObjectCreateAclEntryOperator

在指定的对象上创建一个新的 ACL 条目。

GCSFileTransformOperator

将数据从源 GCS 位置复制到本地文件系统上的临时位置。

GCSTimeSpanFileTransformOperator

复制在指定时间范围内修改的对象,运行转换,并将结果上传到桶。

GCSDeleteBucketOperator

从 Google Cloud Storage 中删除桶。

GCSSynchronizeBucketsOperator

同步 Google Cloud Services 中桶或桶目录的内容。

模块内容

class airflow.providers.google.cloud.operators.gcs.GCSCreateBucketOperator(*, bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

创建一个新桶。

Google Cloud Storage 使用扁平命名空间,因此您不能创建一个已存在的桶名。

另请参阅

更多信息,请参阅桶命名指南:https://cloud.google.com/storage/docs/bucketnaming.html#requirements

参数
  • bucket_name (str) – 桶的名称。(templated)

  • resource (dict | None) – 用于创建桶的可选字典参数。有关可用参数的信息,请参阅 Cloud Storage API 文档:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    这定义了桶中对象的存储方式,并决定了 SLA 和存储成本。(templated) 可用值包括

    • MULTI_REGIONAL

    • REGIONAL

    • STANDARD

    • NEARLINE

    • COLDLINE.

    如果在创建桶时未指定此值,则默认为 STANDARD。

  • location (str) –

    桶的位置。(templated) 桶中对象的数据物理存储在此区域内。默认为 US。

  • project_id (str) – Google Cloud 项目的 ID。(templated)

  • labels (dict | None) – 用户提供的标签,以键/值对形式。

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

以下 Operator 将在 EU 区域创建一个名为 test-bucket 的新桶,存储类别为 MULTI_REGIONAL

CreateBucket = GCSCreateBucketOperator(
    task_id="CreateNewBucket",
    bucket_name="test-bucket",
    storage_class="MULTI_REGIONAL",
    location="EU",
    labels={"env": "dev", "team": "airflow"},
    gcp_conn_id="airflow-conn-id",
)
template_fields: collections.abc.Sequence[str] = ('bucket_name', 'storage_class', 'location', 'project_id', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket_name[source]
resource = None[source]
storage_class = 'MULTI_REGIONAL'[source]
location = 'US'[source]
project_id = None[source]
labels = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.google.cloud.operators.gcs.GCSListObjectsOperator(*, bucket, prefix=None, delimiter=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, match_glob=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

列出桶中所有对象,可按名称前缀、分隔符或 match_glob 进行过滤。

此 operator 返回一个 Python 列表,其中包含对象名称,可供下游任务中的 XCom 使用。

参数
  • bucket (str) – 要查找对象的 Google Cloud Storage 桶。(templated)

  • prefix (str | list[str] | None) – 字符串或字符串列表,用于过滤名称以此开头/开头的对象。(templated)

  • delimiter (str | None) – (已弃用) 用于过滤对象的分隔符。(templated) 例如,要在 GCS 中列出目录中的 CSV 文件,可以使用 delimiter='.csv'。

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

  • match_glob (str | None) – (可选) 根据给定的 glob 模式字符串过滤对象 (例如,'**/*.json')

示例:

以下 Operator 将列出 data 桶中 sales/sales-2017 文件夹中的所有 Avro 文件。

GCS_Files = GCSListOperator(
    task_id="GCS_Files",
    bucket="data",
    prefix="sales/sales-2017/",
    match_glob="**/*.avro",
    gcp_conn_id=google_cloud_conn_id,
)
template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
prefix = None[source]
delimiter = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
match_glob = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator(*, bucket_name, objects=None, prefix=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 Google Cloud Storage 桶中删除列表中的对象或所有匹配前缀的对象。

参数
  • bucket_name (str) – 要从中删除的 GCS 桶

  • objects (list[str] | None) – 要删除的对象列表。这些应该是桶中对象的名称,不包含 gs://bucket/

  • prefix (str | list[str] | None) – 字符串或字符串列表,用于过滤名称以此开头/开头的对象。(templated)

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'prefix', 'objects', 'impersonation_chain')[source]
bucket_name[source]
objects = None[source]
prefix = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

get_openlineage_facets_on_start()[source]
class airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator(*, bucket, entity, role, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的桶上创建一个新的 ACL 条目。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:GCSBucketCreateAclEntryOperator

参数
  • bucket (str) – 桶的名称。

  • entity (str) – 持有权限的实体,以下形式之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers

  • role (str) – 实体的访问权限。可接受的值为:“OWNER”、“READER”、“WRITER”。

  • user_project (str | None) – (可选) 此请求的计费项目。对于 Requester Pays 桶是必需的。

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

template_fields: collections.abc.Sequence[str] = ('bucket', 'entity', 'role', 'user_project', 'impersonation_chain')[source]
bucket[source]
entity[source]
role[source]
user_project = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

airflow.providers.google.cloud.operators.gcs.GCSObjectCreateAclEntryOperator(*, bucket, object_name, entity, role, generation=None, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的对象上创建一个新的 ACL 条目。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: GCSObjectCreateAclEntryOperator

参数
  • bucket (str) – 桶的名称。

  • object_name (str) – 对象的名称。有关如何对对象名称进行 URL 编码以使其路径安全的更多信息,请参阅:https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 持有权限的实体,以下形式之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers

  • role (str) – 实体(entity)的访问权限。可接受的值为:“OWNER”、“READER”。

  • generation (int | None) – 可选。如果存在,则选择此对象的特定版本。

  • user_project (str | None) – (可选) 此请求的计费项目。对于 Requester Pays 桶是必需的。

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

template_fields: collections.abc.Sequence[str] = ('bucket', 'object_name', 'entity', 'generation', 'role', 'user_project', 'impersonation_chain')[source]
bucket[source]
object_name[source]
entity[source]
role[source]
generation = None[source]
user_project = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

airflow.providers.google.cloud.operators.gcs.GCSFileTransformOperator(*, source_bucket, source_object, transform_script, destination_bucket=None, destination_object=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

将数据从源 GCS 位置复制到本地文件系统上的临时位置。

根据指定的转换脚本对该文件运行转换,并将输出上传到目标存储桶。如果未指定输出存储桶,则原始文件将被覆盖。

本地文件系统中源文件和目标文件的位置作为转换脚本的第一个和第二个参数提供。转换脚本应从源读取数据,进行转换,并将输出写入本地目标文件。

参数
  • source_bucket (str) – 源对象的存储桶。(模板化)

  • source_object (str) – 要从 GCS 中检索的键(对象路径)。(模板化)

  • destination_bucket (str | None) – 转换后上传键(对象路径)的存储桶。如果未提供,将使用 source_bucket。(模板化)

  • destination_object (str | None) – 要写入 GCS 的键(对象路径)。如果未提供,将使用 source_object。(模板化)

  • transform_script (str | list[str]) – 可执行转换脚本的位置或传递给子进程的参数列表,例如 [‘python’, ‘script.py’, 10]。(模板化)

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',...[source]
source_bucket[source]
source_object[source]
destination_bucket[source]
destination_object[source]
gcp_conn_id = 'google_cloud_default'[source]
transform_script[source]
output_encoding[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

get_openlineage_facets_on_start()[source]
airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator(*, source_bucket, source_prefix, source_gcp_conn_id, destination_bucket, destination_prefix, destination_gcp_conn_id, transform_script, source_impersonation_chain=None, destination_impersonation_chain=None, chunk_size=None, download_continue_on_fail=False, download_num_attempts=1, upload_continue_on_fail=False, upload_num_attempts=1, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

复制在指定时间范围内修改的对象,运行转换,并将结果上传到桶。

确定在 GCS 源位置的特定时间段内添加或修改的对象列表,将它们复制到本地文件系统的临时位置,根据指定的转换脚本对文件运行转换,并将输出上传到目标存储桶。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: GCSTimeSpanFileTransformOperator

本地文件系统中源文件和目标文件的位置作为转换脚本的第一个和第二个参数提供。时间跨度作为 UTC ISO 8601 字符串以第三个和第四个参数的形式传递给转换脚本。

转换脚本应从源读取数据,进行转换,并将输出写入本地目标文件。

参数
  • source_bucket (str) – 从中获取数据的存储桶。(模板化)

  • source_prefix (str) – 用于过滤对象名称以此前缀开头的字符串前缀。可以插入逻辑日期和时间组件。(模板化)

  • source_gcp_conn_id (str) – 连接到 Google Cloud 以下载要处理的文件时使用的连接 ID。

  • source_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐号,用于使用短期凭据进行模拟(以下载要处理的文件),或为了获取列表中最后一个帐号的 access_token 所需的帐号链列表,该帐号将在请求中被模拟。如果设置为字符串,该帐号必须授予发起帐号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予直接前一个身份 Service Account Token Creator IAM 角色,并且列表中的第一个帐号将此角色授予发起帐号(模板化)。

  • destination_bucket (str) – 用于写入数据的存储桶。(模板化)

  • destination_prefix (str) – 上传位置的字符串前缀。可以插入逻辑日期和时间组件。(模板化)

  • destination_gcp_conn_id (str) – 连接到 Google Cloud 以上传处理后的文件时使用的连接 ID。

  • destination_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐号,用于使用短期凭据进行模拟(以上传处理后的文件),或为了获取列表中最后一个帐号的 access_token 所需的帐号链列表,该帐号将在请求中被模拟。如果设置为字符串,该帐号必须授予发起帐号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予直接前一个身份 Service Account Token Creator IAM 角色,并且列表中的第一个帐号将此角色授予发起帐号(模板化)。

  • transform_script (str | list[str]) – 可执行转换脚本的位置或传递给子进程的参数列表,例如 [‘python’, ‘script.py’, 10]。(模板化)

  • chunk_size (int | None) – 下载或上传时的数据块大小(以字节为单位)。根据 Google Cloud Storage API 规范,此值必须是 256 KB 的倍数。

  • download_continue_on_fail (bool | None) – 如果设置为 true,则下载失败时任务不会出错,但会继续执行。

  • upload_chunk_size – 上传时的数据块大小(以字节为单位)。根据 Google Cloud Storage API 规范,此值必须是 256 KB 的倍数。

  • upload_continue_on_fail (bool | None) – 如果设置为 true,则上传失败时任务不会出错,但会继续执行。

  • upload_num_attempts (int) – 尝试上传单个文件的次数。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_prefix', 'destination_bucket', 'destination_prefix',...[source]
静态 interpolate_prefix(prefix, dt)[source]

将前缀与日期时间进行插值。

参数
  • prefix (str) – 要进行插值的前缀

  • dt (datetime.datetime) – 要进行插值的日期时间

source_bucket[source]
source_prefix[source]
source_gcp_conn_id[source]
source_impersonation_chain = None[source]
destination_bucket[source]
destination_prefix[source]
destination_gcp_conn_id[source]
destination_impersonation_chain = None[source]
transform_script[source]
output_encoding[source]
chunk_size = None[source]
download_continue_on_fail = False[source]
download_num_attempts = 1[source]
upload_continue_on_fail = False[source]
upload_num_attempts = 1[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

get_openlineage_facets_on_complete(task_instance)[source]

实现 on_complete,因为 execute() 解析对象前缀。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator(*, bucket_name, force=True, gcp_conn_id='google_cloud_default', impersonation_chain=None, user_project=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 Google Cloud Storage 中删除桶。

另请参阅

有关如何使用此操作器的更多信息,请参阅指南:删除存储桶

参数
  • bucket_name (str) – 将要删除的存储桶名称

  • force (bool) – 如果为 false,则不允许删除非空存储桶,设置为 force=True 允许删除非空存储桶

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

  • user_project (str | None) – (可选) 此请求的计费项目标识符。对于请求方付费存储桶是必需的。

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'gcp_conn_id', 'impersonation_chain', 'user_project')[source]
bucket_name[source]
force: bool = True[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
user_project = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

继承自: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

同步 Google Cloud Services 中的存储桶或存储桶目录内容。

参数 source_objectdestination_object 描述根同步目录。如果未传递它们,将同步整个存储桶。它们应指向目录。

注意

不支持同步单个文件。只能同步整个目录。

另请参阅

有关如何使用此操作器的更多信息,请参阅指南:GCSSynchronizeBucketsOperator

参数
  • source_bucket (str) – 包含源对象的存储桶名称。

  • destination_bucket (str) – 包含目标对象的存储桶名称。

  • source_object (str | None) – 源存储桶中的根同步目录。

  • destination_object (str | None) – 目标存储桶中的根同步目录。

  • recursive (bool) – 如果为 True,将考虑子目录

  • allow_overwrite (bool) – 如果为 True,当找到不匹配的文件时将覆盖文件。默认情况下不允许覆盖文件

  • delete_extra_files (bool) –

    如果为 True,删除源中存在但在目标中不存在的其他文件。默认情况下不删除其他文件。

    注意

    如果指定错误的源/目标组合,此选项会快速删除数据。

  • gcp_conn_id (str) – (可选) 用于连接 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选) 要使用短期凭证模拟的服务帐号,或者为了获取列表中最后一个帐号的 access_token 而必需的链式帐号列表,该 access_token 将用于请求中的模拟。如果设置为字符串,则该帐号必须授予原始帐号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一个帐号将此角色授予原始帐号。(templated)

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'destination_bucket', 'source_object', 'destination_object', 'recursive',...[source]
source_bucket[source]
destination_bucket[source]
source_object = None[source]
destination_object = None[source]
recursive = True[source]
delete_extra_files = False[source]
allow_overwrite = False[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建 operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

请参阅 get_template_context 以获取更多上下文。

此条目是否有帮助?