How to create your own provider¶
Custom providers¶
You can develop and publish your own providers. Your custom operators, hooks, sensors, and transfer operators can be packaged together in a standard Airflow package and installed using the same mechanisms. Moreover, they can use the same mechanisms to extend the Airflow core with auth backends, custom connections, logging, secret backends, and extra operator links, as described in the previous chapter.
As mentioned in the providers documentation, custom providers can extend the Airflow core: they can add extra links to operators as well as custom connections. If you want to use this mechanism for your own custom providers, you can build and install them as packages.
How to create a provider¶
Adding a provider to Airflow is just a matter of building a Python package and adding the right metadata to the package. We use the standard Python mechanism of defining entry points. Your package needs to define the appropriate entry point, apache_airflow_provider, which must point to a function implemented by your package that returns a dictionary containing the list of discoverable capabilities of your package. The dictionary must follow the json-schema specification.
Most of the fields in the schema provide extension points for documentation (which you might also want to use for your own purposes), but the fields that are important from the extensibility point of view are these:
Displaying package information in CLI/API
- package-name - the package name of the provider.
- name - a human-friendly name of the provider.
- description - an additional description of the provider.
- version - a list of versions of the package (in reverse-chronological order). The first version in the list is the current package version. It is taken from the version of the installed package, not from the provider_info metadata.
Exposing customized functionality to the Airflow core
- extra-links - this field should contain a list of all the operator class names that add extra-link capability. See Defining an operator extra link for how to add extra-link capability to your operators.
- connection-types - this field should contain a list of all connection types, together with the hook class names implementing those custom connection types (providing custom extra fields and custom field behaviour). This field is available as of Airflow 2.2.0 and replaces the deprecated hook-class-names. See Managing Connections for more details.
- secret-backends - this field should contain a list of all the secret backend class names the provider provides. See Secrets backends for how to add them.
- task-decorators - this field should contain a list of dictionaries with the name/path under which each decorator is available. See Creating custom @task decorators for how to add custom decorators.
- logging - this field should contain a list of all the logging handler class names the provider provides. See Logging for tasks for log handlers.
- notifications - this field should contain the notification classes. See Creating a notifier for notifications.
- executors - this field should contain the executor class names. See Executors for executors.
- config - this field should contain a dictionary that conforms to airflow/config_templates/config.yml.schema.json with the configuration contributed by the provider. See Setting configuration options for details on setting configuration.
- filesystems - this field should contain a list of all the filesystem module names. See Object storage for filesystems.
- integrations - a list of the integrations available in the provider.
- transfers - this field should contain a list of all the transfer operator class names the provider provides. See Operators for operators (a transfer is a kind of operator).
- operators - this field should contain a list of all the operator class names the provider provides. See Operators.
- hooks - this field should contain a list of all the hook class names the provider provides. See Managing Connections for hooks and the connections they provide.
- sensors - this field should contain a list of all the sensor class names the provider provides. See Sensors.
- bundles - this field should contain a list of all the bundle class names the provider provides.
- triggers - this field should contain a list of all the trigger class names the provider provides. See Deferrable operators and triggers.
- auth-backends - this field should contain a list of all the auth backend class names the provider provides. See Auth manager for authentication.
- auth-managers - this field should contain a list of all the auth manager class names the provider provides. See Auth manager for auth managers.
- asset-uris - this field should contain a list of URI schemes together with the class names implementing normalization functions. See Asset definitions for asset URIs.
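As a hedged illustration of how these fields fit together, the sketch below shows a provider_info dictionary wiring up a few of the extension points above. All "my_provider.*" module paths and names are hypothetical placeholders, not real Airflow identifiers:

```python
# Hypothetical provider_info dictionary: every "my_provider.*" path below is
# a placeholder, and only the field names follow the Airflow schema.


def get_provider_info():
    return {
        "package-name": "my-provider",
        "name": "My Provider",
        "description": "Example provider exposing core extensions",
        # Operator class names that add extra-link capability
        "extra-links": ["my_provider.operators.MyOperatorExtraLink"],
        # Custom connection types mapped to the hook classes implementing them
        "connection-types": [
            {
                "connection-type": "my-conn-type",
                "hook-class-name": "my_provider.hooks.MyHook",
            }
        ],
        # Decorators users can invoke as @task.my_task
        "task-decorators": [
            {"name": "my_task", "path": "my_provider.decorators.my_task"}
        ],
    }
```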
Note
Deprecated values
- hook-class-names (deprecated) - this field should contain a list of all the hook class names that provide custom connection types with custom extra fields and field behaviour. The hook-class-names array is deprecated as of Airflow 2.2.0 (for optimization reasons) and will be removed in Airflow 3. If your providers target Airflow 2.2.0+, there is no need to include the hook-class-names array; if you also want to be compatible with earlier versions of Airflow 2, you should include both the hook-class-names and the connection-types arrays. See Managing Connections for more details.
- dataset-uris (deprecated) - this field should contain a list of URI schemes together with the class names implementing normalization functions. Deprecated in Airflow 3.0 in favour of asset-uris.
After you install your provider, you can query the installed providers and their capabilities with the airflow providers command. This way you can verify whether your providers are properly recognized and whether they correctly define the extensions. See Command Line Interface and Environment Variables Reference for details of the available CLI sub-commands.
When writing your own provider, consider following the provider naming conventions.
Special considerations¶
Optional provider features¶
New in version 2.3.0: This feature is available in Airflow 2.3+.
Some providers might provide optional features that are only available when certain packages or libraries are installed. Such features typically result in ImportErrors; however, those import errors should be silently ignored rather than polluting Airflow's logs with false warnings. False warnings are a very bad pattern, as they tend to turn into blind spots, so avoiding them is strongly encouraged. However, before Airflow 2.3, Airflow had no mechanism to selectively ignore "known" ImportErrors. So Airflow 2.1 and 2.2 silently ignored all ImportErrors coming from providers, which effectively caused important import errors to be ignored as well, without giving Airflow users any clue that something was missing from the provider's dependencies.
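A self-contained sketch of this "known optional feature" pattern is shown below. On Airflow 2.3+ a real provider would raise airflow.exceptions.AirflowOptionalProviderFeatureException instead of the stand-in exception defined here, so Airflow can skip the feature quietly instead of logging a spurious warning; the module name some_optional_library is a hypothetical dependency:

```python
# Stand-in for airflow.exceptions.AirflowOptionalProviderFeatureException,
# used here only to keep the sketch runnable without Airflow installed.
class OptionalFeatureUnavailable(ImportError):
    """Raised when a "known" optional dependency is missing."""


def load_optional_feature():
    try:
        # Hypothetical optional dependency needed only for this one feature.
        import some_optional_library
    except ImportError as e:
        # Re-raise as the "known" error type so Airflow can ignore it
        # selectively instead of logging a false warning.
        raise OptionalFeatureUnavailable(
            "some_optional_library is not installed; the optional feature is disabled"
        ) from e
    return some_optional_library
```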
Using providers with dynamic task mapping¶
Airflow 2.3 added dynamic task mapping, and with it the possibility of assigning a unique key to each task. This means that when such a dynamically mapped task wants to retrieve a value from XCom (for example, when an extra link needs to be calculated), it should always check whether the ti_key value passed in is not None before retrieving the XCom value using XCom.get_value. This allows backwards compatibility with earlier versions of Airflow.
Typical code for accessing an XCom value in a provider that wants to keep backwards compatibility should look similar to this (note the if ti_key is not None: condition):
def get_link(
    self,
    operator: BaseOperator,
    dttm: datetime | None = None,
    ti_key: "TaskInstanceKey | None" = None,
):
    if ti_key is not None:
        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
    else:
        assert dttm is not None
        job_ids = XCom.get_one(
            key="job_id",
            dag_id=operator.dag.dag_id,
            task_id=operator.task_id,
            execution_date=dttm,
        )
    if not job_ids:
        return None
    if len(job_ids) < self.index:
        return None
    job_id = job_ids[self.index]
    return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
FAQ for custom providers¶
When I write my own provider, do I need to do anything special to make it available to others?
You do not need to do anything special apart from creating the apache_airflow_provider entry point that returns properly formatted metadata: a dictionary with the extra-links and connection-types fields (and, if you also target versions of Airflow before 2.2.0, the deprecated hook-class-names field).
Anyone who runs Airflow in an environment that has your Python package installed will be able to use the package as a provider package.
Should I give my provider a specific name, or should I create it in the ``airflow.providers`` package?
We have quite a number (>80) of providers managed by the community, and we maintain them together with Apache Airflow. All of those providers have a well-defined structure, follow the naming conventions we defined, and live in the airflow.providers package. If your intention is to contribute your provider, you should follow those conventions and make a PR to Apache Airflow to contribute it. But you are free to use any package name, as long as it does not conflict with other names, so preferably pick a package that is in your "domain".
What do I need to do to turn a package into a provider?
To turn an existing Python package into a provider, you need to do the following (see the examples below):
1. Add the apache_airflow_provider entry point in the pyproject.toml file - this tells Airflow where to get the required provider metadata.
2. Create the function that you refer to in the first step as part of your package: the function returns a dictionary containing all the metadata about your provider package.
If you want Airflow to link to the documentation of your provider in the providers page, make sure to add the "project-url/documentation" metadata to your package. This will also add a link to your documentation in PyPI.
Note that the dictionary should be compliant with the airflow/provider_info.schema.json JSON-schema specification. Community-managed providers have more fields there that are used to build documentation, but the requirement for runtime information contains only the few fields defined in the schema:
airflow/provider_info.schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"package-name": {
"description": "Package name available under which the package is available in the PyPI repository.",
"type": "string"
},
"name": {
"description": "Provider name",
"type": "string"
},
"description": {
"description": "Information about the package in RST format",
"type": "string"
},
"hook-class-names": {
"type": "array",
"description": "Hook class names that provide connection types to core (deprecated by connection-types)",
"items": {
"type": "string"
},
"deprecated": {
"description": "The hook-class-names property has been deprecated in favor of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
"deprecatedVersion": "2.2.0"
}
},
"filesystems": {
"type": "array",
"description": "Filesystem module names",
"items": {
"type": "string"
}
},
"integrations": {
"description": "Array of integrations provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integrations to expose by the provider",
"type": "string"
},
"external-doc-url": {
"description": "URL that describes the integration",
"type": "string"
},
"logo": {
"description": "URL or path on the airflow side where you can find the logo",
"type": "string"
},
"tags": {
"description": "Tags describing the integration (free-form)",
"type": "array",
"items": {
"type": "string"
}
}
}
}
},
"operators": {
"description": "List of operators provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where operators are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"sensors": {
"description": "List of sensors provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where sensors are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"hooks": {
"description": "List of hooks provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where hooks are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"asset-uris": {
"description": "List of asset uris provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"schemes": {
"description": "Array of schemes supported",
"type": "array",
"items" : {
"type": "string"
}
},
"handler": {
"description": "Handler used to handle the asset",
"anyOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"factory": {
"description": "Factory to create the asset",
"type": "string"
},
"to_openlineage_converter": {
"description": "Converter to open-lineage event",
"type": "string"
}
},
"required": ["schemes"]
}
},
"dataset-uris": {
"description": "List of asset uris provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"schemes": {
"description": "Array of schemes supported",
"type": "array",
"items" : {
"type": "string"
}
},
"handler": {
"description": "Handler used to handle the asset",
"anyOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"factory": {
"description": "Factory to create the asset",
"type": "string"
},
"to_openlineage_converter": {
"description": "Converter to open-lineage event",
"type": "string"
}
},
"required": ["schemes"]
},
"deprecated": {
"description": "The dataset-uris property has been deprecated in favor of asset-uris in airflow 3.0",
"deprecatedVersion": "3.0.0"
}
},
"dialects": {
"description": "List of dialects the provider provides",
"type": "array",
"items": {
"type": "object",
"properties": {
"dialect-type": {
"description": "Type of SQL dialect",
"type": "string"
},
"dialect-class-name": {
"description": "Class name that implements the dialect",
"type": "string"
}
}
}
},
"transfers": {
"description": "List of transfer operators the provider provides",
"type": "array",
"items": {
"type": "object",
"properties": {
"how-to-guide": {
"description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
"type": "string"
},
"source-integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"target-integration-name": {
"type": "string",
"description": "Target integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-module": {
"type": "string",
"description": "List of python modules containing the transfers"
}
},
"additionalProperties": false,
"required": [
"source-integration-name",
"target-integration-name",
"python-module"
]
}
},
"triggers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-modules": {
"description": "List of Python modules containing the triggers",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"bundles": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-modules": {
"description": "List of python modules containing the bundles",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"connection-types": {
"type": "array",
"description": "Map of connection types mapped to hook class names",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection defined by the provider",
"type": "string"
},
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
}
},
"required": [
"connection-type",
"hook-class-name"
]
}
},
"extra-links": {
"type": "array",
"description": "Operator class names that provide extra link functionality",
"items": {
"type": "string"
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
"items": {
"type": "string"
}
},
"logging": {
"type": "array",
"description": "Logging Task Handlers class names",
"items": {
"type": "string"
}
},
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
"items": {
"type": "string"
}
},
"auth-managers": {
"type": "array",
"description": "Auth managers module names",
"items": {
"type": "string"
}
},
"notifications": {
"type": "array",
"description": "Notification class names",
"items": {
"type": "string"
}
},
"executors": {
"type": "array",
"description": "Executor class names",
"items": {
"type": "string"
}
},
"config": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"options": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/option"
}
},
"renamed": {
"type": "object",
"properties": {
"previous_name": {
"type": "string"
},
"version": {
"type": "string"
}
}
}
},
"required": [
"description",
"options"
],
"additionalProperties": false
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"plugins": {
"type": "array",
"description": "Plugins provided by the provider",
"items": {
"name": {
"type": "string",
"description": "Name of the plugin"
},
"plugin-class": {
"type": "string",
"description": "Class to instantiate the plugin"
}
}
}
},
"definitions": {
"option": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"version_added": {
"type": [
"string",
"null"
]
},
"type": {
"type": "string",
"enum": [
"string",
"boolean",
"integer",
"float"
]
},
"example": {
"type": [
"string",
"null",
"number"
]
},
"default": {
"type": [
"string",
"null",
"number"
]
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
"description",
"version_added",
"type",
"example",
"default"
],
"additional_properties": false
}
},
"required": [
"name",
"description"
]
}
Example pyproject.toml:
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.myproviderpackage.get_provider_info:get_provider_info"
Example myproviderpackage/get_provider_info.py:
def get_provider_info():
return {
"package-name": "my-package-name",
"name": "name",
"description": "a description",
"hook-class-names": [
"myproviderpackage.hooks.source.SourceHook",
],
}
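The example above uses the hook-class-names field, which is deprecated as of Airflow 2.2.0. As a hedged sketch, the same metadata could instead use the connection-types field on Airflow 2.2.0+; the "source" connection type name below is an assumed example, not something defined by the original snippet:

```python
def get_provider_info():
    return {
        "package-name": "my-package-name",
        "name": "name",
        "description": "a description",
        # connection-types replaces the deprecated hook-class-names field on
        # Airflow 2.2.0+; the "source" connection type name is assumed here.
        "connection-types": [
            {
                "connection-type": "source",
                "hook-class-name": "myproviderpackage.hooks.source.SourceHook",
            }
        ],
    }
```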
Is there a convention for connection IDs and types?
Very good question. Glad that you asked. We generally follow the convention of <NAME>_default for the connection ID, and just <NAME> for the connection type. A few examples:
- google_cloud_default ID and google_cloud_platform type
- aws_default ID and aws type
You should follow this convention. It is important to use unique connection type names, so it should be unique for your provider. If two providers try to add a connection with the same type, only one of them will succeed.
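As an illustration only, a hook following this convention might declare its names as in the sketch below. In a real provider the class would subclass airflow.hooks.base.BaseHook; it is standalone here, and MyServiceHook together with the "myservice" names are hypothetical:

```python
# Hedged sketch: a real hook would subclass airflow.hooks.base.BaseHook.
# "myservice" and MyServiceHook are hypothetical names that follow the
# <NAME>_default / <NAME> convention described above.


class MyServiceHook:
    conn_name_attr = "myservice_conn_id"
    default_conn_name = "myservice_default"  # <NAME>_default convention for the ID
    conn_type = "myservice"  # <NAME> convention; must be unique across providers
    hook_name = "My Service"  # human-friendly name
```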
Can I contribute my own provider to Apache Airflow?
The answer depends on the provider. We have a policy for that in the PROVIDERS.rst developer documentation.
Can I advertise my own provider to Apache Airflow users on PyPI and share it with others as a package?
Absolutely! We have an Ecosystem area on our website where we share non-community-managed extensions and work around Airflow. Feel free to submit a PR to that page, and we will evaluate and merge it when we consider the provider useful for the community of Airflow users.
Can I charge for the use of my provider?
This is something outside of our control and domain. Being an Apache project, we are commercial-friendly, and there are many businesses built around Apache Airflow and many other Apache projects. As a community, we offer all our software for free, and this will never change. What third-party developers are doing is not under the control of the Apache Airflow community.