airflow.providers.google.cloud.transfers.gcs_to_gcs

此模块包含一个 Google Cloud Storage operator。

属性

WILDCARD

GCSToGCSOperator

将对象从一个 bucket 复制到另一个,如果需要,可以重命名。

模块内容

airflow.providers.google.cloud.transfers.gcs_to_gcs.WILDCARD = '*'[source]
class airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSToGCSOperator(*, source_bucket, source_object=None, source_objects=None, destination_bucket=None, destination_object=None, delimiter=None, move_object=False, replace=True, gcp_conn_id='google_cloud_default', last_modified_time=None, maximum_modified_time=None, is_older_than=None, impersonation_chain=None, source_object_required=False, exact_match=False, match_glob=None, **kwargs)[source]

继承自: airflow.models.BaseOperator

将对象从一个 bucket 复制到另一个,如果需要,可以重命名。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南: GCSToGCSOperator

参数:
  • source_bucket – 源 Google Cloud Storage bucket,对象位于其中。(templated)

  • source_object – 要复制的 Google Cloud Storage bucket 中对象的源名称。(templated) 您只能在 bucket 内的对象(文件名)中使用一个通配符。通配符可以出现在对象名称内部或末尾。在 bucket 名称末尾附加通配符不受支持。

  • source_objects – 要复制的 Google Cloud Storage bucket 中对象的源名称列表。(templated)

  • destination_bucket – 对象应存放的目标 Google Cloud Storage bucket。如果 destination_bucket 为 None,则默认为 source_bucket。(templated)

  • destination_object – 目标 Google Cloud Storage bucket 中对象的目标名称。(templated) 如果 source_object 参数中提供了通配符,则这是将添加到最终目标对象路径前缀。请注意,通配符之前的源路径部分将被移除;如果需要保留,应将其添加到 destination_object。例如,对于前缀 foo/* 和 destination_object blah/,文件 foo/baz 将被复制到 blah/baz;要保留前缀,可以将 destination_object 写为例如 blah/foo,在这种情况下,复制的文件将命名为 blah/foo/baz。source_objects 内的源对象也适用同样规则。

  • move_object – 当 move_object 为 True 时,对象将被移动而不是复制到新位置。这相当于 mv 命令而非 cp 命令。

  • replace – 是否要替换现有的目标文件。

  • delimiter – (已废弃) 用于将结果限制为给定“文件夹”中的“文件”。如果 source_objects = [‘foo/bah/’] 且 delimiter = ‘.avro’,则只会将“foo/bah/”文件夹中使用“.avro”作为分隔符的“文件”复制到目标对象。

  • gcp_conn_id – (可选) 用于连接到 Google Cloud 的连接 ID。

  • last_modified_time – 指定时,只有在 last_modified_time 之后修改的对象才会被复制或移动。如果未设置 tzinfo,则假定为 UTC。

  • maximum_modified_time – 指定时,只有在 maximum_modified_time 之前修改的对象才会被复制或移动。如果未设置 tzinfo,则假定为 UTC。

  • is_older_than – 指定时,只有比指定时间(以秒为单位)旧的对象才会被复制。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐号,使用短期凭据模拟,或通过链式列表模拟列表中的最后一个帐号,该帐号的 access_token 将被用于请求。如果设置为字符串,则该帐号必须向原始帐号授予 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须向直接前置的身份授予 Service Account Token Creator IAM 角色,列表中的第一个帐号将此角色授予原始帐号 (templated)。

  • source_object_required – 当源对象不存在时是否引发异常。当源对象是文件夹或模式时,此参数无效。

  • exact_match – 指定时,只复制源对象(文件名)的完全匹配项。

  • match_glob (str | None) – (可选) 根据给定的 glob 模式字符串过滤对象 ( 例如, '**/*/.json')

示例:

以下 Operator 会将 data bucket 中名为 sales/sales-2017/january.avro 的单个文件复制到 data_backup bucket 中名为 copied_sales/2017/january-backup.avro 的文件。

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_file",
    source_bucket="data",
    source_objects=["sales/sales-2017/january.avro"],
    destination_bucket="data_backup",
    destination_object="copied_sales/2017/january-backup.avro",
    exact_match=True,
    gcp_conn_id=google_cloud_conn_id,
)

以下 Operator 会将 data bucket 中 sales/sales-2017 文件夹(即名称以此前缀开头的所有文件)中的所有 Avro 文件复制到 data_backup bucket 中的 copied_sales/2017 文件夹。

copy_files = GCSToGCSOperator(
    task_id='copy_files',
    source_bucket='data',
    source_objects=['sales/sales-2017'],
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/',
    match_glob='**/*.avro'
    gcp_conn_id=google_cloud_conn_id
)

Or ::

copy_files = GCSToGCSOperator(
    task_id='copy_files',
    source_bucket='data',
    source_object='sales/sales-2017/*.avro',
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/',
    gcp_conn_id=google_cloud_conn_id
)

以下 Operator 会将 data bucket 中 sales/sales-2017 文件夹(即名称以此前缀开头的所有文件)中的所有 Avro 文件移动到 data_backup bucket 中的相同文件夹,并在过程中删除原始文件。

move_files = GCSToGCSOperator(
    task_id="move_files",
    source_bucket="data",
    source_object="sales/sales-2017/*.avro",
    destination_bucket="data_backup",
    move_object=True,
    gcp_conn_id=google_cloud_conn_id,
)
以下 Operator 会将 data bucket 中 sales/sales-2019

sales/sales-2020 文件夹中的所有 Avro 文件移动到 data_backup bucket 中的相同文件夹,并在过程中删除原始文件。

move_files = GCSToGCSOperator(
    task_id="move_files",
    source_bucket="data",
    source_objects=["sales/sales-2019/*.avro", "sales/sales-2020"],
    destination_bucket="data_backup",
    delimiter=".avro",
    move_object=True,
    gcp_conn_id=google_cloud_conn_id,
)
template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_object', 'source_objects', 'destination_bucket', 'destination_object',...[source]
ui_color = '#f0eee4'[source]
source_bucket[source]
source_object = None[source]
source_objects = None[source]
destination_bucket = None[source]
destination_object = None[source]
delimiter = None[source]
move_object = False[source]
replace = True[source]
gcp_conn_id = 'google_cloud_default'[source]
last_modified_time = None[source]
maximum_modified_time = None[source]
is_older_than = None[source]
impersonation_chain = None[source]
source_object_required = False[source]
exact_match = False[source]
match_glob = None[source]
execute(context)[source]

在创建 operator 时派生。

Context 是与渲染 jinja 模板时使用的字典相同。

有关更多 context,请参阅 get_template_context。

get_openlineage_facets_on_complete(task_instance)[source]

实现 _on_complete,因为 execute 方法会对内部进行预处理。

这意味着我们无需规范化 self.source_object 和 self.source_objects、目标 bucket 等。

此条目有帮助吗?