常见问题¶
调度 / DAG 文件解析¶
为什么任务没有被调度?¶
任务可能未被调度的原因有很多。以下是一些常见原因:
您的脚本是否“可编译”,Airflow 引擎能否解析它并找到您的 DAG 对象?要测试这一点,您可以运行
airflow dags list
并确认您的 DAG 出现在列表中。您还可以运行airflow dags show foo_dag_id
并确认您的任务按预期以 graphviz 格式显示。如果您使用 CeleryExecutor,您可能需要确认它在调度器运行的地方和 Worker 运行的地方都有效。包含您 DAG 的文件内容中是否某处包含字符串“airflow”和“DAG”?在搜索 DAG 目录时,Airflow 会忽略不包含“airflow”和“DAG”的文件,以防止 DagBag 解析导入与用户 DAG 位于同一位置的所有 python 文件。
您的
start_date
是否设置正确?对于基于时间的 DAG,直到开始日期之后的第一个调度间隔过去后,任务才会被触发。您的
schedule
参数是否设置正确?默认值是一天 (datetime.timedelta(1)
)。您必须直接为实例化的 DAG 对象指定不同的schedule
,而不是作为default_param
,因为任务实例不会覆盖其父 DAG 的schedule
。您的
start_date
是否超出了您在 UI 中能看到的范围?如果您的start_date
设置在比如 3 个月前,您将无法在 UI 的主视图中看到它,但您应该可以在菜单 -> 浏览 ->任务 实例
中看到它。任务的依赖是否已满足?直接位于任务上游的任务实例需要处于
success
状态。此外,如果您设置了depends_on_past=True
,则前一个任务实例需要成功或被跳过(除非是该任务的第一次运行)。此外,如果wait_for_downstream=True
,请确保您理解其含义 - 紧随 前一个 任务实例下游的 所有 任务都必须成功或被跳过。您可以从任务的任务 实例 详情
页面查看这些属性是如何设置的。您需要的 DagRun 是否已创建且处于活动状态?DagRun 代表整个 DAG 的特定执行,并具有状态(运行中、成功、失败等)。调度器会随着时间的推移创建新的 DagRun,但绝不会回到过去创建新的。调度器仅评估处于
running
状态的 DagRun,以查看它可以触发哪些任务实例。请注意,清除任务实例(从 UI 或 CLI)确实会将 DagRun 的状态设置回运行中。您可以通过单击 DAG 的调度标签来批量查看 DagRun 列表并更改状态。您的 DAG 的
concurrency
参数是否已达到?concurrency
定义了一个 DAG 允许拥有多少个running
任务实例,超过该点后任务将进入队列。您的 DAG 的
max_active_runs
参数是否已达到?max_active_runs
定义一个 DAG 允许有多少个running
并发实例。
您可能还想阅读关于 调度器 的内容,并确保您完全理解调度器周期。
如何提升 DAG 性能?¶
有一些 Airflow 配置可以提高调度容量和频率:
DAG 具有提高效率的配置:
max_active_tasks
: 覆盖 max_active_tasks_per_dag。max_active_runs
: 覆盖 max_active_runs_per_dag。
Operators 或任务也具有提高效率和调度优先级的配置:
max_active_tis_per_dag
: 此参数控制每个任务在所有dag_runs
中的并发运行任务实例数量。pool
: 参见 Pools。priority_weight
: 参见 Priority Weights。queue
: 参见 队列 (Queues)(仅适用于 CeleryExecutor 部署)。
如何减少 DAG 调度延迟 / 任务延迟?¶
Airflow 2.0 开箱即用,具有较低的 DAG 调度延迟(特别是与 Airflow 1.10.x 相比),但是,如果您需要更高的吞吐量,可以启动多个调度器。
如何基于另一个任务的失败触发任务?¶
您可以使用 触发规则 (Trigger Rules) 实现此目的。
如何控制不同 DAG 文件的解析超时?¶
(仅对 Airflow >= 2.3.0 有效)
您可以在 airflow_local_settings.py
中添加一个 get_dagbag_import_timeout
函数,该函数会在解析 DAG 文件之前立即调用。您可以根据 DAG 文件返回不同的超时值。当返回值小于或等于 0 时,表示在 DAG 解析期间没有超时。
def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
"""
This setting allows to dynamically control the DAG file parsing timeout.
It is useful when there are a few DAG files requiring longer parsing times, while others do not.
You can control them separately instead of having one value for all DAG files.
If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
"""
if "slow" in dag_file_path:
return 90
if "no-timeout" in dag_file_path:
return 0
return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
有关如何配置本地设置的详细信息,请参阅 配置本地设置 (Configuring local settings)。
当 DAG 文件很多(>1000)时,如何加快新文件的解析速度?¶
将 file_parsing_sort_mode 更改为 modified_time
,将 min_file_process_interval 提高到 600
(10 分钟)、6000
(100 分钟)或更高值。
如果文件最近被修改,DAG 解析器将跳过 min_file_process_interval
检查。
当 DAG 从单独的文件导入/创建时,这可能不起作用。例如:导入 dag_loader.py
的 dag_file.py
,其中 DAG 文件的实际逻辑如下所示。在这种情况下,如果 dag_loader.py
被更新但 dag_file.py
未更新,更改将不会反映,直到达到 min_file_process_interval
,因为 DAG 解析器将查找 dag_file.py
文件的修改时间。
from dag_loader import create_dag
globals()[dag.dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
from airflow.sdk import DAG
from airflow.sdk import task
import pendulum
def create_dag(dag_id, schedule, dag_number, default_args):
dag = DAG(
dag_id,
schedule=schedule,
default_args=default_args,
pendulum.datetime(2021, 9, 13, tz="UTC"),
)
with dag:
@task()
def hello_world():
print("Hello World")
print(f"This is DAG: {dag_number}")
hello_world()
return dag
DAG 构建¶
start_date
是怎么回事?¶
start_date
部分是 DagRun 时代之前的遗留,但它在许多方面仍然相关。创建新 DAG 时,您可能想为任务设置一个全局 start_date
。这可以通过在 DAG()
对象中直接声明 start_date
来完成。DAG 的第一个 DagRun 将基于 start_date
之后的第一个完整 data_interval
创建。例如,对于 start_date=datetime(2024, 1, 1)
且 schedule=\"0 0 3 * *"
的 DAG,第一个 DAG 运行将在 2024 年 2 月 3 日午夜触发,其 data_interval_start=datetime(2024, 1, 3)
和 data_interval_end=datetime(2024, 2, 3)
。从那时起,调度器会根据您的 schedule
创建新的 DagRun,并且相应的任务实例会在满足依赖时运行。在向 DAG 添加新任务时,您需要特别注意 start_date
,并可能需要重新激活非活动的 DagRun 以正确载入新任务。
我们不建议使用动态值作为 start_date
,尤其是 datetime.now()
,因为它可能非常令人困惑。任务在周期结束时触发,理论上,@hourly
DAG 永远无法到达“现在”之后的一小时,因为 now()
会不断前进。
以前,我们还建议使用与 DAG 的 schedule
相关的四舍五入的 start_date
。这意味着 @hourly
的时间应为 00:00
分钟:秒,@daily
的作业在午夜,@monthly
的作业在每月的第一天。现在不再需要这样做。Airflow 现在将通过使用 start_date
作为开始查找的时刻,自动对齐 start_date
和 schedule
。
您可以使用任何 Sensor 或 TimeDeltaSensor
来延迟任务在调度间隔内的执行。虽然 schedule
确实允许指定一个 datetime.timedelta
对象,但我们建议改用宏或 cron 表达式,因为它强化了这种四舍五入调度的概念。
当使用 depends_on_past=True
时,特别注意 start_date
非常重要,因为过去的依赖不仅针对为任务指定的 start_date
的特定调度强制执行。除非您计划对新任务运行回填,否则在引入新的 depends_on_past=True
时,及时关注 DagRun 的活动状态也很重要。
同样重要的是要注意,任务的 start_date
在回填中会被忽略。
使用时区¶
创建一个时区感知的 datetime(例如 DAG 的 start_date
)非常简单。只需确保使用 pendulum
提供时区感知的日期即可。不要尝试使用标准库的 时区 (timezone) 对象,因为已知它们存在局限性,并且我们刻意不允许在 DAG 中使用它们。
execution_date
是什么意思?¶
执行日期 (Execution date) 或 execution_date
是 逻辑日期 (logical date) 的历史名称,通常也是 DAG 运行所代表的数据间隔的开始。
Airflow 作为 ETL 需求的解决方案而开发。在 ETL 领域,您通常会汇总数据。因此,如果您想汇总 2016-02-19
的数据,您会在世界时 (UTC) 2016-02-20
午夜进行,这正好是在 2016-02-19
的所有数据可用之后。2016-02-19
和 2016-02-20
的午夜之间的这个间隔称为 数据间隔 (data interval),由于它代表 2016-02-19
日期的数据,这个日期也称为运行的 逻辑日期 (logical date),或者这个 DAG 运行执行所对应的日期,因此称为 执行日期 (execution date)。
为了向后兼容,datetime 值 execution_date
在 Jinja 模板字段和 Airflow 的 Python API 中仍然作为具有各种格式的 模板变量 (Template variables)。它也包含在传递给 Operator 的 execute 函数的上下文字典中。
class MyOperator(BaseOperator):
def execute(self, context):
logging.info(context["execution_date"])
但是,如果可能,您应该始终使用 data_interval_start
或 data_interval_end
,因为这些名称在语义上更准确,更不容易引起误解。
请注意,ds
(data_interval_start
的 YYYY-MM-DD 形式)指的是 日期 **字符串**,而不是 日期 **开始**,这可能会让一些人感到困惑。
提示
有关 逻辑日期 (logical date)
的更多信息,请参阅 数据间隔 (Data Interval) 和 运行 DAG (Running dags)。
如何动态创建 DAG?¶
Airflow 会在您的 DAGS_FOLDER
中查找在其全局命名空间中包含 DAG
对象的模块,并将找到的对象添加到 DagBag
中。了解这一点后,我们只需要一种在全局命名空间中动态分配变量的方法即可。这在 Python 中很容易做到,可以使用标准库的 globals()
函数,它就像一个简单的字典。
def create_dag(dag_id):
"""
A function returning a DAG object.
"""
return DAG(dag_id)
for i in range(10):
dag_id = f"foo_{i}"
globals()[dag_id] = DAG(dag_id)
# or better, call a function that returns a DAG object!
other_dag_id = f"bar_{i}"
globals()[other_dag_id] = create_dag(other_dag_id)
即使 Airflow 支持每个 Python 文件定义多个 DAG(动态生成或其他方式),但并不推荐这样做,因为 Airflow 希望在故障和部署方面实现更好的 DAG 隔离,而将多个 DAG 放在同一个文件中则违背了这一原则。
是否允许顶级 Python 代码?¶
虽然不建议在定义 Airflow 构造之外编写任何代码,但 Airflow 确实支持任何任意 Python 代码,只要它不会破坏 DAG 文件处理器或使文件处理时间超过 dagbag_import_timeout 值。
一个常见的例子是构建动态 DAG 时违反时间限制,这通常需要从数据库等其他服务查询数据。与此同时,被请求的服务正被 DAG 文件处理器的请求淹没,请求处理文件所需的数据。这些意外的交互可能会导致服务性能下降,并最终导致 DAG 文件处理失败。
有关详细信息,请参阅 DAG 编写最佳实践 (DAG writing best practices)。
宏能在另一个 Jinja 模板中解析吗?¶
无法在另一个 Jinja 模板中渲染 宏 (Macros) 或任何 Jinja 模板。这通常在 user_defined_macros
中尝试。
dag = DAG(
# ...
user_defined_macros={"my_custom_macro": "day={{ ds }}"}
)
bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)
对于 data_interval_start
为 2020-01-01 00:00:00 的 DAG 运行,这将输出“day={{ ds }}”,而不是“day=2020-01-01”。
bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)
通过在 template_field 中直接使用 ds 宏,渲染的值结果为“day=2020-01-01”。
为什么 next_ds
或 prev_ds
可能不包含期望的值?¶
调度 DAG 时,
next_ds
next_ds_nodash
prev_ds
prev_ds_nodash
是使用logical_date
和 DAG 的调度(如果适用)计算得出的。如果您将schedule
设置为None
或@once
,则next_ds
、next_ds_nodash
、prev_ds
、prev_ds_nodash
的值将设置为None
。手动触发 DAG 时,调度将被忽略,并且
prev_ds == next_ds == ds
。
任务执行交互¶
TemplateNotFound
是什么意思?¶
TemplateNotFound
错误通常是由于将路径传递给触发 Jinja 模板化的 Operator 时,与用户期望不一致造成的。一个常见情况是 BashOperator
。
另一个经常被忽略的事实是,文件是相对于管道文件所在的位置解析的。您可以将其他目录添加到 DAG 对象的 template_searchpath
中,以允许使用其他非相对位置。
如何基于另一个任务的失败触发任务?¶
对于通过依赖关系关联的任务,如果任务执行依赖于其所有上游任务的失败,您可以将 trigger_rule
设置为 TriggerRule.ALL_FAILED
;如果只依赖于其中一个上游任务的失败,则设置为 TriggerRule.ONE_FAILED
。
import pendulum
from airflow.sdk import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule
@task()
def a_func():
raise AirflowException
@task(
trigger_rule=TriggerRule.ALL_FAILED,
)
def b_func():
pass
@dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
def my_dag():
a = a_func()
b = b_func()
a >> b
dag = my_dag()
有关更多信息,请参阅 触发规则 (Trigger Rules)。
如果任务之间没有依赖关系,您将需要构建一个自定义 Operator。
Airflow UI¶
为什么我的任务失败了,但在 UI 中没有日志?¶
日志通常在任务达到终端状态时提供。有时,任务的正常生命周期会被中断,任务的 Worker 无法写入任务日志。这通常由于以下两个原因之一导致:
任务在排队中卡住后失败(Airflow 2.6.0+)。排队时间超过 scheduler.task_queued_timeout 的任务将被标记为失败,并且在 Airflow UI 中不会有任务日志。
为每个任务设置重试次数会大大降低这些问题影响工作流的可能性。
如何阻止每个 Web 服务器多次发生权限同步?¶
将 airflow.cfg
文件中的 [fab] update_fab_perms
配置值设置为 False
。
为什么暂停 DAG 切换按钮变红了?¶
如果暂停或取消暂停 DAG 因任何原因失败,DAG 切换按钮将恢复到先前的状态并变红。如果您发现此行为,请尝试再次暂停 DAG,或检查控制台或服务器日志以查看问题是否再次发生。
MySQL 和 MySQL 变体数据库¶
“MySQL Server has gone away” 是什么意思?¶
您可能会偶尔遇到消息为“MySQL Server has gone away”的 OperationalError
。这是由于连接池保持连接打开时间过长,并且您获得了一个已过期的旧连接。为了确保连接有效,您可以设置 sql_alchemy_pool_recycle,以确保连接在该秒数后失效并创建新连接。
Airflow 是否支持扩展 ASCII 或 unicode 字符?¶
如果您打算在 Airflow 中使用扩展 ASCII 或 Unicode 字符,则必须向 MySQL 数据库提供正确的连接字符串,因为它们显式定义了字符集。
sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8
您会遇到由 WTForms
模板以及 Airflow 其他模块抛出的 UnicodeDecodeError
,如下所示。
'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)
如何修复异常:全局变量 explicit_defaults_for_timestamp
需要开启 (1)?¶
这意味着您的 MySQL 服务器中 explicit_defaults_for_timestamp
已被禁用,您需要通过以下方式启用它:
在
my.cnf
文件中的mysqld
部分下设置explicit_defaults_for_timestamp = 1
。重启 Mysql 服务器。