常见问题解答¶
调度 / DAG 文件解析¶
为什么任务没有被调度?¶
您的任务可能没有被调度的原因有很多。以下是一些常见原因
您的脚本是否“编译”,Airflow 引擎是否可以解析它并找到您的 DAG 对象?要测试这一点,您可以运行
airflow dags list
并确认您的 DAG 出现在列表中。您还可以运行airflow tasks list foo_dag_id --tree
并确认您的任务按预期出现在列表中。如果您使用 CeleryExecutor,您可能需要确认这在调度器运行的地方以及工作节点运行的地方都有效。包含您的 DAG 的文件内容中的某个位置是否包含字符串“airflow”和“DAG”?在搜索 DAG 目录时,Airflow 会忽略不包含“airflow”和“DAG”的文件,以防止 DagBag 解析导入与用户 DAG 同处的所有 Python 文件。
您的
start_date
是否设置正确?对于基于时间的 DAG,只有在开始日期之后的第一个调度间隔过去后,才会触发该任务。您的
schedule
参数是否设置正确?默认值为一天 (datetime.timedelta(1)
)。您必须直接将不同的schedule
指定给您实例化的 DAG 对象,而不是作为default_param
,因为任务实例不会覆盖其父 DAG 的schedule
。您的
start_date
是否超出了您在用户界面中可以看到的位置?如果您将start_date
设置为例如 3 个月前的某个时间,您将无法在用户界面的主视图中看到它,但您应该能够在菜单 -> 浏览 ->任务 实例
中看到它。是否满足任务的依赖关系?任务直接上游的任务实例需要处于
success
状态。此外,如果您设置了depends_on_past=True
,则之前的任务实例需要成功或被跳过(除非它是该任务的首次运行)。此外,如果wait_for_downstream=True
,请确保您理解它的含义 - 前一个任务实例的 所有 直接下游任务都必须成功或被跳过。您可以从任务的任务 实例 详细信息
页面查看如何设置这些属性。您需要的 DagRun 是否已创建且处于活动状态?DagRun 表示整个 DAG 的特定执行,并且具有状态(运行中、成功、失败等)。调度器会在向前移动时创建新的 DagRun,但永远不会及时返回以创建新的 DagRun。调度器仅评估
running
DagRun 以查看它可以触发哪些任务实例。请注意,清除任务实例(从用户界面或命令行界面)确实会将 DagRun 的状态设置回运行中。您可以通过单击 DAG 的调度标签来批量查看 DagRun 列表并更改状态。是否已达到 DAG 的
concurrency
参数?concurrency
定义了一个 DAG 允许拥有的running
任务实例的数量,超过该数量则会排队。是否已达到 DAG 的
max_active_runs
参数?max_active_runs
定义了允许同时存在的 DAG 的running
并发实例的数量。
您可能还需要阅读有关 调度器 的信息,并确保您完全理解调度器的周期。
如何提高 DAG 性能?¶
有一些 Airflow 配置可以实现更大的调度容量和频率
DAG 具有提高效率的配置
max_active_tasks
:覆盖 max_active_tasks_per_dag。max_active_runs
:覆盖 max_active_runs_per_dag。
操作符或任务也具有提高效率和调度优先级的配置
如何减少 DAG 调度延迟 / 任务延迟?¶
Airflow 2.0 开箱即用具有较低的 DAG 调度延迟(特别是与 Airflow 1.10.x 相比),但是,如果您需要更高的吞吐量,您可以 启动多个调度器。
如何控制不同 DAG 文件的 DAG 文件解析超时?¶
(仅适用于 Airflow >= 2.3.0)
您可以在 airflow_local_settings.py
中添加一个 get_dagbag_import_timeout
函数,该函数会在解析 DAG 文件之前被调用。您可以根据 DAG 文件返回不同的超时值。当返回值小于或等于 0 时,表示 DAG 解析期间没有超时。
def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
"""
This setting allows to dynamically control the DAG file parsing timeout.
It is useful when there are a few DAG files requiring longer parsing times, while others do not.
You can control them separately instead of having one value for all DAG files.
If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
"""
if "slow" in dag_file_path:
return 90
if "no-timeout" in dag_file_path:
return 0
return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
当有大量(>1000)DAG 文件时,如何加快新文件的解析速度?¶
(仅适用于 Airflow >= 2.1.1)
将 file_parsing_sort_mode 更改为 modified_time
,将 min_file_process_interval 提高到 600
(10 分钟),6000
(100 分钟)或更高的值。
如果文件最近被修改过,DAG 解析器将跳过 min_file_process_interval
检查。
对于从单独的文件导入/创建 DAG 的情况,这可能不起作用。示例:dag_file.py
导入 dag_loader.py
,而 DAG 文件的实际逻辑如下所示。在这种情况下,如果 dag_loader.py
更新了,但 dag_file.py
没有更新,则更改将不会反映出来,直到达到 min_file_process_interval
,因为 DAG 解析器将查找 dag_file.py
文件的修改时间。
from dag_loader import create_dag
globals()[dag.dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
from airflow import DAG
from airflow.decorators import task
import pendulum
def create_dag(dag_id, schedule, dag_number, default_args):
dag = DAG(
dag_id,
schedule=schedule,
default_args=default_args,
pendulum.datetime(2021, 9, 13, tz="UTC"),
)
with dag:
@task()
def hello_world():
print("Hello World")
print(f"This is DAG: {dag_number}")
hello_world()
return dag
DAG 构建¶
start_date
是怎么回事?¶
start_date
部分是 DagRun 时代之前的遗留产物,但在很多方面仍然相关。创建新的 DAG 时,您可能希望为任务设置全局 start_date
。这可以通过直接在 DAG()
对象中声明 start_date
来完成。DAG 的第一个 DagRun 将根据 start_date
之后第一个完整的 data_interval
创建。例如,对于 start_date=datetime(2024, 1, 1)
和 schedule="0 0 3 * *"
的 DAG,第一个 DAG 运行将在 2024-02-03 午夜触发,data_interval_start=datetime(2024, 1, 3)
和 data_interval_end=datetime(2024, 2, 3)
。从那时起,调度程序根据您的 schedule
创建新的 DagRun,并且当满足依赖关系时,相应的任务实例会运行。当向 DAG 引入新任务时,您需要特别注意 start_date
,并且可能需要重新激活未激活的 DagRun,以使新任务正确上线。
我们建议不要使用动态值作为 start_date
,尤其是 datetime.now()
,因为它可能会非常令人困惑。任务会在周期结束后触发,理论上,@hourly
DAG 永远不会达到现在之后的一个小时,因为 now()
会不断移动。
以前,我们还建议使用与 DAG 的 schedule
相关的四舍五入的 start_date
。这意味着 @hourly
会在 00:00
分钟:秒,@daily
作业在午夜,@monthly
作业在每月的第一天。现在不再需要这样做了。Airflow 现在将通过使用 start_date
作为开始查找的时刻,自动对齐 start_date
和 schedule
。
您可以使用任何传感器或 TimeDeltaSensor
来延迟计划间隔内的任务执行。虽然 schedule
允许指定 datetime.timedelta
对象,但我们建议改用宏或 cron 表达式,因为它强制执行了四舍五入的计划的概念。
当使用 depends_on_past=True
时,特别注意 start_date
非常重要,因为过去的依赖关系不仅仅在为任务指定的 start_date
的特定计划上强制执行。在引入新的 depends_on_past=True
时,还需要及时关注 DagRun 活动状态,除非您计划为新任务运行回填。
还需要注意的是,在回填 CLI 命令的上下文中,任务的 start_date
会被回填的 start_date
命令覆盖。这允许对 depends_on_past=True
的任务进行回填。如果不是这种情况,回填将不会开始。
使用时区¶
创建一个时区感知日期时间(例如,DAG 的 start_date
)非常简单。只需确保使用 pendulum
提供时区感知日期即可。不要尝试使用标准库 时区,因为它们已知存在限制,并且我们故意不允许在 DAG 中使用它们。
execution_date
是什么意思?¶
执行日期 或 execution_date
是 逻辑日期 的历史名称,通常也是 DAG 运行所表示的数据间隔的开始。
Airflow 是作为 ETL 需求的解决方案开发的。在 ETL 世界中,您通常会汇总数据。因此,如果要汇总 2016-02-19
的数据,您将在 2016-02-20
午夜 UTC 执行此操作,这将正好在 2016-02-19
的所有数据可用之后。 2016-02-19
和 2016-02-20
午夜之间的间隔称为 数据间隔,由于它表示 2016-02-19
的数据,因此该日期也称为运行的 逻辑日期,或该 DAG 运行执行的日期,因此称为 执行日期。
为了向后兼容,datetime 值 execution_date
仍然作为 模板变量,在 Jinja 模板字段中具有各种格式,并在 Airflow 的 Python API 中。它也包含在提供给 Operator 的 execute 函数的上下文字典中。
class MyOperator(BaseOperator):
def execute(self, context):
logging.info(context["execution_date"])
但是,如果可能,您应该始终使用 data_interval_start
或 data_interval_end
,因为这些名称在语义上更正确,并且不易引起误解。
请注意,ds
(data_interval_start
的 YYYY-MM-DD 格式)指的是 *date* **字符串**,而不是 *date* **开始**,这可能会让某些人感到困惑。
如何动态创建 DAG?¶
Airflow 在您的 DAGS_FOLDER
中查找在其全局命名空间中包含 DAG
对象的模块,并将它在 DagBag
中找到的对象添加进去。了解这一点后,我们所需要的是一种在全局命名空间中动态分配变量的方法。这在 python 中很容易使用标准库的 globals()
函数完成,该函数的作用类似于一个简单的字典。
def create_dag(dag_id):
"""
A function returning a DAG object.
"""
return DAG(dag_id)
for i in range(10):
dag_id = f"foo_{i}"
globals()[dag_id] = DAG(dag_id)
# or better, call a function that returns a DAG object!
other_dag_id = f"bar_{i}"
globals()[other_dag_id] = create_dag(other_dag_id)
即使 Airflow 支持每个 python 文件中定义多个 DAG,无论是动态生成的还是其他方式,也不建议这样做,因为 Airflow 希望从故障和部署的角度更好地隔离 DAG,而同一个文件中的多个 DAG 与此背道而驰。
是否允许顶层 Python 代码?¶
虽然不建议在定义 Airflow 构造之外编写任何代码,但 Airflow 支持任何任意的 python 代码,只要它不会破坏 DAG 文件处理器或使文件处理时间超过 dagbag_import_timeout 值即可。
一个常见的例子是在构建动态 DAG 时违反时间限制,这通常需要从另一个服务(如数据库)查询数据。与此同时,请求的服务会被 DAG 文件处理器请求处理文件的数据所淹没。这些意外的交互可能会导致服务恶化,并最终导致 DAG 文件处理失败。
有关更多信息,请参阅 DAG 编写最佳实践。
宏是否在另一个 Jinja 模板中解析?¶
无法在另一个 Jinja 模板中渲染宏或任何 Jinja 模板。这通常在 user_defined_macros
中尝试。
dag = DAG(
# ...
user_defined_macros={"my_custom_macro": "day={{ ds }}"}
)
bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)
对于 data_interval_start
为 2020-01-01 00:00:00 的 DAG 运行,这将回显 “day={{ ds }}” 而不是 “day=2020-01-01”。
bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)
通过直接在 template_field 中使用 ds 宏,渲染后的值将为 “day=2020-01-01”。
为什么 next_ds
或 prev_ds
可能不包含预期值?¶
在调度 DAG 时,
next_ds
、next_ds_nodash
、prev_ds
和prev_ds_nodash
是使用logical_date
和 DAG 的计划(如果适用)计算的。如果您将schedule
设置为None
或@once
,则next_ds
、next_ds_nodash
、prev_ds
和prev_ds_nodash
的值将被设置为None
。手动触发 DAG 时,计划将被忽略,并且
prev_ds == next_ds == ds
。
任务执行交互¶
TemplateNotFound
是什么意思?¶
TemplateNotFound
错误通常是由于在将路径传递给触发 Jinja 模板化的操作符时与用户期望不一致导致的。一个常见的例子是使用 BashOperators。
另一个经常被忽略的事实是,文件是相对于管道文件所在的位置解析的。您可以将其他目录添加到 DAG 对象的 template_searchpath
中,以允许其他非相对位置。
如何根据另一个任务的失败来触发任务?¶
对于通过依赖关系相关的任务,如果任务执行依赖于其所有上游任务的失败,您可以将 trigger_rule
设置为 TriggerRule.ALL_FAILED
,如果仅依赖于其中一个上游任务的失败,则设置为 TriggerRule.ONE_FAILED
。
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule
@task()
def a_func():
raise AirflowException
@task(
trigger_rule=TriggerRule.ALL_FAILED,
)
def b_func():
pass
@dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
def my_dag():
a = a_func()
b = b_func()
a >> b
dag = my_dag()
有关更多信息,请参阅触发规则。
如果任务之间没有依赖关系,则需要构建自定义操作符。
Airflow UI¶
为什么我的任务失败,UI 中没有日志?¶
日志通常在任务达到终端状态时提供。有时,任务的正常生命周期会被中断,并且任务的工作程序无法写入任务的日志。这通常是由于以下两种原因之一造成的:
僵尸任务.
在排队中卡住后失败的任务(Airflow 2.6.0+)。排队时间超过scheduler.task_queued_timeout的任务将被标记为失败,并且 Airflow UI 中将没有任务日志。
为每个任务设置重试可以大大减少这些问题影响工作流程的机会。
如何停止每个 Web 服务器多次发生同步权限?¶
在 airflow.cfg
中将 [fab] update_fab_perms
配置的值设置为 False
。
如何减少 airflow UI 页面加载时间?¶
如果您的 DAG 加载时间过长,您可以将 airflow.cfg
中的 default_dag_run_display_number
配置的值减小。此可配置项控制 UI 中显示的 DAG 运行次数,默认值为 25
。
为什么暂停 DAG 开关变成红色?¶
如果由于任何原因暂停或取消暂停 DAG 失败,DAG 开关将恢复到之前的状态并变成红色。如果您观察到此行为,请尝试再次暂停 DAG,或者检查控制台或服务器日志,以确定问题是否再次发生。
MySQL 和 MySQL 变体数据库¶
“MySQL Server has gone away” 是什么意思?¶
您可能会偶尔遇到消息为 “MySQL Server has gone away” 的 OperationalError
。这是由于连接池保持连接打开时间过长,并且您获得了一个已过期的旧连接。为确保连接有效,您可以设置 sql_alchemy_pool_recycle (已弃用) 以确保连接在指定秒数后失效并创建新连接。
Airflow 是否支持扩展 ASCII 或 Unicode 字符?¶
如果您打算在 Airflow 中使用扩展 ASCII 或 Unicode 字符,则必须为 MySQL 数据库提供正确的连接字符串,因为它们显式定义了字符集。
sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8
您将遇到 WTForms
模板化和其他 Airflow 模块抛出的 UnicodeDecodeError
,如下所示。
'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)
如何修复异常:全局变量 explicit_defaults_for_timestamp
需要开启 (1)?¶
这意味着您的 mysql 服务器中禁用了 explicit_defaults_for_timestamp
,您需要通过以下方式启用它:
在您的
my.cnf
文件中的mysqld
部分下设置explicit_defaults_for_timestamp = 1
。重启 Mysql 服务器。
Airflow 是否收集任何遥测数据?¶
Airflow 集成了 Scarf,以在操作期间收集基本的使用情况数据。此数据可帮助 Airflow 维护人员更好地了解 Airflow 的使用方式。从此数据中获得的见解有助于确定补丁、小版本和安全修复的优先级。此外,此信息还支持与开发路线图相关的关键决策。
部署可以通过将[usage_data_collection] enabled选项设置为 False
或 SCARF_ANALYTICS=false
环境变量来选择退出数据收集。个人用户可以通过 Scarf Do Not Track 文档中记录的各种方式轻松选择退出分析。
收集的遥测数据仅限于以下内容:
Airflow 版本
Python 版本
操作系统和机器架构
执行器
元数据数据库类型及其版本