在 Google Cloud Storage 中传输数据

Google Cloud Storage (GCS) 用于存储来自各种应用程序的大型数据。请注意,在 GCS 术语中,文件被称为对象,因此在本指南中,“对象”和“文件”这两个术语可以互换使用。有几个操作器用于作为 Google Cloud 服务的一部分复制数据。本页面展示了如何使用这些操作器。另请参阅 Google Cloud Storage 操作器,了解用于管理 Google Cloud Storage 存储桶的操作器。

Cloud Storage Transfer Service

有许多操作器用于管理 Google Cloud Data Transfer 服务。如果您想创建新的数据传输任务,请使用操作器 CloudDataTransferServiceCreateJobOperator。您也可以使用该服务的先前操作器 - CloudDataTransferServiceGCSToGCSOperator

这些操作器不会在本地控制复制过程,而是使用 Google 资源,这使得它们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤为突出,因为这些操作器可以减少出站流量。

如果启用指定对象传输到目标后应从源删除的选项,这些操作器将修改源对象。

当您使用 Google Cloud Data Transfer 服务时,您可以指定是否允许覆盖目标中已存在的对象,是否应删除仅存在于目标的,或者对象传输到目标后是否应从源删除。

可以使用包含和排除前缀以及基于文件修改日期来指定源对象。

如果您需要有关如何使用的信息,请参阅指南:Google Cloud Transfer Service 操作器

适用于 Google Cloud Storage 的专用传输操作器

请参阅 Google 传输操作器,了解往返 Google Cloud Storage 的专用传输操作器列表。

本地传输

有两个操作器用于复制数据,整个过程都在本地控制。

下一节将介绍它们。

先决条件任务

要使用这些操作器,您必须完成一些准备工作

操作器

GCSToGCSOperator

GCSToGCSOperator 允许您在 GCS 中复制一个或多个文件。文件可以在两个不同的存储桶之间或同一存储桶内复制。复制始终进行,不考虑目标存储桶的初始状态。

仅当文件移动选项处于活动状态时,此操作器才会删除源存储桶中的对象。在两个不同存储桶之间复制文件时,此操作器从不删除目标存储桶中的数据。

当您使用此操作器时,您可以指定对象传输到目标后是否应从源删除。可以使用单个通配符以及基于文件修改日期来指定源对象。

可以根据对象的路径使用 match_glob 字段进行过滤。您应避免在源对象的路径中使用 delimiter 字段或通配符,因为这两种做法都已弃用。此外,还可以基于文件的创建日期 (is_older_than) 或修改日期 (last_modified_timemaximum_modified_time) 进行过滤。

此操作器默认的工作方式可以与 cp 命令进行比较。当文件移动选项处于活动状态时,此操作器功能类似于 mv 命令。

下面是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。

复制单个文件

以下示例会将单个文件 OBJECT_1BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶。注意,如果标志 exact_match=False,则 source_object 将被视为在 BUCKET_1_SRC GCS 存储桶中搜索对象的**前缀**。因此,如果找到任何匹配项,它们也将被复制。为防止这种情况发生,请使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,
)

复制多个文件

复制多个文件有几种方法,下面展示了各种示例。

如前所述,delimiter 字段以及在源对象中使用通配符 (*) 都已弃用。因此,不建议使用它们,而是使用 match_glob,如下所示:

以下示例将复制与 data/ 文件夹中的 glob 模式匹配的文件,从 BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶的 backup/ 文件夹中。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)

以下示例将复制 subdir/ 文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),从 BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶的 backup/ 文件夹中(即 backup/a.csv、backup/b.csv、backup/c.csv)。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + "subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[
        PREFIX + OBJECT_1,
        PREFIX + OBJECT_2,
    ],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

最后,可以通过省略 source_object 参数而向 source_objects 参数提供列表来复制文件。在此示例中,OBJECT_1OBJECT_2 将从 BUCKET_1_SRC 复制到 BUCKET_1_DST。提供大小为 1 的列表功能与向 source_object 参数提供值相同。

移动单个文件

True 提供给 move 参数会使操作器在复制完成后删除 source_object。注意,如果标志 exact_match=False,则 source_object 将被视为在 BUCKET_1_SRC GCS 存储桶中搜索对象的**前缀**。因此,如果找到任何匹配项,它们也将被复制。为防止这种情况发生,请使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)

移动多个文件

通过向 move 参数提供 True 可以移动多个文件。有关通配符和 delimiter 参数的相同规则也适用于移动和复制。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

GCSSynchronizeBucketsOperator

GCSSynchronizeBucketsOperator 操作器检查目标存储桶的初始状态,然后将其与源存储桶进行比较。基于此,它会创建一个操作计划,描述应从目标存储桶删除哪些对象、应覆盖哪些对象以及应复制哪些对象。

此操作器从不修改源存储桶中的数据。

当您使用此操作器时,您可以指定是否允许覆盖目标中已存在的对象,是否应删除仅存在于目标的,是否处理子目录或应处理哪个子目录。

此操作器的工作方式可以与 rsync 命令进行比较。

基本同步

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则不会覆盖它们。不会删除 BUCKET_1_DST 中不存在于 BUCKET_1_SRC 的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)

完整存储桶同步

此示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则会覆盖它们。会删除 BUCKET_1_DST 中不存在于 BUCKET_1_SRC 的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

同步到子目录

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST 存储桶的 subdir 文件夹中。如果 BUCKET_1_DST/subdir 中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST/subdir 中不存在于 BUCKET_1_SRC 的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)

从子目录同步

此示例将确保 BUCKET_1_SRC/subdir 中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST 中不存在于 BUCKET_1_SRC/subdir 的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)

参考

更多信息,请参阅

此条目有帮助吗?