BashOperator¶
使用 BashOperator
在 Bash shell 中执行命令。要执行的 Bash 命令或脚本由以下方式确定:
使用
BashOperator
时,使用bash_command
参数,或者如果使用 TaskFlow 装饰器
@task.bash
,则从装饰的可调用对象返回一个非空的字符串值。
提示
建议使用 @task.bash
装饰器而不是经典的 BashOperator
来执行 Bash 命令。
@task.bash
def run_after_loop() -> str:
return "echo https://airflow.org.cn/"
run_this = run_after_loop()
run_this = BashOperator(
task_id="run_after_loop",
bash_command="echo https://airflow.org.cn/",
)
模板化¶
您可以使用 Jinja 模板 来参数化 Bash 命令。
@task.bash
def also_run_this() -> str:
return 'echo "ti_key={{ task_instance_key_str }}"'
also_this = also_run_this()
also_run_this = BashOperator(
task_id="also_run_this",
bash_command='echo "ti_key={{ task_instance_key_str }}"',
)
使用 @task.bash
TaskFlow 装饰器允许您返回格式化的字符串,并利用所有 直接访问已装饰任务的执行上下文变量的优势。
@task.bash
def also_run_this_again(task_instance_key_str) -> str:
return f'echo "ti_key={task_instance_key_str}"'
also_this_again = also_run_this_again()
我们鼓励您利用这种方法,因为它很好地融入了整体 TaskFlow 范例。
注意
在 Bash 命令中使用 Jinja 模板时,应注意“用户”输入,因为不会对 Bash 命令执行转义和清理。
这主要适用于使用 dag_run.conf
,因为它可以通过 Web UI 中的用户提交。大多数默认模板变量没有风险。
例如,**不要**这样做
@task.bash
def bash_task() -> str:
return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'
# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
message = dag_run.conf["message"] if dag_run.conf else ""
return f'echo "here is the message: {message}"'
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)
相反,您应该通过 env
kwarg 传递此参数,并在 Bash 命令中使用双引号。
@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
return "echo \"here is the message: '$message'\""
bash_task = BashOperator(
task_id="bash_task",
bash_command="echo \"here is the message: '$message'\"",
env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)
跳过¶
通常,非零退出代码会产生 AirflowException,从而导致任务失败。在希望任务以 skipped
状态结束的情况下,您可以使用代码 99
退出(如果您传递 skip_on_exit_code
,则可以使用其他退出代码)。
@task.bash
def this_will_skip() -> str:
return 'echo "hello world"; exit 99;'
this_skips = this_will_skip()
this_will_skip = BashOperator(
task_id="this_will_skip",
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
输出处理器¶
output_processor
参数允许您指定一个 lambda 函数,该函数在将 Bash 脚本的输出推送到 XCom 之前对其进行处理。此功能对于直接在 BashOperator 中操作脚本的输出非常有用,而无需额外的操作符或任务。
例如,考虑一个场景,其中 bash 脚本的输出是一个 JSON 字符串。使用 output_processor
,您可以将此字符串转换为 JSON 对象,然后再将其存储在 XCom 中。这简化了工作流程,并确保下游任务以所需的格式接收处理后的数据。
以下是如何使用带有 BashOperator 的 result_processor
@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
return """
jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
example.json
"""
bash_task = BashOperator(
task_id="filter_today_changes",
bash_command="""
jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
example.json
""",
output_processor=lambda output: json.loads(output),
)
从文件执行命令¶
BashOperator
和 @task.bash
TaskFlow 装饰器都使您能够执行存储在文件中的 Bash 命令。这些文件**必须**具有 .sh
或 .bash
扩展名。
使用 Jinja 模板¶
您可以执行包含 Jinja 模板的 bash 脚本。当您这样做时,Airflow 会加载文件的内容,渲染模板,并将渲染后的脚本写入临时文件。默认情况下,该文件放置在临时目录中(在 /tmp
下)。您可以使用 cwd
参数更改此位置。
注意
Airflow 必须具有对 /tmp
或 cwd
目录的写入权限,才能将临时文件写入磁盘。
要执行 bash 脚本,请将其放置在相对于包含 DAG 文件的目录的位置。因此,如果您的 DAG 文件在 /usr/local/airflow/dags/test_dag.py
中,您可以将 test.sh
文件移动到 /usr/local/airflow/dags/
下的任何位置(例如:/usr/local/airflow/dags/scripts/test.sh
),并将相对路径传递给 bash_command
,如下所示
@task.bash
def bash_example():
# "scripts" folder is under "/usr/local/airflow/dags"
return "scripts/test.sh"
t2 = BashOperator(
task_id="bash_example",
# "scripts" folder is under "/usr/local/airflow/dags"
bash_command="scripts/test.sh",
)
出于许多原因,为 Bash 脚本创建单独的文件夹可能是可取的,例如分离脚本的逻辑和管道代码,允许在以不同语言组成的文件中进行适当的代码突出显示,以及在结构化管道方面的通用灵活性。
也可以将您的 template_searchpath
定义为指向 DAG 构造函数调用中的任何文件夹位置。
@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
@task.bash
def bash_example():
return "test.sh "
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
t2 = BashOperator(
task_id="bash_example",
bash_command="test.sh ",
)
不使用 Jinja 模板¶
如果您的脚本不包含任何 Jinja 模板,请通过在脚本名称后添加一个空格来禁用 Airflow 的渲染。
@task.bash
def run_command_from_script() -> str:
return "$AIRFLOW_HOME/scripts/example.sh "
run_script = run_command_from_script()
run_script = BashOperator(
task_id="run_command_from_script",
bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)
找不到 Jinja 模板¶
如果在尝试执行 Bash 脚本时遇到“找不到模板”异常,请在脚本名称后添加一个空格。这是因为 Airflow 尝试对其应用 Jinja 模板,这将失败。
@task.bash
def bash_example():
# This fails with 'Jinja template not found' error
# return "/home/batcher/test.sh",
# This works (has a space after)
return "/home/batcher/test.sh "
BashOperator(
task_id="bash_example",
# This fails with 'Jinja template not found' error
# bash_command="/home/batcher/test.sh",
# This works (has a space after)
bash_command="/home/batcher/test.sh ",
)
但是,如果您想在 Bash 脚本中使用模板,请不要添加空格,而是检查带有 Jinja 模板的 bash 脚本部分。
使用 Python 丰富 Bash¶
@task.bash
TaskFlow 装饰器允许您将 Bash 和 Python 组合到任务中的强大组合中。
在 @task.bash
任务中使用 Python 条件语句、其他函数调用等可以帮助定义、增强甚至构建要执行的 Bash 命令。
例如,使用条件逻辑来确定任务行为
@task.bash
def sleep_in(day: str) -> str:
if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
return f"sleep {60 * 60}"
else:
raise AirflowSkipException("No sleeping in today!")
sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
或者调用函数来帮助构建 Bash 命令
def _get_files_in_cwd() -> list[str]:
from pathlib import Path
dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
files = [str(elem) for elem in dir_contents if elem.is_file()]
return files
@task.bash
def get_file_stats() -> str:
from shlex import join
files = _get_files_in_cwd()
cmd = join(["stat", *files])
return cmd
get_file_stats()
这种类型的预执行增强有无数的可能性。