airflow.providers.opensearch.log.os_task_handler

属性

OsLogMsgType

USE_PER_RUN_LOG_ID

LOG_LINE_DEFAULTS

OpensearchTaskHandler

OpensearchTaskHandler 是一个 Python 日志处理器,用于读取和写入日志到 OpenSearch。

函数

getattr_nested(obj, item, default)

从 obj 获取 item,如果未找到则返回 default。

get_os_kwargs_from_config()

模块内容

airflow.providers.opensearch.log.os_task_handler.OsLogMsgType[source]
airflow.providers.opensearch.log.os_task_handler.USE_PER_RUN_LOG_ID = True[source]
airflow.providers.opensearch.log.os_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.opensearch.log.os_task_handler.getattr_nested(obj, item, default)[source]

从 obj 获取 item,如果未找到则返回 default。

例如,调用 getattr_nested(a, 'b.c', "NA") 将返回 a.b.c(如果该值存在),否则返回 “NA”。

airflow.providers.opensearch.log.os_task_handler.get_os_kwargs_from_config()[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]

基类: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

OpensearchTaskHandler 是一个 Python 日志处理器,用于读取和写入日志到 OpenSearch。

与 ElasticsearchTaskHandler 类似,Airflow 本身不处理日志的索引。日志被刷新到本地文件,可能需要额外的软件(例如 Filebeat、Logstash)将日志发送到 OpenSearch。此处理程序随后启用从 OpenSearch 获取和显示日志的功能。

为了高效地查询和排序 Elasticsearch 结果,此处理程序假定每条日志消息都包含一个字段 log_id,由任务实例 (ti) 的主键组成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日志消息根据 offset 进行排序,offset 是一个唯一的整数,表示日志消息的顺序。此处的时间戳不可靠,因为多条日志消息可能具有相同的时间戳。

参数
  • base_log_folder (str) – 在本地存储日志的基文件夹。

  • end_of_log_mark (str) – 表示日志结束的标记字符串。

  • write_stdout (bool) – 是否也将日志写入标准输出。

  • json_format (bool) – 是否将日志格式化为 JSON。

  • json_fields (str) – 要包含在 JSON 日志输出中的字段的逗号分隔列表。

  • host (str) – OpenSearch 主机名。

  • port (int) – OpenSearch 端口。

  • username (str) – 用于 OpenSearch 认证的用户名。

  • password (str) – 用于 OpenSearch 认证的密码。

  • host_field (str) – 日志中用于表示主机的字段名(默认为 “host”)。

  • offset_field (str) – 日志偏移量的字段名(默认为 “offset”)。

  • index_patterns (str) – 用于存储日志的索引模式或模板。

  • index_patterns_callable (str) – 根据上下文动态生成索引模式的可调用对象。

  • os_kwargs (dict | None | Literal['default_os_kwargs']) – 其他 OpenSearch 客户端选项。可以设置为 “default_os_kwargs” 以从 Airflow 的设置加载默认配置。

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Opensearch'[source]
trigger_should_wrap = True[source]
closed = False[source]
mark_end_on_close = True[source]
end_of_log_mark[source]
write_stdout[source]
json_format[source]
json_fields[source]
host_field = 'host'[source]
offset_field = 'offset'[source]
index_patterns[source]
index_patterns_callable[source]
context_set = False[source]
client[source]
formatter: logging.Formatter[source]
handler: logging.FileHandler | logging.StreamHandler[source]
set_context(ti, *, identifier=None)[source]

为 airflow 任务处理器提供 task_instance 上下文。

参数
emit(record)[source]

执行所需操作以实际记录指定的日志记录。

此版本旨在由子类实现,因此会引发 NotImplementedError。

close()[source]

整理处理器使用的所有资源。

此版本将处理器从用于按名称查找处理器的内部处理器映射 _handlers 中移除。子类应确保从重写的 close() 方法中调用此方法。

此条目有帮助吗?