airflow.providers.standard.operators.bash
¶
模块内容¶
类¶
执行 Bash 脚本、命令或一系列命令。 |
- class airflow.providers.standard.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[源代码]¶
基类:
airflow.models.baseoperator.BaseOperator
执行 Bash 脚本、命令或一系列命令。
另请参阅
有关如何使用此操作符的更多信息,请查看指南: BashOperator
如果 BaseOperator.do_xcom_push 为 True,则当 bash 命令完成时,写入 stdout 的最后一行也会被推送到 XCom
- 参数
bash_command (str | airflow.utils.types.ArgNotSet) – 要执行的命令、一组命令或对 Bash 脚本的引用(必须是 ‘.sh’ 或 ‘.bash’)。(已模板化)
env (dict[str, str] | None) – 如果 env 不是 None,则它必须是一个字典,用于定义新进程的环境变量;这些变量将代替继承当前进程环境,而后者是默认行为。(已模板化)
append_env (bool) – 如果为 False(默认),则使用 env 参数中传递的环境变量,并且不继承当前进程环境。如果为 True,则继承当前传递的环境变量,然后用户传递的环境变量将更新现有的继承的环境变量,或将新变量附加到其中
output_encoding (str) – Bash 命令的输出编码
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任务以此退出代码退出,则将任务保持在
skipped
状态(默认值:99)。如果设置为None
,则任何非零退出代码都将被视为失败。cwd (str | None) – 在其中执行命令的工作目录(已模板化)。如果为 None(默认),则该命令在临时目录中运行。要将当前 DAG 文件夹用作工作目录,您可以设置模板
{{ dag_run.dag.folder }}
。当 bash_command 是 ‘.sh’ 或 ‘.bash’ 文件时,Airflow 必须具有对工作目录的写入访问权限。脚本将在该目录中渲染(Jinja 模板)到新的临时文件中。output_processor (Callable[[str], Any]) – 用于进一步处理 bash 脚本输出的函数(默认为 lambda output: output)。
Airflow 将评估 Bash 命令的退出代码。通常,非零退出代码将导致任务失败,零将导致任务成功。退出代码
99
(或在skip_on_exit_code
中设置的另一个代码)将抛出airflow.exceptions.AirflowSkipException
,这将使任务保持在skipped
状态。您可以通过设置skip_on_exit_code=None
来将所有非零退出代码视为失败。退出代码
行为
0
成功
skip_on_exit_code(默认值:99)
否则
注意
除非整个 shell 以非零退出代码退出,否则 Airflow 不会识别非零退出代码。如果非零退出来自子命令,则可能会出现问题。解决此问题的最简单方法是在命令前加上
set -e;
bash_command = "set -e; python3 script.py '{{ data_interval_end }}'"
注意
要简单地执行
.sh
或.bash
脚本(没有任何 Jinja 模板),请在脚本名称后添加一个空格bash_command
参数 – 例如bash_command="my_script.sh "
。这是因为当 Airflow 以.sh
或.bash
结尾时,它会尝试加载此文件并将其作为 Jinja 模板进行处理。如果您的脚本中有 Jinja 模板,请不要放置任何空格。并在 DAG 的
template_searchpath
中添加脚本的目录。如果您指定了cwd
,则 Airflow 必须具有对此目录的写入访问权限。脚本将被渲染(Jinja 模板)到此目录中的新临时文件中。警告
在使用“用户”输入或在
bash_command
中使用 Jinja 模板时应小心,因为此 bash 操作符不会对命令执行任何转义或清理。这主要适用于使用 “dag_run” conf,因为可以通过 Web UI 中的用户提交该配置。大多数默认模板变量都没有风险。
例如,**不要**这样做
bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"', )
相反,您应该通过
env
kwarg 传递此信息,并在 bash_command 中使用双引号,如下所示bash_task = BashOperator( task_id="bash_task", bash_command="echo \"here is the message: '$message'\"", env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'}, )
2.10.0 版本新增: output_processor 参数。
- template_fields: collections.abc.Sequence[str] = ('bash_command', 'env', 'cwd')[源代码]¶
- template_ext: collections.abc.Sequence[str] = ('.sh', '.bash')[源代码]¶