插件¶
Airflow 内置了一个简单的插件管理器,只需将文件放到您的 $AIRFLOW_HOME/plugins
文件夹中,即可将其外部功能集成到核心中。
plugins
文件夹中的 Python 模块会被导入,并且 宏 和 Web 视图 会集成到 Airflow 的主集合中并可供使用。
要排除插件问题,可以使用 airflow plugins
命令。此命令会输出已加载插件的信息。
有什么用?¶
Airflow 提供了处理数据的通用工具箱。不同的组织有不同的技术栈和不同的需求。使用 Airflow 插件可以帮助公司定制其 Airflow 安装,以反映其生态系统。
插件可以作为一种简便的方法来编写、共享和激活新的功能集。
此外,还需要一套更复杂的应用程序来与不同类型的数据和元数据交互。
示例
一套用于解析 Hive 日志并暴露 Hive 元数据(CPU / IO / 阶段 / 倾斜 / 等)的工具
一个异常检测框架,允许人们收集指标、设置阈值和警报
一个审计工具,帮助了解谁访问了什么
一个配置驱动的 SLA 监控工具,允许您设置被监控的表及其应该到达的时间,向相关人员发送警报,并展示中断的可视化
为何基于 Airflow 构建?¶
Airflow 包含许多在构建应用程序时可以复用的组件
一个可用于渲染视图的 Web 服务器
一个用于存储模型数据的元数据数据库
访问您的数据库,并了解如何连接它们
一组您的应用程序可以推送工作负载的 Worker
Airflow 已经部署好,您可以直接利用其部署流程
基本的图表功能,以及底层库和抽象
插件何时加载(和重新加载)?¶
默认情况下,插件是延迟加载的,一旦加载,就不会重新加载(UI 插件在 Webserver 中自动加载除外)。要在每个 Airflow 进程启动时加载它们,请在 airflow.cfg
中设置 [core] lazy_load_plugins = False
。
这意味着如果您对插件进行了任何更改,并希望 Webserver 或调度器使用新代码,则需要重启这些进程。但是,直到调度器启动后,更改才会反映在新的运行任务中。
默认情况下,任务执行使用 forking。这避免了创建新的 Python 解释器并重新解析所有 Airflow 代码和启动例程所带来的速度减慢。这种方法具有显著的优势,尤其是对于较短的任务。这意味着,如果您在任务中使用插件并希望它们更新,则需要重启 Worker(如果使用 CeleryExecutor)或调度器(LocalExecutor)。另一种选择是接受启动时的速度损失,将 core.execute_tasks_new_python_interpreter
配置设置为 True,这样会为任务启动一个全新的 Python 解释器。
(另一方面,仅由 DAG 文件导入的模块则没有这个问题,因为 DAG 文件不会在任何长时间运行的 Airflow 进程中加载/解析。)
接口¶
要创建插件,您需要派生 airflow.plugins_manager.AirflowPlugin
类,并引用您想要插入到 Airflow 中的对象。您需要派生的类如下所示
class AirflowPlugin:
# The name of your plugin (str)
name = None
# A list of references to inject into the macros namespace
macros = []
# A list of dictionaries containing FastAPI app objects and some metadata. See the example below.
fastapi_apps = []
# A list of dictionaries containing FastAPI middleware factory objects and some metadata. See the example below.
fastapi_root_middlewares = []
# A callback to perform actions when Airflow starts and the plugin is loaded.
# NOTE: Ensure your plugin has *args, and **kwargs in the method definition
# to protect against extra parameters injected into the on_load(...)
# function in future changes
def on_load(*args, **kwargs):
# ... perform Plugin boot actions
pass
# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links = []
# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links = []
# A list of timetable classes to register so they can be used in dags.
timetables = []
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
listeners = []
您可以通过继承来派生它(请参阅下面的示例)。在示例中,所有选项都已定义为类属性,但如果您需要执行额外的初始化,也可以将它们定义为属性。请注意,此类内部的 name
必须指定。
请确保在更改插件后重新启动 Webserver 和调度器,以使更改生效。
示例¶
下面的代码定义了一个插件,它在 Airflow 中注入了一组说明性的对象定义。
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
pass
# Creating a FastAPI application to integrate in Airflow Rest API.
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World from FastAPI plugin"}
app_with_metadata = {"app": app, "url_prefix": "/some_prefix", "name": "Name of the App"}
# Creating a FastAPI middleware that will operates on all the server api requests.
middleware_with_metadata = {
"middleware": TrustedHostMiddleware,
"args": [],
"kwargs": {"allowed_hosts": ["example.com", "*.example.com"]},
"name": "Name of the Middleware",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
macros = [plugin_macro]
fastapi_apps = [app_with_metadata]
fastapi_root_middlewares = [middleware_with_metadata]
另请参阅
从 CSRF 保护中排除视图¶
我们强烈建议您使用 CSRF 保护所有视图。但如果需要,可以使用装饰器排除某些视图。
from airflow.www.app import csrf
@csrf.exempt
def my_handler():
# ...
return "ok"
作为 Python 包的插件¶
可以通过 setuptools entrypoint 机制加载插件。为此,请在您的软件包中使用 entrypoint 链接您的插件。如果软件包已安装,Airflow 将自动从 entrypoint 列表中加载注册的插件。
注意
entrypoint 名称(例如 my_plugin
)和插件类名称都不会影响插件本身的模块和类名称。
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
class MyAirflowPlugin(AirflowPlugin):
name = "my_namespace"
然后在 pyproject.toml 中
[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"
Airflow 3 中的 Flask Appbuilder 和 Flask Blueprints¶
Airflow 2 支持插件中的 Flask Appbuilder 视图 (appbuilder_views
)、Flask AppBuilder 菜单项 (appbuilder_menu_items
) 和 Flask Blueprints (flask_blueprints
)。这些在 Airflow 3 中已被 FastAPI 应用取代。所有新插件都应改用 FastAPI 应用 (fastapi_apps
)。
但是,为 Flask 和 FAB 插件提供了一个兼容层,以便于过渡到 Airflow 3 - 只需安装 FAB Provider 即可。理想情况下,您应该在升级过程中将您的插件转换为 FastAPI 应用 (fastapi_apps),因为此兼容层已被弃用。
故障排除¶
您可以使用 Flask CLI 来排除问题。要运行此命令,您需要将变量 FLASK_APP
设置为 airflow.www.app:create_app
。
例如,要打印所有路由,运行
FLASK_APP=airflow.www.app:create_app flask routes