创建自定义算子
Airflow 允许您创建新算子以满足您或您的团队的需求。这种可扩展性是使 Apache Airflow 强大的众多特性之一。
您可以通过继承公共 SDK 基类 BaseOperator 来创建任何想要的算子。
在派生类中,需要重写两个方法
构造函数 - 定义算子所需的参数。您只需指定特定于您算子的参数。您可以在 Dag 文件中指定
default_args。有关详细信息,请参见 Default args。执行 - 当运行器调用算子时要执行的代码。此方法包含 Airflow 上下文作为参数,可用于读取配置值。
注意
在实现自定义算子时,请勿在 __init__ 方法中进行任何耗时操作。算子将在每个调度周期针对使用它的每个任务实例化一次,进行数据库调用会显著放慢调度并浪费资源。
让我们在新文件 hello_operator.py 中实现一个示例 HelloOperator
from airflow.sdk import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
注意
为了使导入工作,您应将文件放置在位于 PYTHONPATH 环境变量中的目录。Airflow 默认会将 dags/、plugins/ 和 config/ 目录(位于 Airflow home)添加到 PYTHONPATH 中。例如,在我们的示例中,文件放置在 custom_operator/ 目录下。有关 Python 和 Airflow 如何管理模块的详细信息,请参见 模块管理。
您现在可以按如下方式使用派生的自定义算子
from custom_operator.hello_operator import HelloOperator
with dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
您也可以继续使用 plugins 文件夹来存放自定义算子。如果您在 plugins 文件夹中有文件 hello_operator.py,可以按如下方式导入该算子
from hello_operator import HelloOperator
如果算子需要与外部服务(API、数据库等)通信,最好使用 Hooks 实现通信层。这样实现的逻辑可以在不同算子中被其他用户复用。这种方法比为每个外部服务使用 CustomServiceBaseOperator 能提供更好的解耦和集成利用率。
另一个考虑因素是临时状态。如果操作需要在内存中保存状态(例如需要在 on_kill 方法中使用的作业 ID 来取消请求),则该状态应保存在算子中,而不是在 Hook 中。这样服务 Hook 可以完全无状态,整个操作的逻辑都位于算子中。
钩子
Hooks 充当在 Dag 中与外部共享资源通信的接口。例如,Dag 中的多个任务可能需要访问同一个 MySQL 数据库。您无需为每个任务创建单独的连接,而是可以从 Hook 中获取连接并使用。Hook 还帮助避免在 Dag 中存储连接的认证参数。请参见 管理连接 了解如何创建和管理连接,以及 Providers 了解如何通过 provider 添加自定义连接类型的详细信息。
让我们扩展之前的示例,从 MySQL 中获取名称
class HelloDBOperator(BaseOperator):
def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.mysql_conn_id = mysql_conn_id
self.database = database
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
sql = "select name from user"
result = hook.get_first(sql)
message = f"Hello {result['name']}"
print(message)
return message
当算子在 Hook 对象上执行查询时,如果不存在连接,会创建一个新的连接。Hook 会从 Airflow 后端检索用户名、密码等认证参数,并将这些参数传递给 airflow.hooks.base.BaseHook.get_connection()。您应仅在 execute 方法或任何从 execute 调用的方法中创建 Hook。构造函数会在 Airflow 解析 Dag 时频繁调用,在那里实例化 Hook 会导致大量不必要的数据库连接。execute 只会在 Dag 运行期间被调用。
用户界面
Airflow 还允许开发者控制算子在 Dag UI 中的显示方式。覆盖 ui_color 可以更改算子在 UI 中的背景颜色。覆盖 ui_fgcolor 可更改标签颜色。覆盖 custom_operator_name 可以将显示名称更改为除类名之外的其他名称。
class HelloOperator(BaseOperator):
ui_color = "#ff0000"
ui_fgcolor = "#000000"
custom_operator_name = "Howdy"
# ...
模板化
您可以使用 Jinja 模板 为算子参数化。Airflow 在渲染算子时会对 template_fields 中出现的字段名进行模板渲染。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
您可以按如下方式使用模板
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
在此示例中,Jinja 会查找 name 参数,并将 {{ task_instance.task_id }} 替换为 task_id_1。
参数也可以包含文件名,例如 bash 脚本或 SQL 文件。需要在 template_ext 中添加文件的扩展名。如果 template_field 包含的字符串以 template_ext 中声明的扩展名结尾,Jinja 会读取文件内容并将模板替换为实际值。请注意,Jinja 替换的是算子属性,而不是 args。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("guest_name",)
template_ext = ".sql"
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.guest_name = name
在示例中,template_fields 应为 ['guest_name'],而不是 ['name']。
此外,您可以提供一个 template_fields_renderers 字典,用于定义模板字段的值在 Web UI 中以何种样式渲染。例如
class MyRequestOperator(BaseOperator):
template_fields: Sequence[str] = ("request_body",)
template_fields_renderers = {"request_body": "json"}
def __init__(self, request_body: str, **kwargs) -> None:
super().__init__(**kwargs)
self.request_body = request_body
当 template_field 本身是字典时,也可以指定点分隔的键路径,以提取并恰当地渲染各个元素。例如
class MyConfigOperator(BaseOperator):
template_fields: Sequence[str] = ("configuration",)
template_fields_renderers = {
"configuration": "json",
"configuration.query.sql": "sql",
}
def __init__(self, configuration: dict, **kwargs) -> None:
super().__init__(**kwargs)
self.configuration = configuration
然后按如下方式使用此模板
with dag:
config_task = MyConfigOperator(
task_id="task_id_1",
configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
)
这将导致 UI 将 configuration 渲染为 JSON,并且在 query.sql 中的配置值将使用 SQL 词法分析器进行渲染。
当前可用的 lexer
bash
bash_command
doc
doc_json
doc_md
doc_rst
doc_yaml
doc_md
hql
html
jinja
json
md
mysql
postgresql
powershell
py
python_callable
rst
sql
tsql
yaml
如果使用不存在的 lexer,模板字段的值将以美化的对象形式渲染。
限制
为防止误用,在算子的构造函数中(如果存在)定义和分配模板字段时必须遵守以下限制(否则请参见下文)
1. 模板字段对应的参数在构造函数中必须与字段同名。以下示例无效,因为传入构造函数的参数名称与模板字段不一致。
class HelloOperator(BaseOperator):
template_fields = "foo"
def __init__(self, foo_id) -> None: # should be def __init__(self, foo) -> None
self.foo = foo_id # should be self.foo = foo
2. 模板字段的实例成员必须通过直接赋值或调用父类的构造函数(在其中这些字段被定义为 template_fields)并显式分配参数的方式,从构造函数的相应参数进行赋值。以下示例无效,因为实例成员 self.foo 根本未被赋值,尽管它是模板字段。
class HelloOperator(BaseOperator):
template_fields = ("foo", "bar")
def __init__(self, foo, bar) -> None:
self.bar = bar
以下示例同样无效,因为 MyHelloOperator 的实例成员 self.foo 隐式地通过传递给其父构造函数的 kwargs 初始化。
class HelloOperator(BaseOperator):
template_fields = "foo"
def __init__(self, foo) -> None:
self.foo = foo
class MyHelloOperator(HelloOperator):
template_fields = ("foo", "bar")
def __init__(self, bar, **kwargs) -> None: # should be def __init__(self, foo, bar, **kwargs)
super().__init__(**kwargs) # should be super().__init__(foo=foo, **kwargs)
self.bar = bar
3. 在构造函数中对参数进行操作是不允许的。对值的任何操作应在 execute() 方法中进行。因此,以下示例无效。
class HelloOperator(BaseOperator):
template_fields = "foo"
def __init__(self, foo) -> None:
self.foo = foo.lower() # assignment should be only self.foo = foo
当算子继承自基算子且自身未定义构造函数时,上述限制不适用。然而,模板字段必须在父类中按照这些限制正确设置。
因此,以下示例是有效的
class HelloOperator(BaseOperator):
template_fields = "foo"
def __init__(self, foo) -> None:
self.foo = foo
class MyHelloOperator(HelloOperator):
template_fields = "foo"
上述限制由名为 ‘validate-operators-init’ 的 pre‑hook 强制执行。
通过子类化添加模板字段
创建自定义算子的常见用例之一是简单地扩展现有的 template_fields。可能会出现某个算子没有将特定参数定义为模板字段,但您希望能够动态地将参数作为 Jinja 表达式传入。这可以通过对现有算子进行快速子类化轻松实现。
假设您想使用前面定义的 HelloOperator
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
然而,您希望动态地对 world 参数进行参数化。由于 template_fields 属性保证是 Sequence[str] 类型(即字符串列表或元组),您可以子类化 HelloOperator,轻松修改 template_fields 为所需的值。
class MyHelloOperator(HelloOperator):
template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")
现在您可以这样使用 MyHelloOperator
with dag:
hello_task = MyHelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="{{ var.value.my_world }}",
)
在此示例中,world 参数将通过 Jinja 表达式动态设置为名为 “my_world” 的 Airflow 变量的值。
为算子定义额外链接
对于您的算子,您可以 定义额外链接,该链接可以将用户重定向到外部系统。例如,您可以添加一个链接,将用户重定向到算子的手册。
传感器
Airflow 为一种特殊类型的算子提供了原语,其目的是在固定间隔内轮询某些状态(例如文件是否存在),直至满足成功条件。
您可以通过扩展 airflow.sensors.base.BaseSensorOperator 并定义 poke 方法来创建任意传感器,以轮询外部状态并评估成功标准。
传感器拥有一个强大的特性——'reschedule' 模式,它允许传感器任务被重新调度,而不是在每次轮询之间阻塞工作槽。当您可以容忍更长的轮询间隔并预计会长时间轮询时,这非常有用。
使用重新调度模式时,需要注意传感器无法在重新调度的执行之间保持内部状态。在这种情况下,您应使用 airflow.sensors.base.poke_mode_only() 装饰您的传感器。这将告知用户您的传感器不适合在重新调度模式下使用。
一个保持内部状态且不能在重新调度模式下使用的传感器示例是 airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor。它轮询前缀下的对象数量(该数量为传感器的内部状态),并在一定时间内对象数量未变化时成功。