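Plugins
Airflow has a simple built-in plugin manager that can integrate external features into its core: you simply drop files into your $AIRFLOW_HOME/plugins folder.
Since Airflow 3.1, the plugin system supports new features such as React apps, FastAPI endpoints and middlewares. These make it easier to extend Airflow and build rich custom integrations.
The Python modules in the plugins folder get imported. Macros and web views get integrated into Airflow's main collections and become available for use.
To troubleshoot issues with plugins, you can use the airflow plugins command. It dumps information about the loaded plugins.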
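What for?
Airflow offers a generic toolbox for working with data. Different organizations have different stacks and different needs. Airflow plugins let companies customize their Airflow installation to reflect their ecosystem.
Plugins can be used as an easy way to write, share and activate new sets of features.
There is also a need for a set of more complex applications that interact with different flavors of data and metadata.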
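Examples
A set of tools to parse Hive logs and expose Hive metadata (CPU / IO / phases / skew / …)
An anomaly detection framework, allowing people to collect metrics, set thresholds and alerts
An auditing tool, helping understand who accesses what
A config-driven SLA monitoring tool. It lets you set monitored tables and the time by which they should land, alerts people, and exposes visualizations of outages.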
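Why build on top of Airflow?
Airflow has many components that can be reused when building an application:
A web server you can use to render your views
A metadata database to store your models
Access to your databases, and knowledge of how to connect to them
An array of workers that your application can push workload to
Airflow is already deployed, so you can piggyback on its deployment logistics
Basic charting capabilities, underlying libraries and abstractions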
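Available Building Blocks
Airflow plugins can register the following components:
External Views – Add buttons/tabs linking to new pages in the UI.
React Apps – Embed custom React apps inside the Airflow UI (new in Airflow 3.1).
FastAPI Apps – Add custom API endpoints.
FastAPI Middlewares – Intercept and modify API requests/responses.
Macros – Define reusable Python functions that can be used in DAG templates.
Operator Extra Links – Add custom buttons to the task details view.
Timetables & Listeners – Implement custom scheduling logic and event hooks.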
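Macros registered by a plugin end up namespaced under the plugin's name. That is why the example further down refers to {{ macros.test_plugin.plugin_macro }}. The sketch below imitates that namespacing with types.ModuleType. build_macros_namespace is a hypothetical helper, and this is not Airflow's own implementation.

```python
import types


def plugin_macro():
    # Hypothetical macro body; any importable Python callable can serve as a macro.
    return "hello from test_plugin"


def build_macros_namespace(plugin_name, macros):
    """Expose each callable as <plugin_name>.<function name> on a module object."""
    module = types.ModuleType(f"macros.{plugin_name}")
    for fn in macros:
        setattr(module, fn.__name__, fn)
    # Root object standing in for the `macros` variable seen by templates.
    root = types.SimpleNamespace()
    setattr(root, plugin_name, module)
    return root


macros = build_macros_namespace("test_plugin", [plugin_macro])
print(macros.test_plugin.plugin_macro())
```

In a real DAG template, the equivalent call would be written as {{ macros.test_plugin.plugin_macro() }}.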
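When are plugins (re)loaded?
Plugins are lazily loaded by default, and once loaded they are never reloaded. The exception is UI plugins, which are loaded automatically when the webserver starts. To load plugins at the start of every Airflow process instead, set [core] lazy_load_plugins = False in airflow.cfg.
This means that if you change a plugin and want the webserver or scheduler to use the new code, you need to restart those processes. However, newly running tasks will not reflect the changes until the scheduler has been restarted.
By default, task execution uses forking. This avoids the slowdown of creating a new Python interpreter and re-parsing all of the Airflow code and startup routines, which is a big benefit for short-running tasks. It also means that if you use plugins in your tasks and want them to update, you need to restart the worker (with CeleryExecutor) or the scheduler (with LocalExecutor). The alternative is to accept the start-up cost and set the core.execute_tasks_new_python_interpreter config option to True, which launches a whole new Python interpreter for each task.
(Modules that are only imported by DAG files do not have this problem, because DAG files are not loaded/parsed in any long-running Airflow process.)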
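Both settings mentioned above can also be supplied as environment variables, which Airflow reads using the AIRFLOW__{SECTION}__{KEY} naming convention. The sketch below only builds and prints the variable names and values. The helper airflow_env_var is hypothetical, not part of Airflow.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides an airflow.cfg option.

    Hypothetical helper: follows the AIRFLOW__{SECTION}__{KEY} convention
    (upper case, double underscores as separators).
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The two options discussed in this section: eager plugin loading,
# and a fresh Python interpreter for every task.
overrides = {
    airflow_env_var("core", "lazy_load_plugins"): "False",
    airflow_env_var("core", "execute_tasks_new_python_interpreter"): "True",
}

# In a deployment these must be present in the environment of the scheduler,
# workers, etc. before those processes start; here we only print them.
for name, value in overrides.items():
    print(f"export {name}={value}")
```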
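Interface
To create a plugin, derive the airflow.plugins_manager.AirflowPlugin class and reference the objects you want to plug into Airflow. Here is what the class you need to derive looks like: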
class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of references to inject into the macros namespace
    macros = []
    # A list of dictionaries containing FastAPI app objects and some metadata. See the example below.
    fastapi_apps = []
    # A list of dictionaries containing FastAPI middleware factory objects and some metadata. See the example below.
    fastapi_root_middlewares = []
    # A list of dictionaries containing external views and some metadata. See the example below.
    external_views = []
    # A list of dictionaries containing react apps and some metadata. See the example below.
    # Note: React apps are only supported in Airflow 3.1 and later.
    # Note: The React app integration is experimental and interfaces might change in future versions. Particularly, dependency and state interactions between the UI and plugins may need to be refactored for more complex plugin apps.
    react_apps = []
    # A callback to perform actions when Airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass
    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []
    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []
    # A list of timetable classes to register so they can be used in Dags.
    timetables = []
    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []
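You can derive it by inheritance (see the example below). In the example, all options are defined as class attributes, but you can also define them as properties if you need to perform additional initialization. Note that name must be specified inside this class.
Make sure you restart the webserver and scheduler after making changes to plugins so that they take effect.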
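Here is a minimal sketch of the property-based variant described above. MyPlugin and plugin_macro are hypothetical names. The fallback class exists only so the snippet runs where Airflow is not installed; it is not Airflow's AirflowPlugin.

```python
try:
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Stand-in so this sketch runs without Airflow; NOT Airflow's real base class.
    class AirflowPlugin:
        name = None


def plugin_macro():
    return "ok"


class MyPlugin(AirflowPlugin):
    # `name` stays a plain class attribute: it must be set on the class itself.
    name = "my_plugin"

    @property
    def macros(self):
        # Any extra initialization can run here, when the attribute is read,
        # instead of at module import time.
        return [plugin_macro]
```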
Plugin Management Interface
Airflow 3.1 introduces a plugin management interface, available under Admin → Plugins in the Airflow UI. This page lets you view the installed plugins.
…
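External Views
By providing a url_route value, external views can also be embedded directly into the Airflow UI using an iframe. The view is then rendered within the current page instead of opening in a new browser tab.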
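To make the link-versus-iframe behaviour concrete, here is a hypothetical Python helper. It fills the {DAG_ID}-style placeholders of an href and picks a rendering mode depending on whether url_route is set. The real substitution and rendering happen in the Airflow UI; resolve_external_view and its return shape are illustrative assumptions.

```python
def resolve_external_view(view, context):
    """Hypothetical: decide how an external view definition would be shown.

    Returns {"mode": "iframe", ...} when url_route is set, else {"mode": "link", ...}.
    """
    href = view["href"]
    # Fill only the placeholders available at this location (DAG_ID, RUN_ID, ...).
    for key, value in context.items():
        href = href.replace("{" + key + "}", str(value))
    url_route = view.get("url_route")
    if url_route:
        # Rendered in-page: the iframe loads `href` under this UI route.
        return {"mode": "iframe", "route": url_route, "src": href}
    # No url_route: shown as an external link that opens a new tab.
    return {"mode": "link", "src": href}


view = {
    "name": "Name of the External View",
    "href": "https://example.com/{DAG_ID}/{RUN_ID}",
    "destination": "dag_run",
    "url_route": "my_external_view",
}
print(resolve_external_view(view, {"DAG_ID": "example_dag", "RUN_ID": "manual_1"}))
```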
Example
The code below defines a plugin that injects a set of illustrative object definitions into Airflow.
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
# Will show up in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass
# Creating a FastAPI application to integrate in Airflow Rest API.
app = FastAPI()
@app.get("/")
async def root():
    return {"message": "Hello World from FastAPI plugin"}
app_with_metadata = {"app": app, "url_prefix": "/some_prefix", "name": "Name of the App"}
# Creating a FastAPI middleware that will operate on all the server API requests.
middleware_with_metadata = {
    "middleware": TrustedHostMiddleware,
    "args": [],
    "kwargs": {"allowed_hosts": ["example.com", "*.example.com"]},
    "name": "Name of the Middleware",
}
# Creating an external view that will be rendered in the Airflow UI.
external_view_with_metadata = {
    # Name of the external view, this will be displayed in the UI.
    "name": "Name of the External View",
    # Source URL of the external view. This URL can be templated using context variables. Depending on the location where the external view is rendered,
    # the context variables available will be different, i.e. a subset of (DAG_ID, RUN_ID, TASK_ID, MAP_INDEX).
    "href": "https://example.com/{DAG_ID}/{RUN_ID}/{TASK_ID}/{MAP_INDEX}",
    # Destination of the external view. This is used to determine where the view will be loaded in the UI.
    # Supported locations are Literal["nav", "dag", "dag_run", "task", "task_instance", "base"], defaults to "nav".
    "destination": "dag_run",
    # Optional icon, URL to an SVG file.
    "icon": "https://example.com/icon.svg",
    # Optional dark icon for the dark theme, URL to an SVG file. If not provided, "icon" will be used for both light and dark themes.
    "icon_dark_mode": "https://example.com/dark_icon.svg",
    # Optional parameter, relative URL location for the External View rendering. If not provided, the external view will be rendered as an external link.
    # If provided, it will be rendered inside an iframe in the UI. Should not contain a leading slash.
    "url_route": "my_external_view",
    # Optional category, only relevant for destination "nav". This is used to group the external links in the navigation bar. We will match the existing
    # menus of ["browse", "docs", "admin", "user"] and if there's no match then create a new menu.
    "category": "browse",
}
# Note: The React app integration is experimental and interfaces might change in future versions.
react_app_with_metadata = {
    # Name of the React app, this will be displayed in the UI.
    "name": "Name of the React App",
    # Bundle URL of the React app. This is the URL where the React app is served from. It can be a static file or a CDN.
    # This URL can be templated using context variables. Depending on the location where the React app is rendered,
    # the context variables available will be different, i.e. a subset of (DAG_ID, RUN_ID, TASK_ID, MAP_INDEX).
    "bundle_url": "https://example.com/static/js/my_react_app.js",
    # Destination of the React app. This is used to determine where the app will be loaded in the UI.
    # Supported locations are Literal["nav", "dag", "dag_run", "task", "task_instance", "base"], defaults to "nav".
    # It can also be put inside an existing page; the supported views are ["dashboard", "dag_overview", "task_overview"]. You can position
    # elements in the existing page via the CSS `order` rule, which determines the flex order.
    # Use "base" to mount the app in the base layout (e.g. a toolbar strip); the host uses a flex container, so you can set `order` in your root JSX to control position.
    "destination": "dag_run",
    # Optional icon, URL to an SVG file.
    "icon": "https://example.com/icon.svg",
    # Optional dark icon for the dark theme, URL to an SVG file. If not provided, "icon" will be used for both light and dark themes.
    "icon_dark_mode": "https://example.com/dark_icon.svg",
    # URL route for the React app, relative to the Airflow UI base URL. Should not contain a leading slash.
    "url_route": "my_react_app",
    # Optional category, only relevant for destination "nav". This is used to group the React apps in the navigation bar. We will match the existing
    # menus of ["browse", "docs", "admin", "user"] and if there's no match then create a new menu.
    "category": "browse",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    macros = [plugin_macro]
    fastapi_apps = [app_with_metadata]
    fastapi_root_middlewares = [middleware_with_metadata]
    external_views = [external_view_with_metadata]
    react_apps = [react_app_with_metadata]
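The comments in the example state a few rules for these dictionaries: the supported destinations, and no leading slash in url_route. A hypothetical pre-flight check along those lines could look like this. check_ui_plugin_entry is not an Airflow API, and Airflow may validate these entries differently.

```python
# Destinations listed in the example's comments.
BASE_DESTINATIONS = {"nav", "dag", "dag_run", "task", "task_instance", "base"}
# React apps may additionally be placed inside these existing pages.
REACT_PAGE_DESTINATIONS = {"dashboard", "dag_overview", "task_overview"}


def check_ui_plugin_entry(meta, kind):
    """Hypothetical: return a list of problems in an external view or React app dict."""
    if kind not in ("external_view", "react_app"):
        raise ValueError(f"unknown kind {kind!r}")
    problems = []
    # External views need an href, React apps need a bundle_url.
    required = "href" if kind == "external_view" else "bundle_url"
    for key in ("name", required):
        if not meta.get(key):
            problems.append(f"missing {key!r}")
    allowed = set(BASE_DESTINATIONS)
    if kind == "react_app":
        allowed |= REACT_PAGE_DESTINATIONS
    destination = meta.get("destination", "nav")  # "nav" is the documented default
    if destination not in allowed:
        problems.append(f"unsupported destination {destination!r}")
    url_route = meta.get("url_route")
    if url_route and url_route.startswith("/"):
        problems.append("'url_route' must not have a leading slash")
    return problems


sample_view = {
    "name": "Name of the External View",
    "href": "https://example.com/{DAG_ID}",
    "destination": "dag_run",
    "url_route": "my_external_view",
}
print(check_ui_plugin_entry(sample_view, "external_view"))
```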
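See also
Exclude views from CSRF protection
We strongly suggest that you protect all your views with CSRF. If you really need to, you can exclude specific views using a decorator.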
from airflow.www.app import csrf
@csrf.exempt
def my_handler():
    # ...
    return "ok"
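Plugins as Python packages
Plugins can also be loaded via the setuptools entrypoint mechanism. To do this, link your plugin using an entrypoint in your package. If the package is installed, Airflow automatically loads the registered plugins from the entrypoint list.
Note
Neither the entrypoint name (e.g. my_plugin) nor the name of the plugin class contributes to the module and class name of the plugin itself.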
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
Then, in pyproject.toml:
[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"
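You can inspect entry points in the airflow.plugins group with the standard library's importlib.metadata. This is a quick way to confirm that an installed package registered its plugin. It is a sketch, not Airflow's own loader. my_package and MyAirflowPlugin are the placeholder names from the example above and are never imported here.

```python
from importlib.metadata import EntryPoint, entry_points


def airflow_plugin_entry_points():
    """List entry points registered under the "airflow.plugins" group."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10+ selectable API
        return list(eps.select(group="airflow.plugins"))
    return list(eps.get("airflow.plugins", []))  # Python 3.9 dict-style API


# The pyproject.toml line above declares an entry point equivalent to this one.
ep = EntryPoint(
    name="my_plugin",
    value="my_package.my_plugin:MyAirflowPlugin",
    group="airflow.plugins",
)
print(ep.module, ep.attr)

# Lists whatever plugins the packages installed in this environment register.
for found in airflow_plugin_entry_points():
    print(found.name, "->", found.value)
```

Once the package is installed, the loop should list my_plugin -> my_package.my_plugin:MyAirflowPlugin.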
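Flask AppBuilder and Flask Blueprints in Airflow 3
Airflow 2 supported Flask AppBuilder views (appbuilder_views), Flask AppBuilder menu items (appbuilder_menu_items) and Flask Blueprints (flask_blueprints) in plugins. In Airflow 3, these have been superseded by external views (external_views), FastAPI apps (fastapi_apps), FastAPI middlewares (fastapi_root_middlewares) and React apps (react_apps), which offer richer functionality and better integration with the Airflow UI.
All new plugins should use the new interfaces.
However, Airflow 3 provides a compatibility layer for Flask and FAB plugins to ease the transition. You can keep using existing Flask AppBuilder views, Flask Blueprints and Flask AppBuilder menu items. To do so, install the FAB provider and adjust your code according to the Airflow 3 migration guide.
Troubleshooting
You can use the Flask CLI to troubleshoot problems. To run it, set the FLASK_APP environment variable to airflow.www.app:create_app.
For example, to print all routes, run: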
FLASK_APP=airflow.www.app:create_app flask routes