Airflow 中的审计日志
理解审计日志
审计日志是 Airflow 系统的历史记录,记录了谁在何时执行了什么操作。这些日志对于维护系统完整性、满足合规性要求以及在出现问题时进行取证分析至关重要。
本质上,审计日志回答了三个基本问题:
Who(谁):哪个用户或系统组件发起了操作
What(什么):执行的具体操作是什么
When(何时):事件发生的精确时间戳
审计日志的主要目的包括:
监管合规:满足数据治理和审计追踪的要求
安全监控:检测未经授权的访问或可疑活动
运维排障:了解导致系统问题的事件序列
变更管理:跟踪对关键系统组件的修改
注意
访问审计日志需要 Audit Logs.can_read 权限。拥有此权限的用户可以查看所有审计条目,无论其是否具备特定 DAG 的访问权限。
理解事件日志
事件日志代表了 Airflow 系统的运营心跳。与侧重于问责和合规的审计日志不同,事件日志捕获的是系统行为、应用程序性能和运维指标的技术细节。
事件日志具有几个关键功能:
调试与排障:提供详细的错误消息和堆栈跟踪
性能监控:记录执行时间、资源使用情况和系统指标
运营洞察:跟踪系统健康状况、组件交互和工作流执行情况
开发支持:为代码调试和优化提供详细信息
事件日志通常存储在日志文件或外部日志系统中,并包含以下信息:
任务执行详情和输出
系统错误和警告
性能指标和计时信息
组件启动和关闭事件
资源利用数据
审计日志与事件日志对比
尽管两种日志系统对于系统管理都至关重要,但它们的服务目的和受众各不相同:
特性 |
审计日志 |
事件日志 |
|---|---|---|
主要目的 |
问责与合规跟踪 |
运营监控与系统调试 |
目标受众 |
安全团队、审计员、合规官 |
开发人员、系统管理员、运维团队 |
内容重点 |
用户操作与管理变更 |
系统行为、错误与性能数据 |
存储位置 |
结构化数据库表 ( |
日志文件、外部日志系统 |
保留要求 |
长期(合规需要几个月到几年),若未从数据库清除 |
短至中期(几天到几周) |
查询模式 |
“谁清除了任务实例以进行重新执行?” |
通常不进行查询(除非使用日志聚合框架)。通常按任务执行读取日志,描述:“为什么这次任务执行失败?” |
访问审计日志
Airflow 提供了多种访问审计日志数据的接口,每种接口都适用于不同的用例和技术要求。
- Web 用户界面
Airflow Web 界面提供了查看审计日志最便捷的方法。导航至 Browse → Audit Logs 即可访问具备内置过滤、排序和搜索功能的界面。此界面非常适合临时调查和日常监控。
- REST API 集成
如需编程访问和系统集成,请使用
/eventLogsREST API 端点。此方法支持自动化监控、与外部安全工具的集成以及自定义报告应用程序。
审计日志的范围
Airflow 的审计日志系统捕获三个不同运营领域的事件:
- 用户发起的动作
当用户通过任何接口(Web UI、REST API 或命令行工具)与 Airflow 交互时,会产生这些事件。示例包括:
手动触发 DAG 运行和修改
变量、连接和池的配置变更
任务实例状态修改(清除、标记为成功/失败)
管理操作和用户管理活动
- 系统生成的事件
这些事件在正常运行过程中由 Airflow 的内部进程自动创建:
任务生命周期状态转换(排队中、运行中、成功、失败)
系统监控事件(心跳超时、外部状态变更)
自动恢复操作(任务重新调度、重试尝试)
资源管理活动
- 命令行界面操作
这些事件捕获通过 Airflow CLI 工具执行的活动:
直接任务执行命令
DAG 管理操作
系统管理和维护任务
自动化脚本执行
常见审计日志场景
为方便分析审计日志,以下是一些常见场景及其对应的查询示例:
“谁触发了这个 DAG?”
SELECT dttm, owner, extra
FROM log
WHERE event = 'trigger_dag_run' AND dag_id = 'example_dag'
ORDER BY dttm DESC;
“这个失败的任务发生了什么?”
SELECT dttm, event, owner, extra
FROM log
WHERE dag_id = 'example_dag' AND task_id = 'example_task'
ORDER BY dttm DESC;
“最近谁修改了变量?”
SELECT dttm, event, owner, extra
FROM log
WHERE event LIKE '%variable%'
ORDER BY dttm DESC LIMIT 20;
事件目录
以下章节提供了 Airflow 审计日志系统跟踪的所有事件的完整参考。了解这些事件类型将有助于解读审计日志并针对特定用例构建有效的查询。
任务实例事件
系统生成的任务事件:
running:任务实例开始执行success:任务实例成功完成failed:任务实例在执行期间失败skipped:任务实例被跳过upstream_failed:任务实例因上游失败而失败up_for_retry:任务实例计划重试up_for_reschedule:任务实例被重新调度queued:任务实例排队等待执行scheduled:任务实例已调度deferred:任务实例已推迟(等待触发器)restarting:任务实例正在重启removed:任务实例已被移除
系统监控事件:
heartbeat timeout:任务实例停止发送心跳并将被终止state mismatch:任务实例状态在外部(Airflow 之外)被更改stuck in queued reschedule:任务实例卡在排队状态并被重新调度stuck in queued tries exceeded:任务实例超过了最大重试次数
用户发起的任务事件:
fail task:用户手动将任务标记为失败skip task:用户手动将任务标记为跳过action_set_failed:用户通过 UI/API 将任务实例设置为失败action_set_success:用户通过 UI/API 将任务实例设置为成功action_set_retry:用户将任务实例设置为重试action_set_skipped:用户将任务实例设置为跳过action_set_running:用户将任务实例设置为运行中action_clear:用户清除了任务实例状态
用户操作事件
DAG 操作:
trigger_dag_run:用户触发了 DAG 运行delete_dag_run:用户删除了一个 DAG 运行patch_dag_run:用户修改了一个 DAG 运行clear_dag_run:用户清除了一个 DAG 运行get_dag_run:用户获取了 DAG 运行信息get_dag_runs_batch:用户批量获取了 DAG 运行post_dag_run:用户创建了一个 DAG 运行patch_dag:用户修改了 DAG 配置get_dag:用户获取了 DAG 信息get_dags:用户批量获取了 DAGdelete_dag:用户删除了一个 DAG
任务实例操作:
post_clear_task_instances:用户清除了任务实例patch_task_instance:用户修改了一个任务实例get_task_instances_batch:用户获取了任务实例信息delete_task_instance:用户删除了一个任务实例get_task_instance:用户获取了单个任务实例信息get_task_instance_tries:用户获取了任务实例重试信息patch_task_instances_batch:用户修改了多个任务实例
变量操作:
delete_variable:用户删除了一个变量patch_variable:用户修改了一个变量post_variable:用户创建了一个变量bulk_variables:用户执行了批量变量操作
连接操作:
delete_connection:用户删除了一个连接post_connection:用户创建了一个连接patch_connection:用户修改了一个连接bulk_connections:用户执行了批量连接操作create_default_connections:用户创建了默认连接
池操作:
get_pool:用户获取了池信息get_pools:用户批量获取了池post_pool:用户创建了一个池patch_pool:用户修改了一个池delete_pool:用户删除了一个池bulk_pools:用户执行了批量池操作
资产操作:
get_asset:用户获取了资产信息get_assets:用户批量获取了资产get_asset_alias:用户获取了资产别名信息get_asset_aliases:用户批量获取了资产别名post_asset_events:用户创建了资产事件get_asset_events:用户获取了资产事件materialize_asset:用户触发了资产物化get_asset_queued_events:用户获取了已排队的资产事件delete_asset_queued_events:用户删除了已排队的资产事件get_dag_asset_queued_events:用户获取了 DAG 资产已排队事件delete_dag_asset_queued_events:用户删除了 DAG 资产已排队事件get_dag_asset_queued_event:用户获取了特定的 DAG 资产已排队事件delete_dag_asset_queued_event:用户删除了特定的 DAG 资产已排队事件
回填操作:
get_backfill:用户获取了回填信息get_backfills:用户批量获取了回填post_backfill:用户创建了一个回填pause_backfill:用户暂停了一个回填unpause_backfill:用户取消暂停了一个回填cancel_backfill:用户取消了一个回填create_backfill_dry_run:用户执行了回填空运行
用户和角色管理:
get_user:用户获取了用户信息get_users:用户批量获取了用户post_user:用户创建了一个用户帐户patch_user:用户修改了一个用户帐户delete_user:用户删除了一个用户帐户get_role:用户获取了角色信息get_roles:用户批量获取了角色post_role:用户创建了一个角色patch_role:用户修改了一个角色delete_role:用户删除了一个角色
CLI 事件
DAG 管理命令:
cli_dags_list:列出系统中所有 DAGcli_dags_show:显示 DAG 信息和结构cli_dags_state:检查 DAG 运行状态cli_dags_next_execution:显示 DAG 的下一次执行时间cli_dags_trigger:从命令行触发 DAG 运行cli_dags_delete:删除 DAG 及其元数据cli_dags_pause:暂停 DAGcli_dags_unpause:取消暂停 DAGcli_dags_backfill:对日期范围执行 DAG 运行回填cli_dags_test:测试 DAG 而不影响数据库
任务管理命令:
cli_tasks_list:列出特定 DAG 的任务cli_tasks_run:执行特定的任务实例cli_tasks_test:测试任务而不影响数据库cli_tasks_state:检查任务实例状态cli_tasks_failed_deps:显示任务失败的依赖项cli_tasks_render:渲染任务模板cli_tasks_clear:清除任务实例状态
数据库和系统命令:
cli_db_init:初始化 Airflow 数据库cli_db_upgrade:升级数据库架构cli_db_reset:重置数据库(危险操作)cli_db_shell:打开数据库 shellcli_db_check:检查数据库连接和架构cli_db_migrate:迁移数据库架构(旧版命令)cli_migratedb:旧版数据库迁移命令cli_initdb:旧版数据库初始化命令cli_resetdb:旧版数据库重置命令cli_upgradedb:旧版数据库升级命令
用户和安全命令:
cli_users_create:创建新用户帐户cli_users_delete:删除用户帐户cli_users_list:列出系统中所有用户cli_users_add_role:添加角色到用户cli_users_remove_role:从用户移除角色
配置和变量命令:
cli_variables_get:获取变量值cli_variables_set:设置变量值cli_variables_delete:删除变量cli_variables_list:列出所有变量cli_variables_import:从文件导入变量cli_variables_export:导出变量到文件
连接管理命令:
cli_connections_get:获取连接详情cli_connections_add:添加新连接cli_connections_delete:删除连接cli_connections_list:列出所有连接cli_connections_import:从文件导入连接cli_connections_export:导出连接到文件
池管理命令:
cli_pools_get:获取池信息cli_pools_set:创建或更新池cli_pools_delete:删除池cli_pools_list:列出所有池cli_pools_import:从文件导入池cli_pools_export:导出池到文件
服务和进程命令:
cli_webserver:启动 Airflow Web 服务器cli_scheduler:启动 Airflow 调度器cli_worker:启动 Celery workercli_flower:启动 Flower 监控工具cli_triggerer:启动触发器进程cli_standalone:以单机模式启动 Airflowcli_api_server:启动 Airflow API 服务器cli_dag_processor:启动 DAG 处理器服务cli_celery_worker:启动 Celery worker(替代命令)cli_celery_flower:启动 Celery Flower(替代命令)
维护和实用工具命令:
cli_cheat_sheet:显示 CLI 命令参考cli_version:显示 Airflow 版本信息cli_info:显示系统信息cli_config_get_value:获取配置值cli_config_list:列出配置选项cli_plugins:列出已安装插件cli_rotate_fernet_key:轮换 Fernet 加密密钥cli_sync_perm:同步权限cli_shell:启动交互式 Python shellcli_kerberos:启动 Kerberos 票据更新器
测试和开发命令:
cli_test:运行测试cli_render:渲染模板cli_dag_deps:显示 DAG 依赖项cli_task_deps:显示任务依赖项
旧版命令:
cli_run:旧版任务运行命令cli_backfill:旧版回填命令cli_clear:旧版清除命令cli_list_dags:旧版 DAG 列表命令cli_list_tasks:旧版任务列表命令cli_pause:旧版暂停命令cli_unpause:旧版取消暂停命令cli_trigger_dag:旧版 DAG 触发命令
每个 CLI 命令审计日志条目包含:
用户标识:谁执行了该命令
命令详情:带参数的完整命令
执行上下文:工作目录、环境变量
时间戳:执行命令的时间
退出状态:成功或失败指示
自定义事件
Airflow 允许以编程方式创建自定义审计日志条目。
from airflow.models.log import Log
from airflow.utils.session import provide_session
@provide_session
def log_custom_event(session=None):
log_entry = Log(event="custom_event", owner="username", extra="Additional context information")
session.add(log_entry)
session.commit()
审计日志条目剖析
每个审计日志记录都包含结构化信息,提供了所记录事件的完整全貌。了解这些字段对于有效的日志分析至关重要。
字段名称 |
说明与用法 |
|---|---|
|
指示事件发生时间的时间戳(UTC 时区) |
|
操作或事件的描述性名称(例如 |
|
执行者身份:用户操作为用户名,“airflow” 表示系统事件 |
|
受影响 DAG 的标识符(适用时) |
|
受影响任务的标识符(适用时) |
|
用于跟踪执行实例的特定 DAG 运行标识符 |
|
任务重试和重新执行的尝试次数 |
|
动态映射任务的索引 |
|
DAG 运行的逻辑执行日期 |
|
JSON 格式的附加上下文(参数、错误详情等) |
审计日志查询方法
有效的审计日志分析需要了解可用于查询和检索日志数据的各种方法。每种方法都有其优势,并适用于不同的场景。
REST API 示例:
# Get all audit logs
curl -X GET "https://:8080/api/v1/eventLogs"
# Filter by event type
curl -X GET "https://:8080/api/v1/eventLogs?event=trigger_dag_run"
# Filter by DAG
curl -X GET "https://:8080/api/v1/eventLogs?dag_id=example_dag"
# Filter by date range
curl -X GET "https://:8080/api/v1/eventLogs?after=2024-01-01T00:00:00Z&before=2024-12-31T23:59:59Z"
数据库查询示例:
-- Get recent user actions
SELECT dttm, event, owner, dag_id, task_id, extra
FROM log
WHERE owner IS NOT NULL
ORDER BY dttm DESC
LIMIT 100;
-- Get task failure events
SELECT dttm, dag_id, task_id, run_id, extra
FROM log
WHERE event = 'failed'
ORDER BY dttm DESC;
-- Get user actions on specific DAG
SELECT dttm, event, owner, extra
FROM log
WHERE dag_id = 'example_dag' AND owner IS NOT NULL
ORDER BY dttm DESC;
查询事件日志
事件日志(运营日志)通常根据日志记录配置通过不同方法进行访问:
日志文件:
# View scheduler logs
tail -f $AIRFLOW_HOME/logs/scheduler/latest/*.log
# View webserver logs
tail -f $AIRFLOW_HOME/logs/webserver/webserver.log
# View task logs for specific DAG run
cat $AIRFLOW_HOME/logs/dag_id/task_id/2024-01-01T00:00:00+00:00/1.log
任务日志的 REST API:
# Get task instance logs
curl -X GET "https://:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}"
# Get task logs with metadata
curl -X GET "https://:8080/api/v1/dags/example_dag/dagRuns/2024-01-01T00:00:00+00:00/taskInstances/example_task/logs/1?full_content=true"
Python 日志集成:
import logging
from airflow.utils.log.logging_mixin import LoggingMixin
class MyOperator(BaseOperator, LoggingMixin):
def execute(self, context):
# These will appear in event logs
self.log.info("Task started")
self.log.warning("Warning message")
self.log.error("Error occurred")
外部日志系统:
当使用外部日志系统(例如 ELK stack、Splunk、CloudWatch)时
# Example Elasticsearch query
curl -X GET "elasticsearch:9200/airflow-*/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"bool": {
"must": [
{"match": {"dag_id": "example_dag"}},
{"range": {"@timestamp": {"gte": "2024-01-01", "lte": "2024-01-31"}}}
]
}
}
}'
实用查询示例
以下示例演示了针对常见运营和安全场景的审计日志查询的实际应用。这些查询可作为模板,根据特定需求进行调整。
安全调查
-- Find all actions by a specific user in the last 24 hours
SELECT dttm, event, dag_id, task_id, extra
FROM log
WHERE owner = 'suspicious_user'
AND dttm > NOW() - INTERVAL '24 hours'
ORDER BY dttm DESC;
合规报告
-- Get all variable and connection changes for audit report
SELECT dttm, event, owner, extra
FROM log
WHERE event IN ('post_variable', 'patch_variable', 'delete_variable',
'post_connection', 'patch_connection', 'delete_connection')
AND dttm BETWEEN '2024-01-01' AND '2024-01-31'
ORDER BY dttm;
故障排除 DAG 问题
-- See all events for a problematic DAG run
SELECT dttm, event, task_id, owner, extra
FROM log
WHERE dag_id = 'example_dag'
AND run_id = '2024-01-15T10:00:00+00:00'
ORDER BY dttm;