airflow.providers.google.cloud.sensors.gcs

此模块包含 Google Cloud Storage 传感器。

模块内容

GCSObjectExistenceSensor

检查 Google Cloud Storage 中是否存在文件。

GCSObjectUpdateSensor

检查 Google Cloud Storage 中的对象是否已更新。

GCSObjectsWithPrefixExistenceSensor

检查给定前缀的 GCS 对象是否存在,并通过 XCom 传递匹配项。

GCSUploadSessionCompleteSensor

如果非活动期已过且存储桶中的对象数量没有增加,则返回 True。

函数

ts_function(context)

充当 GoogleCloudStorageObjectUpdatedSensor 的默认回调函数。

get_time()

充当 datetime.datetime.now 的包装器,以简化单元测试中的模拟。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor(*, bucket, object, use_glob=False, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, retry=DEFAULT_RETRY, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类: airflow.sensors.base.BaseSensorOperator

检查 Google Cloud Storage 中是否存在文件。

参数
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • object (str) – 要在 Google Cloud Storage 存储桶中检查的对象的名称。

  • use_glob (bool) – 设置为 True 时,object 参数将被解释为 glob

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时要使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户链表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接在其前面的身份,列表中的第一个帐户则将此角色授予发起帐户(模板化)。

  • retry (google.api_core.retry.Retry) – (可选) 如何重试 RPC

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[源代码]
ui_color = '#f0eee4'[源代码]
poke(context)[源代码]

覆盖此类的派生时。

execute(context)[源代码]

Airflow 在工作节点上运行此方法,并使用触发器进行延迟。

execute_complete(context, event)[源代码]

充当触发器触发时的回调函数 - 立即返回。

依赖于触发器抛出异常,否则它会假定执行成功。

airflow.providers.google.cloud.sensors.gcs.ts_function(context)[源代码]

充当 GoogleCloudStorageObjectUpdatedSensor 的默认回调函数。

默认行为是检查对象是否在数据间隔结束后更新。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor(bucket, object, ts_func=ts_function, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

检查 Google Cloud Storage 中的对象是否已更新。

参数
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • object (str) – 要在 Google 云存储桶中下载的对象的名称。

  • ts_func (Callable) – 用于定义更新条件的回调函数。默认回调函数返回 logical_date + schedule_interval。回调函数将上下文作为参数。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时要使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户链表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接在其前面的身份,列表中的第一个帐户则将此角色授予发起帐户(模板化)。

  • deferrable (bool) – 在可延期模式下运行传感器

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
poke(context)[source]

覆盖此类的派生时。

execute(context)[source]

Airflow 在工作节点上运行此方法,并使用触发器进行延迟。

execute_complete(context, event=None)[source]

立即返回,并依靠触发器抛出成功事件。触发器的回调函数。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor(bucket, prefix, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

检查给定前缀的 GCS 对象是否存在,并通过 XCom 传递匹配项。

当找到与给定前缀匹配的文件时,poke 方法的标准将被满足,并且匹配的对象将从操作符返回并通过 XCom 传递给下游任务。

参数
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • prefix (str) – 要在 Google 云存储桶中检查的前缀的名称。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时要使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户链表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接在其前面的身份,列表中的第一个帐户则将此角色授予发起帐户(模板化)。

  • deferrable (bool) – 在可延期模式下运行传感器

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
poke(context)[source]

覆盖此类的派生时。

execute(context)[source]

已重写以允许传递匹配项。

execute_complete(context, event)[source]

立即返回,并依靠触发器抛出成功事件。触发器的回调函数。

airflow.providers.google.cloud.sensors.gcs.get_time()[source]

充当 datetime.datetime.now 的包装器,以简化单元测试中的模拟。

class airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor(bucket, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

如果非活动期已过且存储桶中的对象数量没有增加,则返回 True。

检查 Google Cloud Storage 存储桶中前缀处的对象数量的变化,如果非活动时间段过去且对象数量没有增加,则返回 True。请注意,此传感器在重新调度模式下不会正常工作,因为 GCS 存储桶中列出的对象的状态将在重新调度的调用之间丢失。

参数
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • prefix (str) – 要在 Google 云存储桶中检查的前缀的名称。

  • inactivity_period (float) – 指定上传会话结束的非活动总秒数。请注意,此机制不是实时的,并且此操作符可能在经过此期间后,直到 poke_interval 返回才会有感应到没有额外的对象。

  • min_objects (int) – 上传会话被认为有效的所需最小对象数。

  • previous_objects (set[str] | None) – 上次 poke 期间找到的对象 ID 集。

  • allow_delete (bool) – 此传感器是否应将 poke 之间被删除的对象视为有效行为。如果为 true,则当发生这种情况时将记录警告消息。如果为 false,则会引发错误。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时要使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的帐户链表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接在其前面的身份,列表中的第一个帐户则将此角色授予发起帐户(模板化)。

  • deferrable (bool) – 在可延期模式下运行传感器

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
is_bucket_updated(current_objects)[source]

检查是否已添加新对象且 inactivity_period 已过,并更新状态。

参数

current_objects (set[str]) – 上次探测期间存储桶中的对象 ID 集合。

poke(context)[source]

覆盖此类的派生时。

execute(context)[source]

Airflow 在工作节点上运行此方法,并使用触发器进行延迟。

execute_complete(context, event=None)[source]

依赖于触发器抛出异常,否则假定执行成功。

当触发器触发时的回调 - 立即返回。

此条目是否有帮助?