Airflow 中的审计日志

理解审计日志

审计日志是 Airflow 系统的历史记录,记录了谁在何时执行了什么操作。这些日志对于维护系统完整性、满足合规性要求以及在出现问题时进行取证分析至关重要。

本质上,审计日志回答了三个基本问题:

  • Who(谁):哪个用户或系统组件发起了操作

  • What(什么):执行的具体操作是什么

  • When(何时):事件发生的精确时间戳

审计日志的主要目的包括:

  • 监管合规:满足数据治理和审计追踪的要求

  • 安全监控:检测未经授权的访问或可疑活动

  • 运维排障:了解导致系统问题的事件序列

  • 变更管理:跟踪对关键系统组件的修改

注意

访问审计日志需要 Audit Logs.can_read 权限。拥有此权限的用户可以查看所有审计条目,无论其是否具备特定 DAG 的访问权限。

理解事件日志

事件日志代表了 Airflow 系统的运营心跳。与侧重于问责和合规的审计日志不同,事件日志捕获的是系统行为、应用程序性能和运维指标的技术细节。

事件日志具有几个关键功能:

  • 调试与排障:提供详细的错误消息和堆栈跟踪

  • 性能监控:记录执行时间、资源使用情况和系统指标

  • 运营洞察:跟踪系统健康状况、组件交互和工作流执行情况

  • 开发支持:为代码调试和优化提供详细信息

事件日志通常存储在日志文件或外部日志系统中,并包含以下信息:

  • 任务执行详情和输出

  • 系统错误和警告

  • 性能指标和计时信息

  • 组件启动和关闭事件

  • 资源利用数据

审计日志与事件日志对比

尽管两种日志系统对于系统管理都至关重要,但它们的服务目的和受众各不相同:

特性

审计日志

事件日志

主要目的

问责与合规跟踪

运营监控与系统调试

目标受众

安全团队、审计员、合规官

开发人员、系统管理员、运维团队

内容重点

用户操作与管理变更

系统行为、错误与性能数据

存储位置

结构化数据库表 (log)

日志文件、外部日志系统

保留要求

长期(合规需要几个月到几年),若未从数据库清除

短至中期(几天到几周)

查询模式

“谁清除了任务实例以进行重新执行?”

通常不进行查询(除非使用日志聚合框架)。通常按任务执行读取日志,描述:“为什么这次任务执行失败?”

访问审计日志

Airflow 提供了多种访问审计日志数据的接口,每种接口都适用于不同的用例和技术要求。

Web 用户界面

Airflow Web 界面提供了查看审计日志最便捷的方法。导航至 Browse → Audit Logs 即可访问具备内置过滤、排序和搜索功能的界面。此界面非常适合临时调查和日常监控。

REST API 集成

如需编程访问和系统集成,请使用 /eventLogs REST API 端点。此方法支持自动化监控、与外部安全工具的集成以及自定义报告应用程序。

审计日志的范围

Airflow 的审计日志系统捕获三个不同运营领域的事件:

用户发起的动作

当用户通过任何接口(Web UI、REST API 或命令行工具)与 Airflow 交互时,会产生这些事件。示例包括:

  • 手动触发 DAG 运行和修改

  • 变量、连接和池的配置变更

  • 任务实例状态修改(清除、标记为成功/失败)

  • 管理操作和用户管理活动

系统生成的事件

这些事件在正常运行过程中由 Airflow 的内部进程自动创建:

  • 任务生命周期状态转换(排队中、运行中、成功、失败)

  • 系统监控事件(心跳超时、外部状态变更)

  • 自动恢复操作(任务重新调度、重试尝试)

  • 资源管理活动

命令行界面操作

这些事件捕获通过 Airflow CLI 工具执行的活动:

  • 直接任务执行命令

  • DAG 管理操作

  • 系统管理和维护任务

  • 自动化脚本执行

常见审计日志场景

为方便分析审计日志,以下是一些常见场景及其对应的查询示例:

“谁触发了这个 DAG?”

SELECT dttm, owner, extra
FROM log
WHERE event = 'trigger_dag_run' AND dag_id = 'example_dag'
ORDER BY dttm DESC;

“这个失败的任务发生了什么?”

SELECT dttm, event, owner, extra
FROM log
WHERE dag_id = 'example_dag' AND task_id = 'example_task'
ORDER BY dttm DESC;

“最近谁修改了变量?”

SELECT dttm, event, owner, extra
FROM log
WHERE event LIKE '%variable%'
ORDER BY dttm DESC LIMIT 20;

事件目录

以下章节提供了 Airflow 审计日志系统跟踪的所有事件的完整参考。了解这些事件类型将有助于解读审计日志并针对特定用例构建有效的查询。

任务实例事件

系统生成的任务事件:

  • running:任务实例开始执行

  • success:任务实例成功完成

  • failed:任务实例在执行期间失败

  • skipped:任务实例被跳过

  • upstream_failed:任务实例因上游失败而失败

  • up_for_retry:任务实例计划重试

  • up_for_reschedule:任务实例被重新调度

  • queued:任务实例排队等待执行

  • scheduled:任务实例已调度

  • deferred:任务实例已推迟(等待触发器)

  • restarting:任务实例正在重启

  • removed:任务实例已被移除

系统监控事件:

  • heartbeat timeout:任务实例停止发送心跳并将被终止

  • state mismatch:任务实例状态在外部(Airflow 之外)被更改

  • stuck in queued reschedule:任务实例卡在排队状态并被重新调度

  • stuck in queued tries exceeded:任务实例超过了最大重试次数

用户发起的任务事件:

  • fail task:用户手动将任务标记为失败

  • skip task:用户手动将任务标记为跳过

  • action_set_failed:用户通过 UI/API 将任务实例设置为失败

  • action_set_success:用户通过 UI/API 将任务实例设置为成功

  • action_set_retry:用户将任务实例设置为重试

  • action_set_skipped:用户将任务实例设置为跳过

  • action_set_running:用户将任务实例设置为运行中

  • action_clear:用户清除了任务实例状态

用户操作事件

DAG 操作:

  • trigger_dag_run:用户触发了 DAG 运行

  • delete_dag_run:用户删除了一个 DAG 运行

  • patch_dag_run:用户修改了一个 DAG 运行

  • clear_dag_run:用户清除了一个 DAG 运行

  • get_dag_run:用户获取了 DAG 运行信息

  • get_dag_runs_batch:用户批量获取了 DAG 运行

  • post_dag_run:用户创建了一个 DAG 运行

  • patch_dag:用户修改了 DAG 配置

  • get_dag:用户获取了 DAG 信息

  • get_dags:用户批量获取了 DAG

  • delete_dag:用户删除了一个 DAG

任务实例操作:

  • post_clear_task_instances:用户清除了任务实例

  • patch_task_instance:用户修改了一个任务实例

  • get_task_instances_batch:用户获取了任务实例信息

  • delete_task_instance:用户删除了一个任务实例

  • get_task_instance:用户获取了单个任务实例信息

  • get_task_instance_tries:用户获取了任务实例重试信息

  • patch_task_instances_batch:用户修改了多个任务实例

变量操作:

  • delete_variable:用户删除了一个变量

  • patch_variable:用户修改了一个变量

  • post_variable:用户创建了一个变量

  • bulk_variables:用户执行了批量变量操作

连接操作:

  • delete_connection:用户删除了一个连接

  • post_connection:用户创建了一个连接

  • patch_connection:用户修改了一个连接

  • bulk_connections:用户执行了批量连接操作

  • create_default_connections:用户创建了默认连接

池操作:

  • get_pool:用户获取了池信息

  • get_pools:用户批量获取了池

  • post_pool:用户创建了一个池

  • patch_pool:用户修改了一个池

  • delete_pool:用户删除了一个池

  • bulk_pools:用户执行了批量池操作

资产操作:

  • get_asset:用户获取了资产信息

  • get_assets:用户批量获取了资产

  • get_asset_alias:用户获取了资产别名信息

  • get_asset_aliases:用户批量获取了资产别名

  • post_asset_events:用户创建了资产事件

  • get_asset_events:用户获取了资产事件

  • materialize_asset:用户触发了资产物化

  • get_asset_queued_events:用户获取了已排队的资产事件

  • delete_asset_queued_events:用户删除了已排队的资产事件

  • get_dag_asset_queued_events:用户获取了 DAG 资产已排队事件

  • delete_dag_asset_queued_events:用户删除了 DAG 资产已排队事件

  • get_dag_asset_queued_event:用户获取了特定的 DAG 资产已排队事件

  • delete_dag_asset_queued_event:用户删除了特定的 DAG 资产已排队事件

回填操作:

  • get_backfill:用户获取了回填信息

  • get_backfills:用户批量获取了回填

  • post_backfill:用户创建了一个回填

  • pause_backfill:用户暂停了一个回填

  • unpause_backfill:用户取消暂停了一个回填

  • cancel_backfill:用户取消了一个回填

  • create_backfill_dry_run:用户执行了回填空运行

用户和角色管理:

  • get_user:用户获取了用户信息

  • get_users:用户批量获取了用户

  • post_user:用户创建了一个用户帐户

  • patch_user:用户修改了一个用户帐户

  • delete_user:用户删除了一个用户帐户

  • get_role:用户获取了角色信息

  • get_roles:用户批量获取了角色

  • post_role:用户创建了一个角色

  • patch_role:用户修改了一个角色

  • delete_role:用户删除了一个角色

CLI 事件

DAG 管理命令:

  • cli_dags_list:列出系统中所有 DAG

  • cli_dags_show:显示 DAG 信息和结构

  • cli_dags_state:检查 DAG 运行状态

  • cli_dags_next_execution:显示 DAG 的下一次执行时间

  • cli_dags_trigger:从命令行触发 DAG 运行

  • cli_dags_delete:删除 DAG 及其元数据

  • cli_dags_pause:暂停 DAG

  • cli_dags_unpause:取消暂停 DAG

  • cli_dags_backfill:对日期范围执行 DAG 运行回填

  • cli_dags_test:测试 DAG 而不影响数据库

任务管理命令:

  • cli_tasks_list:列出特定 DAG 的任务

  • cli_tasks_run:执行特定的任务实例

  • cli_tasks_test:测试任务而不影响数据库

  • cli_tasks_state:检查任务实例状态

  • cli_tasks_failed_deps:显示任务失败的依赖项

  • cli_tasks_render:渲染任务模板

  • cli_tasks_clear:清除任务实例状态

数据库和系统命令:

  • cli_db_init:初始化 Airflow 数据库

  • cli_db_upgrade:升级数据库架构

  • cli_db_reset:重置数据库(危险操作)

  • cli_db_shell:打开数据库 shell

  • cli_db_check:检查数据库连接和架构

  • cli_db_migrate:迁移数据库架构(旧版命令)

  • cli_migratedb:旧版数据库迁移命令

  • cli_initdb:旧版数据库初始化命令

  • cli_resetdb:旧版数据库重置命令

  • cli_upgradedb:旧版数据库升级命令

用户和安全命令:

  • cli_users_create:创建新用户帐户

  • cli_users_delete:删除用户帐户

  • cli_users_list:列出系统中所有用户

  • cli_users_add_role:添加角色到用户

  • cli_users_remove_role:从用户移除角色

配置和变量命令:

  • cli_variables_get:获取变量值

  • cli_variables_set:设置变量值

  • cli_variables_delete:删除变量

  • cli_variables_list:列出所有变量

  • cli_variables_import:从文件导入变量

  • cli_variables_export:导出变量到文件

连接管理命令:

  • cli_connections_get:获取连接详情

  • cli_connections_add:添加新连接

  • cli_connections_delete:删除连接

  • cli_connections_list:列出所有连接

  • cli_connections_import:从文件导入连接

  • cli_connections_export:导出连接到文件

池管理命令:

  • cli_pools_get:获取池信息

  • cli_pools_set:创建或更新池

  • cli_pools_delete:删除池

  • cli_pools_list:列出所有池

  • cli_pools_import:从文件导入池

  • cli_pools_export:导出池到文件

服务和进程命令:

  • cli_webserver:启动 Airflow Web 服务器

  • cli_scheduler:启动 Airflow 调度器

  • cli_worker:启动 Celery worker

  • cli_flower:启动 Flower 监控工具

  • cli_triggerer:启动触发器进程

  • cli_standalone:以单机模式启动 Airflow

  • cli_api_server:启动 Airflow API 服务器

  • cli_dag_processor:启动 DAG 处理器服务

  • cli_celery_worker:启动 Celery worker(替代命令)

  • cli_celery_flower:启动 Celery Flower(替代命令)

维护和实用工具命令:

  • cli_cheat_sheet:显示 CLI 命令参考

  • cli_version:显示 Airflow 版本信息

  • cli_info:显示系统信息

  • cli_config_get_value:获取配置值

  • cli_config_list:列出配置选项

  • cli_plugins:列出已安装插件

  • cli_rotate_fernet_key:轮换 Fernet 加密密钥

  • cli_sync_perm:同步权限

  • cli_shell:启动交互式 Python shell

  • cli_kerberos:启动 Kerberos 票据更新器

测试和开发命令:

  • cli_test:运行测试

  • cli_render:渲染模板

  • cli_dag_deps:显示 DAG 依赖项

  • cli_task_deps:显示任务依赖项

旧版命令:

  • cli_run:旧版任务运行命令

  • cli_backfill:旧版回填命令

  • cli_clear:旧版清除命令

  • cli_list_dags:旧版 DAG 列表命令

  • cli_list_tasks:旧版任务列表命令

  • cli_pause:旧版暂停命令

  • cli_unpause:旧版取消暂停命令

  • cli_trigger_dag:旧版 DAG 触发命令

每个 CLI 命令审计日志条目包含:

  • 用户标识:谁执行了该命令

  • 命令详情:带参数的完整命令

  • 执行上下文:工作目录、环境变量

  • 时间戳:执行命令的时间

  • 退出状态:成功或失败指示

自定义事件

Airflow 允许以编程方式创建自定义审计日志条目。

from airflow.models.log import Log
from airflow.utils.session import provide_session


@provide_session
def log_custom_event(session=None):
    log_entry = Log(event="custom_event", owner="username", extra="Additional context information")
    session.add(log_entry)
    session.commit()

审计日志条目剖析

每个审计日志记录都包含结构化信息,提供了所记录事件的完整全貌。了解这些字段对于有效的日志分析至关重要。

字段名称

说明与用法

dttm

指示事件发生时间的时间戳(UTC 时区)

event

操作或事件的描述性名称(例如 trigger_dag_run, failed

owner

执行者身份:用户操作为用户名,“airflow” 表示系统事件

dag_id

受影响 DAG 的标识符(适用时)

task_id

受影响任务的标识符(适用时)

run_id

用于跟踪执行实例的特定 DAG 运行标识符

try_number

任务重试和重新执行的尝试次数

map_index

动态映射任务的索引

logical_date

DAG 运行的逻辑执行日期

extra

JSON 格式的附加上下文(参数、错误详情等)

审计日志查询方法

有效的审计日志分析需要了解可用于查询和检索日志数据的各种方法。每种方法都有其优势,并适用于不同的场景。

REST API 示例:

# Get all audit logs
curl -X GET "https://:8080/api/v1/eventLogs"

# Filter by event type
curl -X GET "https://:8080/api/v1/eventLogs?event=trigger_dag_run"

# Filter by DAG
curl -X GET "https://:8080/api/v1/eventLogs?dag_id=example_dag"

# Filter by date range
curl -X GET "https://:8080/api/v1/eventLogs?after=2024-01-01T00:00:00Z&before=2024-12-31T23:59:59Z"

数据库查询示例:

-- Get recent user actions
SELECT dttm, event, owner, dag_id, task_id, extra
FROM log
WHERE owner IS NOT NULL
ORDER BY dttm DESC
LIMIT 100;

-- Get task failure events
SELECT dttm, dag_id, task_id, run_id, extra
FROM log
WHERE event = 'failed'
ORDER BY dttm DESC;

-- Get user actions on specific DAG
SELECT dttm, event, owner, extra
FROM log
WHERE dag_id = 'example_dag' AND owner IS NOT NULL
ORDER BY dttm DESC;

查询事件日志

事件日志(运营日志)通常根据日志记录配置通过不同方法进行访问:

日志文件:

# View scheduler logs
tail -f $AIRFLOW_HOME/logs/scheduler/latest/*.log

# View webserver logs
tail -f $AIRFLOW_HOME/logs/webserver/webserver.log

# View task logs for specific DAG run
cat $AIRFLOW_HOME/logs/dag_id/task_id/2024-01-01T00:00:00+00:00/1.log

任务日志的 REST API:

# Get task instance logs
curl -X GET "https://:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}"

# Get task logs with metadata
curl -X GET "https://:8080/api/v1/dags/example_dag/dagRuns/2024-01-01T00:00:00+00:00/taskInstances/example_task/logs/1?full_content=true"

Python 日志集成:

import logging
from airflow.utils.log.logging_mixin import LoggingMixin


class MyOperator(BaseOperator, LoggingMixin):
    def execute(self, context):
        # These will appear in event logs
        self.log.info("Task started")
        self.log.warning("Warning message")
        self.log.error("Error occurred")

外部日志系统:

当使用外部日志系统(例如 ELK stack、Splunk、CloudWatch)时

# Example Elasticsearch query
curl -X GET "elasticsearch:9200/airflow-*/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must": [
        {"match": {"dag_id": "example_dag"}},
        {"range": {"@timestamp": {"gte": "2024-01-01", "lte": "2024-01-31"}}}
      ]
    }
  }
}'

实用查询示例

以下示例演示了针对常见运营和安全场景的审计日志查询的实际应用。这些查询可作为模板,根据特定需求进行调整。

安全调查

-- Find all actions by a specific user in the last 24 hours
SELECT dttm, event, dag_id, task_id, extra
FROM log
WHERE owner = 'suspicious_user'
  AND dttm > NOW() - INTERVAL '24 hours'
ORDER BY dttm DESC;

合规报告

-- Get all variable and connection changes for audit report
SELECT dttm, event, owner, extra
FROM log
WHERE event IN ('post_variable', 'patch_variable', 'delete_variable',
                'post_connection', 'patch_connection', 'delete_connection')
  AND dttm BETWEEN '2024-01-01' AND '2024-01-31'
ORDER BY dttm;

故障排除 DAG 问题

-- See all events for a problematic DAG run
SELECT dttm, event, task_id, owner, extra
FROM log
WHERE dag_id = 'example_dag'
  AND run_id = '2024-01-15T10:00:00+00:00'
ORDER BY dttm;

此条目是否有帮助?