airflow.providers.standard.operators.bash

BashOperator

执行 Bash 脚本、命令或一组命令。

模块内容

class airflow.providers.standard.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[源码]

基类: airflow.models.baseoperator.BaseOperator

执行 Bash 脚本、命令或一组命令。

参阅

有关如何使用此 operator 的更多信息,请参阅指南: BashOperator

如果 BaseOperator.do_xcom_push 为 True,则 Bash 命令完成时,写入标准输出的最后一行也将被推送到 XCom 中

参数
  • bash_command (str | airflow.utils.types.ArgNotSet) – 要执行的命令、一组命令或 Bash 脚本的引用(必须是‘.sh’或‘.bash’)。(模板化)

  • env (dict[str, str] | None) – 如果 env 不是 None,它必须是一个 dict,定义新进程的环境变量;这些变量将取代默认行为(继承当前进程环境)。(模板化)

  • append_env (bool) – 如果为 False(默认),则使用 env 参数中传递的环境变量,不继承当前进程环境。如果为 True,则继承当前进程的环境变量,然后用户传递的环境变量将更新现有继承变量,或者新变量会添加到其中。

  • output_encoding (str) – Bash 命令的输出编码。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任务以该退出码退出,则将任务留在 skipped 状态(默认值:99)。如果设置为 None,则任何非零退出码都将被视为失败。

  • cwd (str | None) – 执行命令的工作目录(模板化)。如果为 None(默认),则命令在临时目录中运行。要使用当前 DAG 文件夹作为工作目录,您可以设置模板 {{ task.dag.folder }}。当 bash_command 是‘.sh’或‘.bash’文件时,Airflow 必须对工作目录有写入权限。脚本(Jinja 模板)将渲染到此目录中的一个新的临时文件中。

  • output_processor (Callable[[str], Any]) – 用于进一步处理 Bash 脚本输出的函数(默认是 lambda output: output)。

Airflow 将评估 Bash 命令的退出码。通常,非零退出码将导致任务失败,零退出码将导致任务成功。退出码 99(或在 skip_on_exit_code 中设置的其他退出码)将抛出 airflow.exceptions.AirflowSkipException,这将使任务处于 skipped 状态。通过设置 skip_on_exit_code=None,可以将所有非零退出码视为失败。

退出码

行为

0

0

skip_on_exit_code (默认: 99)

抛出 airflow.exceptions.AirflowSkipException

其他

抛出 airflow.exceptions.AirflowException

注意

除非整个 shell 以非零退出码退出,否则 Airflow 将不会识别非零退出码。如果非零退出码来自子命令,这可能是一个问题。解决此问题的最简单方法是在命令前加上 set -e;

bash_command = "set -e; python3 script.py '{{ data_interval_end }}'"

注意

要简单地执行 .sh.bash 脚本(不包含任何 Jinja 模板),请在脚本名称 bash_command 参数后添加一个空格——例如 bash_command="my_script.sh "。这是因为当文件以 .sh.bash 结尾时,Airflow 会尝试加载此文件并将其作为 Jinja 模板处理。

如果您的脚本中有 Jinja 模板,请不要在末尾添加空格。并在 DAG 的 template_searchpath 中添加脚本所在的目录。如果您指定了 cwd,则 Airflow 必须对此目录有写入权限。脚本(Jinja 模板)将渲染到此目录中的一个新的临时文件中。

警告

bash_command 中使用“用户”输入或 Jinja 模板时应小心,因为此 Bash operator 不会对命令执行任何转义或清理操作。

这主要适用于使用“dag_run” conf,因为这些可以通过 Web UI 中的用户提交。大多数默认模板变量没有风险。

例如,不要这样做:

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
)

相反,您应该通过 env kwarg 传递它,并在 bash_command 内部使用双引号,如下所示:

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

版本 2.10.0 中新增: output_processor 参数。

template_fields: collections.abc.Sequence[str] = ('bash_command', 'env', 'cwd')[源码]
template_fields_renderers[源码]
template_ext: collections.abc.Sequence[str] = ('.sh', '.bash')[源码]
ui_color = '#f0ede4'[源码]
bash_command[源码]
env = None[源码]
output_encoding = 'utf-8'[源码]
skip_on_exit_code = 99[源码]
cwd = None[源码]
append_env = False[源码]
output_processor[源码]
property subprocess_hook[源码]

返回用于运行 Bash 命令的 Hook。

get_env(context)[源码]

构建要暴露给 Bash 命令的环境变量集。

execute(context)[源码]

创建 operator 时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多 context,请参阅 get_template_context。

on_kill()[源码]

重写此方法以在任务实例被终止时清理子进程。

在 operator 内部使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。

此条目有用吗?