BashOperator

使用 BashOperatorBash shell 中执行命令。要执行的 Bash 命令或脚本由以下因素确定:

  1. 使用 BashOperator 时的 bash_command 参数,或

  2. 如果使用 TaskFlow 装饰器 @task.bash,则由装饰的可调用对象返回的非空字符串值。

提示

建议使用 @task.bash 装饰器而不是传统的 BashOperator 来执行 Bash 命令。

airflow/example_dags/example_bash_decorator.py[源代码]

@task.bash
def run_after_loop() -> str:
    return "echo https://airflow.org.cn/"

run_this = run_after_loop()

airflow/example_dags/example_bash_operator.py[源代码]

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo https://airflow.org.cn/",
)

模板

您可以使用 Jinja 模板 来参数化 Bash 命令。

airflow/example_dags/example_bash_decorator.py[源代码]

@task.bash
def also_run_this() -> str:
    return 'echo "ti_key={{ task_instance_key_str }}"'

also_this = also_run_this()

airflow/example_dags/example_bash_operator.py[源代码]

also_run_this = BashOperator(
    task_id="also_run_this",
    bash_command='echo "ti_key={{ task_instance_key_str }}"',
)

使用 @task.bash TaskFlow 装饰器,您可以返回格式化字符串,并利用所有 执行上下文变量直接访问装饰的任务的优势。

airflow/example_dags/example_bash_decorator.py[源代码]

@task.bash
def also_run_this_again(task_instance_key_str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'

also_this_again = also_run_this_again()

建议您利用此方法,因为它非常适合整体 TaskFlow 范例。

注意

当在 Bash 命令中使用 Jinja 模板时,应注意“用户”输入,因为不会执行 Bash 命令的转义和清理。

这主要适用于使用 dag_run.conf,因为用户可以在 Web UI 中提交该参数。大多数默认模板变量没有风险。

例如,不要执行以下操作

@task.bash
def bash_task() -> str:
    return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'


# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
    message = dag_run.conf["message"] if dag_run.conf else ""
    return f'echo "here is the message: {message}"'
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)

相反,您应该通过 env kwarg 传递此参数,并在 Bash 命令内部使用双引号。

@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
    return "echo \"here is the message: '$message'\""
bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)

跳过

通常,非零退出代码会产生 AirflowException,从而导致任务失败。如果希望任务以 skipped 状态结束,则可以使用代码 99 退出(或者,如果您传递 skip_on_exit_code,则可以使用另一个退出代码)。

airflow/example_dags/example_bash_decorator.py[源代码]

@task.bash
def this_will_skip() -> str:
    return 'echo "hello world"; exit 99;'

this_skips = this_will_skip()

airflow/example_dags/example_bash_operator.py[源代码]

this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)

输出处理器

output_processor 参数允许您指定一个 lambda 函数,该函数在将 bash 脚本的输出作为 XCom 推送之前对其进行处理。此功能对于在 BashOperator 中直接操作脚本的输出特别有用,而无需额外的 operator 或任务。

例如,考虑一个 bash 脚本的输出是 JSON 字符串的场景。使用 output_processor,您可以将此字符串转换为 JSON 对象,然后再将其存储在 XCom 中。这简化了工作流程,并确保下游任务以所需的格式接收处理后的数据。

以下是如何将 result_processor 与 BashOperator 一起使用

@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
    return """
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """
bash_task = BashOperator(
    task_id="filter_today_changes",
    bash_command="""
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """,
    output_processor=lambda output: json.loads(output),
)

从文件执行命令

BashOperator@task.bash TaskFlow 装饰器都使您能够执行存储在文件中的 Bash 命令。这些文件必须具有 .sh.bash 扩展名。

使用 Jinja 模板

您可以执行包含 Jinja 模板的 bash 脚本。执行此操作时,Airflow 会加载您的文件内容,呈现模板,并将呈现的脚本写入临时文件。默认情况下,该文件位于临时目录中(在 /tmp 下)。您可以使用 cwd 参数更改此位置。

注意

Airflow 必须具有对 /tmpcwd 目录的写入权限,才能将临时文件写入磁盘。

要执行 bash 脚本,请将其放置在相对于包含 DAG 文件的目录的位置。因此,如果您的 DAG 文件位于 /usr/local/airflow/dags/test_dag.py 中,则可以将您的 test.sh 文件移动到 /usr/local/airflow/dags/ 下的任何位置(例如:/usr/local/airflow/dags/scripts/test.sh),并将相对路径传递给 bash_command,如下所示

@task.bash
def bash_example():
    # "scripts" folder is under "/usr/local/airflow/dags"
    return "scripts/test.sh"
t2 = BashOperator(
    task_id="bash_example",
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
)

出于许多原因,为 Bash 脚本创建单独的文件夹可能是可取的,例如分隔脚本的逻辑和管道代码,允许在以不同语言编写的文件中正确突出显示代码,以及在构建管道时提供一般的灵活性。

也可以将您的 template_searchpath 定义为指向 DAG 构造函数调用中的任何文件夹位置。

@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
    @task.bash
    def bash_example():
        return "test.sh "
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
    t2 = BashOperator(
        task_id="bash_example",
        bash_command="test.sh ",
    )

不使用 Jinja 模板

如果您的脚本不包含任何 Jinja 模板,请在脚本名称后添加一个空格来禁用 Airflow 的呈现。

@task.bash
def run_command_from_script() -> str:
    return "$AIRFLOW_HOME/scripts/example.sh "


run_script = run_command_from_script()
run_script = BashOperator(
    task_id="run_command_from_script",
    bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)

未找到 Jinja 模板

如果在尝试执行 Bash 脚本时遇到“未找到模板”异常,请在脚本名称后添加一个空格。这是因为 Airflow 尝试对其应用 Jinja 模板,这将失败。

@task.bash
def bash_example():
    # This fails with 'Jinja template not found' error
    # return "/home/batcher/test.sh",
    # This works (has a space after)
    return "/home/batcher/test.sh "
BashOperator(
    task_id="bash_example",
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
)

但是,如果您想在 Bash 脚本中使用模板,请不要添加空格,而是查看使用 Jinja 模板的 bash 脚本部分。

使用 Python 增强 Bash

@task.bash TaskFlow 装饰器允许您在任务中将 Bash 和 Python 组合成强大的组合。

@task.bash 任务中使用 Python 条件语句、其他函数调用等可以帮助定义、增强甚至构建要执行的 Bash 命令。

例如,使用条件逻辑来确定任务行为

airflow/example_dags/example_bash_decorator.py[源代码]

@task.bash
def sleep_in(day: str) -> str:
    if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
        return f"sleep {60 * 60}"
    else:
        raise AirflowSkipException("No sleeping in today!")

sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")

或调用函数以帮助构建 Bash 命令

airflow/example_dags/example_bash_decorator.py[源代码]

def _get_files_in_cwd() -> list[str]:
    from pathlib import Path

    dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
    files = [str(elem) for elem in dir_contents if elem.is_file()]

    return files

@task.bash
def get_file_stats() -> str:
    from shlex import join

    files = _get_files_in_cwd()
    cmd = join(["stat", *files])

    return cmd

get_file_stats()

这种类型的预执行增强有无数的可能性。

BashSensor

使用 BashSensor 来使用任意命令进行感知。命令成功时应返回 0,否则返回任何其他值。

airflow/example_dags/example_sensors.py[源代码]

t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")

t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1")

此条目是否有帮助?