在 Google Cloud Storage 中传输数据¶
Google Cloud Storage (GCS) 用于存储来自各种应用程序的大型数据。请注意,在 GCS 术语中,文件被称为对象,因此在本指南中,“对象”和“文件”这两个术语可以互换使用。有几个操作器用于作为 Google Cloud 服务的一部分复制数据。本页面展示了如何使用这些操作器。另请参阅 Google Cloud Storage 操作器,了解用于管理 Google Cloud Storage 存储桶的操作器。
Cloud Storage Transfer Service¶
有许多操作器用于管理 Google Cloud Data Transfer 服务。如果您想创建新的数据传输任务,请使用操作器 CloudDataTransferServiceCreateJobOperator
。您也可以使用该服务的先前操作器 - CloudDataTransferServiceGCSToGCSOperator
。
这些操作器不会在本地控制复制过程,而是使用 Google 资源,这使得它们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤为突出,因为这些操作器可以减少出站流量。
如果启用指定对象传输到目标后应从源删除的选项,这些操作器将修改源对象。
当您使用 Google Cloud Data Transfer 服务时,您可以指定是否允许覆盖目标中已存在的对象,是否应删除仅存在于目标的,或者对象传输到目标后是否应从源删除。
可以使用包含和排除前缀以及基于文件修改日期来指定源对象。
如果您需要有关如何使用的信息,请参阅指南:Google Cloud Transfer Service 操作器
适用于 Google Cloud Storage 的专用传输操作器¶
请参阅 Google 传输操作器,了解往返 Google Cloud Storage 的专用传输操作器列表。
本地传输¶
有两个操作器用于复制数据,整个过程都在本地控制。
下一节将介绍它们。
先决条件任务¶
要使用这些操作器,您必须完成一些准备工作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装的详细信息请参见 安装。
操作器¶
GCSToGCSOperator¶
GCSToGCSOperator
允许您在 GCS 中复制一个或多个文件。文件可以在两个不同的存储桶之间或同一存储桶内复制。复制始终进行,不考虑目标存储桶的初始状态。
仅当文件移动选项处于活动状态时,此操作器才会删除源存储桶中的对象。在两个不同存储桶之间复制文件时,此操作器从不删除目标存储桶中的数据。
当您使用此操作器时,您可以指定对象传输到目标后是否应从源删除。可以使用单个通配符以及基于文件修改日期来指定源对象。
可以根据对象的路径使用 match_glob 字段进行过滤。您应避免在源对象的路径中使用 delimiter
字段或通配符,因为这两种做法都已弃用。此外,还可以基于文件的创建日期 (is_older_than
) 或修改日期 (last_modified_time
和 maximum_modified_time
) 进行过滤。
此操作器默认的工作方式可以与 cp
命令进行比较。当文件移动选项处于活动状态时,此操作器功能类似于 mv
命令。
下面是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。
复制单个文件¶
以下示例会将单个文件 OBJECT_1
从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶。注意,如果标志 exact_match=False
,则 source_object
将被视为在 BUCKET_1_SRC
GCS 存储桶中搜索对象的**前缀**。因此,如果找到任何匹配项,它们也将被复制。为防止这种情况发生,请使用 exact_match=False
。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
复制多个文件¶
复制多个文件有几种方法,下面展示了各种示例。
如前所述,delimiter
字段以及在源对象中使用通配符 (*
) 都已弃用。因此,不建议使用它们,而是使用 match_glob
,如下所示:
以下示例将复制与 data/
文件夹中的 glob 模式匹配的文件,从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶的 backup/
文件夹中。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files_with_match_glob = GCSToGCSOperator(
task_id="copy_files_with_match_glob",
source_bucket=BUCKET_NAME_SRC,
source_object="data/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
match_glob="**/*.txt",
)
以下示例将复制 subdir/
文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶的 backup/
文件夹中(即 backup/a.csv、backup/b.csv、backup/c.csv)。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files = GCSToGCSOperator(
task_id="copy_files",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + "subdir/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[
PREFIX + OBJECT_1,
PREFIX + OBJECT_2,
], # Instead of files each element could be a wildcard expression
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
最后,可以通过省略 source_object
参数而向 source_objects
参数提供列表来复制文件。在此示例中,OBJECT_1
和 OBJECT_2
将从 BUCKET_1_SRC
复制到 BUCKET_1_DST
。提供大小为 1 的列表功能与向 source_object
参数提供值相同。
移动单个文件¶
将 True
提供给 move
参数会使操作器在复制完成后删除 source_object
。注意,如果标志 exact_match=False
,则 source_object
将被视为在 BUCKET_1_SRC
GCS 存储桶中搜索对象的**前缀**。因此,如果找到任何匹配项,它们也将被复制。为防止这种情况发生,请使用 exact_match=False
。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
移动多个文件¶
通过向 move
参数提供 True
可以移动多个文件。有关通配符和 delimiter
参数的相同规则也适用于移动和复制。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
GCSSynchronizeBucketsOperator¶
GCSSynchronizeBucketsOperator
操作器检查目标存储桶的初始状态,然后将其与源存储桶进行比较。基于此,它会创建一个操作计划,描述应从目标存储桶删除哪些对象、应覆盖哪些对象以及应复制哪些对象。
此操作器从不修改源存储桶中的数据。
当您使用此操作器时,您可以指定是否允许覆盖目标中已存在的对象,是否应删除仅存在于目标的,是否处理子目录或应处理哪个子目录。
此操作器的工作方式可以与 rsync
命令进行比较。
基本同步¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则不会覆盖它们。不会删除 BUCKET_1_DST
中不存在于 BUCKET_1_SRC
的任何文件。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
完整存储桶同步¶
此示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则会覆盖它们。会删除 BUCKET_1_DST
中不存在于 BUCKET_1_SRC
的任何文件。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_full_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_full_bucket",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
delete_extra_files=True,
allow_overwrite=True,
)
同步到子目录¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST
存储桶的 subdir
文件夹中。如果 BUCKET_1_DST/subdir
中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST/subdir
中不存在于 BUCKET_1_SRC
的任何文件。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object="subdir/",
)
从子目录同步¶
此示例将确保 BUCKET_1_SRC/subdir
中的所有文件(包括子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST
中不存在于 BUCKET_1_SRC/subdir
的任何文件。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
)
参考¶
更多信息,请参阅