将日志写入 Elasticsearch

可以将 Airflow 配置为从 Elasticsearch 读取任务日志,并可选择将日志以标准格式或 JSON 格式写入 stdout。然后可以使用 fluentd、logstash 或其他工具收集这些日志并将其转发到 Elasticsearch 集群。

Airflow 还支持直接将日志写入 Elasticsearch,无需 filebeat 和 logstash 等额外软件。要启用此功能,请在 airflow.cfg 中将 write_to_esjson_format 设置为 True,将 write_stdout 设置为 False。请注意,如果在日志记录部分同时将 write_to_esdelete_local_logs 设置为 true,则 Airflow 在成功将任务日志写入 ElasticSearch 后会删除本地的任务日志副本。

您可以选择将所有来自 worker 的任务日志输出到最高父级进程,而不是标准文件位置。这在 Kubernetes 等容器环境中提供了一些额外的灵活性,因为容器的 stdout 已经记录到了宿主节点上。然后可以使用日志传输工具将其转发到 Elasticsearch。要使用此功能,请在 airflow.cfg 中设置 write_stdout 选项。您还可以选择使用 json_format 选项将日志输出为 JSON 格式。Airflow 使用标准的 Python 日志记录模块,JSON 字段直接从 LogRecord 对象中提取。要使用此功能,请在 airflow.cfg 中设置 json_fields 选项。将您希望为日志收集的字段添加到逗号分隔的字符串中。这些字段来自 logging 模块中的 LogRecord 对象。不同属性的文档可以在此处找到

首先,要使用此 handler,airflow.cfg 必须按如下方式配置

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>

要将任务日志以 JSON 格式输出到 stdout,可以使用以下配置

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>
write_stdout = True
json_format = True

要将任务日志输出到 ElasticSearch,可以使用以下配置:(如果您不想保留任务日志的本地副本,请将 delete_local_logs 设置为 true)

[logging]
remote_logging = True
delete_local_logs = False

[elasticsearch]
host = <host>:<port>
write_stdout = False
json_format = True
write_to_es = True
target_index = [name of the index to store logs]

通过 TLS 将日志写入 Elasticsearch

要向 ElasticSearch 添加自定义配置(例如,开启 ssl_verify、添加自定义自签名证书等),请在您的 airflow.cfg 中使用 elasticsearch_configs 设置

请注意,当您启用将日志写入 ElasticSearch 时,这些配置也适用

[logging]
remote_logging = True

[elasticsearch_configs]
verify_certs=True
ca_certs=/path/to/CA_certs

此外,在 elasticsearch_configs 部分,您可以传递 Elasticsearch Python 客户端支持的任何参数。这些参数将直接传递给 elasticsearch.Elasticsearch(**kwargs) 客户端。例如

[elasticsearch_configs]
http_compress = True
ca_certs = /root/ca.pem
api_key = "SOMEAPIKEY"
verify_certs = True

[elasticsearch] log_id_template 的更改

如果您需要更改 [elasticsearch] log_id_template,Airflow 2.3.0+ 能够跟踪旧值,以便您现有的任务运行日志仍然可以获取。一旦您使用了 Airflow 2.3.0+,通常情况下,您可以随意更改 log_id_template,Airflow 将会跟踪这些更改。

然而,当您升级到 2.3.0+ 时,Airflow 可能无法正确保存您之前的 log_id_template。如果升级后发现任务日志无法访问,请尝试在 log_template 表中添加一行 id=0,其中包含您之前的 log_id_template`. 例如,如果您在 2.2.5 中使用了默认设置

INSERT INTO log_template (id, filename, elasticsearch_id, created_at) VALUES (0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());

此条目是否有帮助?