airflow.providers.google.cloud.log.stackdriver_task_handler

集成 Stackdriver 的处理器。

属性

DEFAULT_LOGGER_NAME

StackdriverTaskHandler

直接进行 Stackdriver 日志 API 调用的处理器。

模块内容

airflow.providers.google.cloud.log.stackdriver_task_handler.DEFAULT_LOGGER_NAME = 'airflow'[source]
class airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler(gcp_key_path=None, scopes=_DEFAULT_SCOPESS, name=NOTSET, transport=BackgroundThreadTransport, resource=_GLOBAL_RESOURCE, labels=None, gcp_log_name=DEFAULT_LOGGER_NAME)[source]

继承自: logging.Handler

直接进行 Stackdriver 日志 API 调用的处理器。

这是 Python 标准 logging 处理器,可用于将 Python 标准日志消息直接路由到 Stackdriver 日志 API。

它也可用于保存执行任务的日志。为此,应将其设置为名称为“tasks”的处理器。在这种情况下,它也将用于读取日志以便在 Web UI 中显示。

此处理器支持异步和同步传输。

参数:
  • gcp_key_path (str | None) – Google Cloud 凭据 JSON 文件的路径。如果省略,将使用基于 Application Default Credentials 的授权。

  • scopes (collections.abc.Collection[str] | None) – 凭据的 OAuth 范围,

  • name (str | airflow.utils.types.ArgNotSet) – Stackdriver 日志中自定义日志的名称。默认为 'airflow'。Python 日志记录器的名称将在 python_logger 字段中表示。

  • transport (type[google.cloud.logging.handlers.transports.Transport]) – 用于创建新传输对象的类。它应继承自基础 google.cloud.logging.handlers.Transport 类型并实现 :meth`google.cloud.logging.handlers.Transport.send`。默认为 google.cloud.logging.handlers.BackgroundThreadTransport。另一个选项是 google.cloud.logging.handlers.SyncTransport

  • resource (google.cloud.logging.Resource) – (可选)条目的受监控资源,默认为全局资源类型。

  • labels (dict[str, str] | None) – (可选)条目的标签映射。

LABEL_TASK_ID = 'task_id'[source]
LABEL_DAG_ID = 'dag_id'[source]
LABEL_LOGICAL_DATE = 'logical_date'[source]
LABEL_TRY_NUMBER = 'try_number'[source]
LOG_VIEWER_BASE_URL = 'https://console.cloud.google.com/logs/viewer'[source]
LOG_NAME = 'Google Stackdriver'[source]
trigger_supported = True[source]
trigger_should_queue = False[source]
trigger_should_wrap = False[source]
trigger_send_end_marker = False[source]
gcp_key_path: str | None = None[source]
scopes: collections.abc.Collection[str] | None[source]
gcp_log_name: str = 'airflow'[source]
transport_type: type[google.cloud.logging.handlers.transports.Transport][source]
resource: google.cloud.logging.Resource[source]
labels: dict[str, str] | None = None[source]
task_instance_labels: dict[str, str] | None[source]
task_instance_hostname = 'default-hostname'[source]
emit(record)[source]

实际记录指定的日志记录。

参数:

record (logging.LogRecord) – 要记录的记录。

set_context(task_instance)[source]

配置日志记录器以添加有关当前任务的信息。

参数:

task_instance (airflow.models.TaskInstance) – 当前执行的任务

read(task_instance, try_number=None, metadata=None)[source]

从 Stackdriver 日志读取给定任务实例的日志。

参数:
  • task_instance (airflow.models.TaskInstance) – 任务实例对象

  • try_number (int | None) – 要从中读取日志的任务实例尝试次数 (try_number)。如果为 None,则返回所有日志

  • metadata (dict | None) – 日志元数据。用于流式日志读取和自动尾随。

返回:

一个元组,包含(主机名和日志组成的双元素元组的单元素元组列表,以及元数据列表)

返回类型:

tuple[list[tuple[tuple[str, str]]], list[dict[str, str | bool]]]

property log_name[source]

返回日志名称。

get_external_log_url(task_instance, try_number)[source]

创建外部日志收集服务的地址。

参数:
  • task_instance (airflow.models.TaskInstance) – 任务实例对象

  • try_number (int) – 要从中读取日志的任务实例尝试次数 (try_number)

返回:

外部日志收集服务的 URL

返回类型:

str

close()[source]

清理此处理器使用的所有资源。

此版本将处理器从内部处理器映射 _handlers 中移除,该映射用于按名称查找处理器。子类应确保在覆盖的 close() 方法中调用此方法。

本条目是否有帮助?