Google Cloud Storage 操作符¶
Cloud Storage 允许随时随地存储和检索任意量的数据。您可以使用 Cloud Storage 进行一系列场景,包括提供网站内容、存储数据用于存档和灾难恢复,或者通过直接下载向用户分发大型数据对象。
有关往返于 Google Cloud Storage 的专用传输操作符列表,请参阅Google Transfer Operators。
前置任务¶
要使用这些操作符,您必须执行以下几项操作:
使用Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,如Google Cloud 文档中所述。
启用 API,如Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息可在Installation 中找到。
操作符¶
GCSTimeSpanFileTransformOperator¶
使用 GCSTimeSpanFileTransformOperator
来转换在特定时间跨度(数据间隔)内修改的文件。时间跨度由其开始和结束时间戳定义。如果 DAG 没有调度 *下一个* DAG 实例,则时间跨度的结束是无限的,这意味着该操作符处理所有早于 data_interval_start
的文件。
tests/system/google/cloud/gcs/example_gcs_transform_timespan.py
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=BUCKET_NAME_SRC,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=BUCKET_NAME_DST,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
GCSBucketCreateAclEntryOperator¶
在指定的存储桶上创建一个新的 ACL 条目。
有关参数定义,请参阅 GCSBucketCreateAclEntryOperator
使用此操作符¶
tests/system/google/cloud/gcs/example_gcs_acl.py
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
bucket=BUCKET_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"entity",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage 文档,了解如何在存储桶中创建新的 ACL 条目:create a new ACL entry for a bucket。
GCSObjectCreateAclEntryOperator¶
在指定的对象上创建一个新的 ACL 条目。
有关参数定义,请参阅 GCSObjectCreateAclEntryOperator
使用此操作符¶
tests/system/google/cloud/gcs/example_gcs_acl.py
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
bucket=BUCKET_NAME,
object_name=FILE_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"object_name",
"entity",
"generation",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage insert 文档,了解如何为 ObjectAccess 创建 ACL 条目:create a ACL entry for ObjectAccess。
GCSListObjectsOperator¶
使用 GCSListObjectsOperator
列出 Google Cloud Storage 存储桶中的对象。您可以选择指定前缀来仅列出名称以此前缀开头的对象,并指定分隔符来模拟类似目录的组织结构。
tests/system/google/cloud/gcs/example_gcs_copy_delete.py
list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_NAME_SRC)
GCSDeleteObjectsOperator¶
使用 GCSDeleteObjectsOperator
从 Google Cloud Storage 存储桶中删除一个或多个对象。
tests/system/google/cloud/gcs/example_gcs_copy_delete.py
delete_files = GCSDeleteObjectsOperator(
task_id="delete_files", bucket_name=BUCKET_NAME_SRC, objects=[FILE_NAME]
)
删除存储桶¶
删除存储桶允许您从 Google Cloud Storage 中移除存储桶。这是通过 GCSDeleteBucketOperator
操作符执行的。
tests/system/google/cloud/gcs/example_gcs_upload_download.py
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
您可以将 Jinja 模板化用于 bucket_name
, gcp_conn_id
, impersonation_chain
, user_project
等参数,从而动态地确定值。
参考¶
更多信息请参阅:
传感器¶
GCSObjectExistenceSensor¶
使用 GCSObjectExistenceSensor
来等待(轮询) Google Cloud Storage 中某个文件的存在。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_exists = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
如果您希望在传感器运行时释放 worker 槽,则还可以在此操作符中使用可延迟模式。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
GCSObjectsWithPrefixExistenceSensor¶
使用 GCSObjectsWithPrefixExistenceSensor
来等待(轮询) Google Cloud Storage 中具有指定前缀的文件的存在。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
如果您希望此传感器异步运行,从而更有效地利用 Airflow 部署中的资源,可以将 deferrable
参数设置为 True。但是,此功能需要启用触发器组件才能工作。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
GCSUploadSessionCompleteSensor¶
使用 GCSUploadSessionCompleteSensor
来检查 Google Cloud Storage 中具有指定前缀的文件数量是否发生变化。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_complete_task",
)
如果您希望在传感器运行时释放 worker 槽,可以将参数 deferrable
设置为 True。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_async_complete",
deferrable=True,
)
GCSObjectUpdateSensor¶
使用 GCSObjectUpdateSensor
来检查 Google Cloud Storage 中的对象是否已更新。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
如果您希望此传感器异步运行,从而有效地利用 Airflow 部署中的资源,可以将 deferrable
参数设置为 True。但是,此功能需要启用触发器组件才能工作。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
更多信息¶
传感器具有不同的模式,用于确定任务执行期间资源的表现。有关使用传感器的最佳实践,请参阅Airflow 传感器文档。
参考¶
更多信息请参阅: