Airflow 2025 峰会将于 10 月 07-09 日举行。立即注册获取早鸟票!

插件

Airflow 内置了一个简单的插件管理器,只需将文件放到您的 $AIRFLOW_HOME/plugins 文件夹中,即可将其外部功能集成到核心中。

plugins 文件夹中的 Python 模块会被导入,并且 和 Web 视图 会集成到 Airflow 的主集合中并可供使用。

要排除插件问题,可以使用 airflow plugins 命令。此命令会输出已加载插件的信息。

有什么用?

Airflow 提供了处理数据的通用工具箱。不同的组织有不同的技术栈和不同的需求。使用 Airflow 插件可以帮助公司定制其 Airflow 安装,以反映其生态系统。

插件可以作为一种简便的方法来编写、共享和激活新的功能集。

此外,还需要一套更复杂的应用程序来与不同类型的数据和元数据交互。

示例

  • 一套用于解析 Hive 日志并暴露 Hive 元数据(CPU / IO / 阶段 / 倾斜 / 等)的工具

  • 一个异常检测框架,允许人们收集指标、设置阈值和警报

  • 一个审计工具,帮助了解谁访问了什么

  • 一个配置驱动的 SLA 监控工具,允许您设置被监控的表及其应该到达的时间,向相关人员发送警报,并展示中断的可视化

为何基于 Airflow 构建?

Airflow 包含许多在构建应用程序时可以复用的组件

  • 一个可用于渲染视图的 Web 服务器

  • 一个用于存储模型数据的元数据数据库

  • 访问您的数据库,并了解如何连接它们

  • 一组您的应用程序可以推送工作负载的 Worker

  • Airflow 已经部署好,您可以直接利用其部署流程

  • 基本的图表功能,以及底层库和抽象

插件何时加载(和重新加载)?

默认情况下,插件是延迟加载的,一旦加载,就不会重新加载(UI 插件在 Webserver 中自动加载除外)。要在每个 Airflow 进程启动时加载它们,请在 airflow.cfg 中设置 [core] lazy_load_plugins = False

这意味着如果您对插件进行了任何更改,并希望 Webserver 或调度器使用新代码,则需要重启这些进程。但是,直到调度器启动后,更改才会反映在新的运行任务中。

默认情况下,任务执行使用 forking。这避免了创建新的 Python 解释器并重新解析所有 Airflow 代码和启动例程所带来的速度减慢。这种方法具有显著的优势,尤其是对于较短的任务。这意味着,如果您在任务中使用插件并希望它们更新,则需要重启 Worker(如果使用 CeleryExecutor)或调度器(LocalExecutor)。另一种选择是接受启动时的速度损失,将 core.execute_tasks_new_python_interpreter 配置设置为 True,这样会为任务启动一个全新的 Python 解释器。

(另一方面,仅由 DAG 文件导入的模块则没有这个问题,因为 DAG 文件不会在任何长时间运行的 Airflow 进程中加载/解析。)

接口

要创建插件,您需要派生 airflow.plugins_manager.AirflowPlugin 类,并引用您想要插入到 Airflow 中的对象。您需要派生的类如下所示

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of references to inject into the macros namespace
    macros = []
    # A list of dictionaries containing FastAPI app objects and some metadata. See the example below.
    fastapi_apps = []
    # A list of dictionaries containing FastAPI middleware factory objects and some metadata. See the example below.
    fastapi_root_middlewares = []

    # A callback to perform actions when Airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in dags.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以通过继承来派生它(请参阅下面的示例)。在示例中,所有选项都已定义为类属性,但如果您需要执行额外的初始化,也可以将它们定义为属性。请注意,此类内部的 name 必须指定。

请确保在更改插件后重新启动 Webserver 和调度器,以使更改生效。

示例

下面的代码定义了一个插件,它在 Airflow 中注入了一组说明性的对象定义。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin

from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a FastAPI application to integrate in Airflow Rest API.
app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World from FastAPI plugin"}


app_with_metadata = {"app": app, "url_prefix": "/some_prefix", "name": "Name of the App"}


# Creating a FastAPI middleware that will operates on all the server api requests.
middleware_with_metadata = {
    "middleware": TrustedHostMiddleware,
    "args": [],
    "kwargs": {"allowed_hosts": ["example.com", "*.example.com"]},
    "name": "Name of the Middleware",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    macros = [plugin_macro]
    fastapi_apps = [app_with_metadata]
    fastapi_root_middlewares = [middleware_with_metadata]

从 CSRF 保护中排除视图

我们强烈建议您使用 CSRF 保护所有视图。但如果需要,可以使用装饰器排除某些视图。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

作为 Python 包的插件

可以通过 setuptools entrypoint 机制加载插件。为此,请在您的软件包中使用 entrypoint 链接您的插件。如果软件包已安装,Airflow 将自动从 entrypoint 列表中加载注册的插件。

注意

entrypoint 名称(例如 my_plugin)和插件类名称都不会影响插件本身的模块和类名称。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"

然后在 pyproject.toml 中

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

Airflow 3 中的 Flask Appbuilder 和 Flask Blueprints

Airflow 2 支持插件中的 Flask Appbuilder 视图 (appbuilder_views)、Flask AppBuilder 菜单项 (appbuilder_menu_items) 和 Flask Blueprints (flask_blueprints)。这些在 Airflow 3 中已被 FastAPI 应用取代。所有新插件都应改用 FastAPI 应用 (fastapi_apps)。

但是,为 Flask 和 FAB 插件提供了一个兼容层,以便于过渡到 Airflow 3 - 只需安装 FAB Provider 即可。理想情况下,您应该在升级过程中将您的插件转换为 FastAPI 应用 (fastapi_apps),因为此兼容层已被弃用。

故障排除

您可以使用 Flask CLI 来排除问题。要运行此命令,您需要将变量 FLASK_APP 设置为 airflow.www.app:create_app

例如,要打印所有路由,运行

FLASK_APP=airflow.www.app:create_app flask routes

此条目是否有帮助?