airflow.providers.elasticsearch.log.es_task_handler¶
属性¶
类¶
ElasticsearchTaskHandler 是一个从 Elasticsearch 读取日志的 python 日志处理程序。 |
函数¶
|
从 obj 获取项目,如果未找到则返回 default。 |
模块内容¶
- class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, write_to_es=False, target_index='airflow-logs', host_field='host', offset_field='offset', host='http://localhost:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[source]¶
基类:
airflow.utils.log.file_task_handler.FileTaskHandler
,airflow.utils.log.logging_mixin.ExternalLoggingMixin
,airflow.utils.log.logging_mixin.LoggingMixin
ElasticsearchTaskHandler 是一个从 Elasticsearch 读取日志的 python 日志处理程序。
请注意,Airflow 默认不处理将日志索引到 Elasticsearch 中。相反,Airflow 将日志刷新到本地文件中。需要额外的软件设置才能将日志索引到 Elasticsearch 中,例如使用 Filebeat 和 Logstash。
可以将 Airflow 配置为支持直接将日志写入 Elasticsearch。要启用此功能,请将 json_format 和 write_to_es 设置为 True。
为了高效地查询和排序 Elasticsearch 结果,此处理程序假定每条日志消息都有一个由 ti 主键组成的 log_id 字段:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日志消息根据 offset 进行排序,offset 是一个唯一整数,表示日志消息的顺序。此处的时间戳不可靠,因为多条日志消息可能具有相同的时间戳。
- formatter: logging.Formatter[source]¶
- static format_url(host)[source]¶
格式化给定的主机字符串,确保它以“http”开头,并检查它是否表示一个有效的 URL。
- 参数 host:
要格式化和检查的主机字符串。
- set_context(ti, *, identifier=None)[source]¶
向 airflow 任务处理程序提供 task_instance 上下文。
- 参数:
ti (airflow.models.taskinstance.TaskInstance) – 任务实例对象
identifier (str | None) – 如果设置,则标识从与任务实例相关的异常场景中中继日志的 Airflow 组件
- close()[source]¶
清理处理程序使用的所有资源。
此版本会从内部处理程序映射 _handlers 中移除处理程序,该映射用于按名称查找处理程序。子类应确保在覆盖的 close() 方法中调用此方法。
- get_external_log_url(task_instance, try_number)[source]¶
为外部日志收集服务创建地址。
- 参数:
task_instance (airflow.models.taskinstance.TaskInstance) – 任务实例对象
try_number (int) – 要从中读取日志的任务实例尝试次数(try_number)。
- 返回:
外部日志收集服务的 URL
- 返回类型: