airflow.providers.opensearch.log.os_task_handler¶
属性¶
类¶
OpensearchTaskHandler 是一个 Python 日志处理器,用于读取和写入日志到 OpenSearch。 |
函数¶
|
从 obj 获取 item,如果未找到则返回 default。 |
模块内容¶
- airflow.providers.opensearch.log.os_task_handler.getattr_nested(obj, item, default)[source]¶
从 obj 获取 item,如果未找到则返回 default。
例如,调用
getattr_nested(a, 'b.c', "NA")
将返回a.b.c
(如果该值存在),否则返回 “NA”。
- class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]¶
基类:
airflow.utils.log.file_task_handler.FileTaskHandler
,airflow.utils.log.logging_mixin.ExternalLoggingMixin
,airflow.utils.log.logging_mixin.LoggingMixin
OpensearchTaskHandler 是一个 Python 日志处理器,用于读取和写入日志到 OpenSearch。
与 ElasticsearchTaskHandler 类似,Airflow 本身不处理日志的索引。日志被刷新到本地文件,可能需要额外的软件(例如 Filebeat、Logstash)将日志发送到 OpenSearch。此处理程序随后启用从 OpenSearch 获取和显示日志的功能。
为了高效地查询和排序 Elasticsearch 结果,此处理程序假定每条日志消息都包含一个字段 log_id,由任务实例 (ti) 的主键组成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日志消息根据 offset 进行排序,offset 是一个唯一的整数,表示日志消息的顺序。此处的时间戳不可靠,因为多条日志消息可能具有相同的时间戳。
- 参数:
base_log_folder (str) – 在本地存储日志的基文件夹。
end_of_log_mark (str) – 表示日志结束的标记字符串。
write_stdout (bool) – 是否也将日志写入标准输出。
json_format (bool) – 是否将日志格式化为 JSON。
json_fields (str) – 要包含在 JSON 日志输出中的字段的逗号分隔列表。
host (str) – OpenSearch 主机名。
port (int) – OpenSearch 端口。
username (str) – 用于 OpenSearch 认证的用户名。
password (str) – 用于 OpenSearch 认证的密码。
host_field (str) – 日志中用于表示主机的字段名(默认为 “host”)。
offset_field (str) – 日志偏移量的字段名(默认为 “offset”)。
index_patterns (str) – 用于存储日志的索引模式或模板。
index_patterns_callable (str) – 根据上下文动态生成索引模式的可调用对象。
os_kwargs (dict | None | Literal['default_os_kwargs']) – 其他 OpenSearch 客户端选项。可以设置为 “default_os_kwargs” 以从 Airflow 的设置加载默认配置。
- formatter: logging.Formatter[source]¶
- set_context(ti, *, identifier=None)[source]¶
为 airflow 任务处理器提供 task_instance 上下文。
- 参数:
ti (airflow.models.taskinstance.TaskInstance) – task instance 对象
identifier (str | None) – 如果设置,标识中继与任务实例相关的异常情况日志的 Airflow 组件