操作符

从概念上来说,操作符是预定义的 任务 的模板,您可以在 DAG 中以声明方式定义它

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="[email protected]", subject="Update complete")

    ping >> email

Airflow 拥有非常广泛的操作符集,其中一些内置于核心或预安装的提供程序中。核心中的部分常用操作符包括

  • BashOperator - 执行 bash 命令

  • PythonOperator - 调用任意 Python 函数

  • EmailOperator - 发送电子邮件

  • 使用 @task 装饰器执行任意 Python 函数。它不支持呈现作为参数传递的 Jinja 模板。

注意

@task 装饰器推荐用于执行 Python 可调用对象,其参数中没有模板呈现,而不是经典的 PythonOperator

有关所有核心操作符的列表,请参阅:核心操作符和钩子参考

如果所需的操作符未随 Airflow 默认安装,你可能会在我们的庞大社区 提供程序包 中找到它。此处一些流行的操作符包括

但还有更多 - 你可以在我们的 提供程序包 文档中查看所有社区管理的操作符、钩子、传感器和传输的完整列表。

注意

在 Airflow 的代码中,我们经常混合 任务 和操作符的概念,它们基本上可以互换。但是,当我们谈论任务时,我们指的是 DAG 的通用“执行单元”;当我们谈论操作符时,我们指的是一个可重用的、预制的任务模板,其逻辑已为你完成,只需一些参数即可。

Jinja 模板

Airflow 利用了 Jinja 模板 的强大功能,这与 结合使用时,可以成为一个强大的工具。

例如,假设您想使用 BashOperator 将数据间隔的开始作为环境变量传递给 Bash 脚本

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

此处,{{ ds }} 是一个模板变量,并且由于 BashOperatorenv 参数使用 Jinja 进行模板化,因此数据间隔的开始日期将作为名为 DATA_INTERVAL_START 的环境变量在您的 Bash 脚本中可用。

您可以对文档中标记为“templated”的每个参数使用 Jinja 模板。模板替换发生在调用操作符的 pre_execute 函数之前。

您还可以对嵌套字段使用 Jinja 模板,只要这些嵌套字段在其所属的结构中标记为模板即可:在 template_fields 属性中注册的字段将提交给模板替换,例如以下示例中的 path 字段

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 属性是一个类变量,并且保证为 Sequence[str] 类型(即字符串的列表或元组)。

只要所有中间字段都标记为模板字段,也可以替换深度嵌套字段

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

在创建 DAG 时,您可以将自定义选项传递给 Jinja Environment。一种常见的用法是避免 Jinja 从模板字符串中删除尾随换行符

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

请参阅 Jinja 文档 以查找所有可用的选项。

某些操作符还会将以特定后缀结尾的字符串(在 template_ext 中定义)视为呈现字段时的文件引用。这对于直接从文件加载脚本或查询(而不是将它们包含到 DAG 代码中)很有用。

例如,考虑一个运行多行 bash 脚本的 BashOperator,它将加载 script.sh 中的文件,并将其内容用作 bash_command 的值

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

默认情况下,以这种方式提供的路径应相对于 DAG 的文件夹提供(因为这是默认的 Jinja 模板搜索路径),但可以通过在 DAG 上设置 template_searchpath 参数来添加其他路径。

在某些情况下,您可能希望从模板化中排除一个字符串并直接使用它。考虑以下任务

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

这将失败,并显示 TemplateNotFound: cat script.sh,因为 Airflow 会将该字符串视为文件路径,而不是命令。我们可以通过将 literal() 包裹该值来防止 Airflow 将此值视为对文件的引用。此方法禁用宏和文件的呈现,并且可以应用于选定的嵌套字段,同时为其余内容保留默认模板化规则。

from airflow.utils.template import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

2.8 版中的新增功能: 添加了 literal()

或者,如果您想阻止 Airflow 将值视为对文件的引用,则可以覆盖 template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

将字段呈现为原生 Python 对象

默认情况下,所有 template_fields 都呈现为字符串。

示例,假设 extract 任务将一个字典(示例:{"1001": 301.27, "1002": 433.21, "1003": 502.22})推送到 XCom 表中。现在,当运行以下任务时,order_data 参数会传递一个字符串,例如:'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
)

如果你希望渲染后的模板字段返回一个原生 Python 对象(在我们的示例中是 dict),你可以将 render_template_as_native_obj=True 传递给 DAG,如下所示

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)


@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data):
    print(type(order_data))
    total_order_value = 0
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


extract_task = extract()

transform_task = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
)

extract_task >> transform_task

在这种情况下,order_data 参数被传递:{"1001": 301.27, "1002": 433.21, "1003": 502.22}

render_template_as_native_obj 设置为 True 时,Airflow 使用 Jinja 的 NativeEnvironment。使用 NativeEnvironment,渲染模板会生成一个原生 Python 类型。

保留的 params 关键字

在 Apache Airflow 2.2.0 中,params 变量在 DAG 序列化期间使用。请不要在第三方运算符中使用该名称。如果你升级了你的环境并收到了以下错误

AttributeError: 'str' object has no attribute '__module__'

请更改你的运算符中的 params 名称。

此条目有帮助吗?