使用命令行界面¶
本文档旨在概述使用 CLI 时的所有常见任务。
注意
有关 CLI 命令的更多信息,请参阅 命令行界面和环境变量参考
设置 Bash/Zsh 补全¶
当使用 bash (或 zsh
) 作为你的 shell 时,airflow
可以使用 argcomplete 进行自动补全。
对于所有启用 argcomplete 的 Python 应用程序的全局激活,请运行
sudo activate-global-python-argcomplete
对于永久(但非全局)的 airflow 激活,请使用
register-python-argcomplete airflow >> ~/.bashrc
对于仅 airflow 的 argcomplete 的一次性激活,请使用
eval "$(register-python-argcomplete airflow)"
如果你正在使用 zsh
,请将以下内容添加到你的 .zshrc
autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"
创建连接¶
有关使用 CLI 创建连接的信息,请参阅 从 CLI 创建连接
将 DAG 结构导出为图像¶
Airflow 允许你将 DAG 结构打印或保存为图像。这对于记录或共享你的 DAG 结构很有用。你需要安装 Graphviz。
例如,要将 example_complex
DAG 打印到终端
airflow dags show example_complex
这会将渲染的 DAG 结构(类似于 Graph)以 DOT 格式打印到屏幕上。
支持多种文件格式。要使用它们,请添加参数 --save [filename].[format]
。
要将 example_complex
DAG 保存为 PNG 文件
airflow dags show example_complex --save example_complex.png
这将保存以下图像作为文件
支持以下文件格式
bmp
canon
,dot
,gv
,xdot
,xdot1.2
,xdot1.4
cgimage
cmap
eps
exr
fig
gd
,gd2
gif
gtk
ico
imap
,cmapx
imap_np
,cmapx_np
ismap
jp2
jpg
,jpeg
,jpe
json
,json0
,dot_json
,xdot_json
pct
,pict
pic
plain
,plain-ext
png
pov
ps
ps2
psd
sgi
svg
,svgz
tga
tif
,tiff
tk
vml
,vmlz
vrml
wbmp
webp
xlib
x11
默认情况下,Airflow 在 airflow.cfg
文件的 [core]
部分中由 dags_folder
选项指定的目录中查找 DAG。你可以使用 --subdir
参数选择新目录。
显示 DAG 结构¶
有时,你将处理包含复杂依赖关系的 DAG。然后,预览 DAG 以查看其是否正确会有所帮助。
如果你有 macOS,你可以将 iTerm2 与 imgcat 脚本一起使用,以在控制台中显示 DAG 结构。你还需要安装 Graphviz。
其他终端不支持高质量图形的显示。你可以将图像转换为文本形式,但其分辨率将阻止你阅读它。
为此,你应该在 airflow dags show
命令中使用 --imgcat
开关。例如,如果要显示 example_bash_operator
DAG,则可以使用以下命令
airflow dags show example_bash_operator --imgcat
你将看到与下面屏幕截图类似的结果。
格式化命令输出¶
一些 Airflow 命令(如 airflow dags list
或 airflow tasks states-for-dag-run
)支持 --output
标志,该标志允许用户更改命令输出的格式。可能的选项
table
- 将信息呈现为纯文本表
simple
- 将信息呈现为简单表,该表可以由标准 Linux 实用程序解析
json
- 以 json 字符串的形式呈现信息
yaml
- 以有效的 yaml 格式呈现信息
json
和 yaml
格式都使其更容易使用命令行工具(如 jq 或 yq)来操作数据。例如
airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
"sd": "2020-11-29T14:53:46.811030+00:00",
"ed": "2020-11-29T14:53:46.974545+00:00"
}
{
"sd": "2020-11-29T14:53:56.926441+00:00",
"ed": "2020-11-29T14:53:57.118781+00:00"
}
{
"sd": "2020-11-29T14:53:56.915802+00:00",
"ed": "2020-11-29T14:53:57.125230+00:00"
}
{
"sd": "2020-11-29T14:53:56.922131+00:00",
"ed": "2020-11-29T14:53:57.129091+00:00"
}
{
"sd": "2020-11-29T14:53:56.931243+00:00",
"ed": "2020-11-29T14:53:57.126306+00:00"
}
从元数据数据库中清除历史记录¶
注意
强烈建议你在运行 db clean
命令之前备份元数据数据库。
db clean
命令的工作原理是从每个表中删除早于提供的 --clean-before-timestamp
的记录。
您可以选择提供要执行删除操作的表列表。如果未提供表列表,则将包含所有表。
您可以使用 --dry-run
选项来打印要清理的主表中行数。
默认情况下,db clean
将以 _airflow_deleted__<table>__<timestamp>
形式的表存档已清除的行。如果您不希望以这种方式保留数据,则可以提供参数 --skip-archive
。
从存档表中导出已清除的记录¶
db export-archived
命令将由 db clean
命令创建的存档表的内容,以指定的格式导出,默认情况下导出到 CSV 文件。导出的文件将包含在 db clean
过程中从主表中清除的记录。
您可以使用 --export-format
选项指定导出格式。默认格式为 csv,目前也是唯一支持的格式。
您还必须使用 --output-path
选项指定要导出数据的路径位置。此位置必须存在。
其他选项包括:--tables
用于指定要导出的表,--drop-archives
用于在导出后删除存档表。
删除存档表¶
如果在 db clean
过程中,您没有使用会删除存档表的 --skip-archive
选项,您仍然可以使用 db drop-archived
命令删除存档表。此操作是不可逆的,建议您在删除存档表之前使用 db export-archived
命令将表备份到磁盘。
您可以使用 --tables
选项指定要删除的表。如果未指定表,则将删除所有存档表。
注意级联删除¶
请注意,某些表定义了带有 ON DELETE CASCADE
的外键关系,因此在一个表中的删除可能会触发其他表中的删除。例如,task_instance
表键到 dag_run
表,因此如果删除 DagRun 记录,则其所有关联的任务实例也将被删除。
DAG 运行的特殊处理¶
通常,Airflow 通过查找最新的 DagRun 来确定接下来运行哪个 DagRun。 如果您删除所有 DAG 运行,Airflow 可能会调度一个已经完成的旧 DAG 运行,例如,如果您设置了 catchup=True
。 因此,db clean
命令将保留最新的非手动触发的 DAG 运行,以保持调度的连续性。
可回填 DAG 的注意事项¶
并非所有 DAG 都设计用于 Airflow 的回填命令。但对于那些设计用于此命令的 DAG,需要特别注意。如果您删除 DAG 运行,并且您在包含已删除 DAG 运行的日期范围内运行回填,则这些运行将被重新创建并再次运行。因此,如果您有属于此类别的 DAG,您可能需要避免删除 DAG 运行,而只清理其他大型表,例如任务实例和日志等。
升级 Airflow¶
运行 airflow db migrate --help
获取使用详情。
降级 Airflow¶
注意
建议您在运行 db downgrade
或任何其他数据库操作之前备份数据库。
您可以使用 db downgrade
命令降级到特定的 Airflow 版本。或者,您可以提供要降级到的 Alembic 修订 ID。
如果您想预览命令但不执行它们,请使用 --show-sql-only
选项。
选项 --from-revision
和 --from-version
只能与 --show-sql-only
选项一起使用,因为在实际运行迁移时,我们应该始终从当前修订降级。
有关 Airflow 版本和 Alembic 修订之间的映射,请参阅数据库迁移参考。
导出连接¶
您可以使用 CLI 从数据库导出连接。 支持的文件格式为 json
、yaml
和 env
。
您可以将目标文件指定为参数
airflow connections export connections.json
或者,您可以指定 file-format
参数来覆盖文件格式
airflow connections export /tmp/connections --file-format yaml
您也可以为 STDOUT 指定 -
airflow connections export -
JSON 格式包含一个对象,其中键包含连接 ID,值包含连接的定义。 在此格式中,连接被定义为 JSON 对象。 以下是一个示例 JSON 文件。
{
"airflow_db": {
"conn_type": "mysql",
"host": "mysql",
"login": "root",
"password": "plainpassword",
"schema": "airflow",
"port": null,
"extra": null
},
"druid_broker_default": {
"conn_type": "druid",
"host": "druid-broker",
"login": null,
"password": null,
"schema": null,
"port": 8082,
"extra": "{\"endpoint\": \"druid/v2/sql\"}"
}
}
YAML 文件结构与 JSON 相似。 连接 ID 和一个或多个连接的定义的键值对。 在此格式中,连接被定义为 YAML 对象。 以下是一个示例 YAML 文件。
airflow_db:
conn_type: mysql
extra: null
host: mysql
login: root
password: plainpassword
port: null
schema: airflow
druid_broker_default:
conn_type: druid
extra: '{"endpoint": "druid/v2/sql"}'
host: druid-broker
login: null
password: null
port: 8082
schema: null
您还可以以 .env
格式导出连接。 键是连接 ID,值是连接的序列化表示形式,使用 Airflow 的连接 URI 格式或 JSON。 要使用 JSON,请提供选项 --serialization-format=json
,否则将使用 Airflow 连接 URI 格式。 以下是使用两种格式的示例 .env
文件。
URI 示例
airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql
JSON 示例输出
airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}
测试 DAG 导入错误¶
可以使用 CLI 通过 list-import-errors
子命令检查任何已发现的 DAG 是否存在导入错误。 可以创建一个自动化步骤,如果任何 DAG 无法导入,则通过检查命令输出来使该步骤失败,尤其是在使用 --output
生成标准文件格式时。 例如,当没有错误时的默认输出是 No data found
,而 json 输出是 []
。 然后可以在 CI 或预提交中运行检查,以加快审查过程和测试。
示例命令,如果存在任何错误,则使用 jq 来解析输出,该命令将失败
airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'
该行可以按原样添加到自动化中,或者如果您想打印输出,可以使用 tee
airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt
Jenkins 管道中的示例
stage('All DAGs are loadable') {
steps {
sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
}
}
注意
为了使此操作准确无误,您必须确保 Airflow 不会将任何额外的文本记录到 stdout。 例如,您可能需要修复任何弃用警告,将 2>/dev/null
添加到命令中,或者如果您有一个在加载时生成日志的插件,则在 Airflow 配置中设置 lazy_load_plugins = True
。