定义 Operator 额外链接¶
如果您想为 Operator 添加更多链接,可以通过插件或 Provider 包定义它们。额外链接将显示在 Grid 视图的任务详情页中。

以下代码展示了如何通过插件为 Operator 添加额外链接
from airflow.sdk import BaseOperator
from airflow.sdk import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
class GoogleLink(BaseOperatorLink):
name = "Google"
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
return "https://www.google.com"
class MyFirstOperator(BaseOperator):
operator_extra_links = (GoogleLink(),)
def __init__(self, **kwargs):
super().__init__(**kwargs)
def execute(self, context):
self.log.info("Hello World!")
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
GoogleLink(),
]
注意
Operator 额外链接需要通过 Airflow 插件或自定义 Airflow Provider 注册才能生效。
您还可以添加一个全局 Operator 额外链接,该链接将通过 Airflow 插件或 Airflow Provider 对所有 Operator 可用。您可以在插件接口和Provider中了解更多信息。
您可以在额外链接中查看社区管理的 Provider 提供的所有额外链接。
添加或覆盖现有 Operator 的链接¶
您还可以通过 Airflow 插件或自定义 Provider 为现有 Operator 添加(或覆盖)额外链接。
例如,以下 Airflow 插件将为所有使用 GCSToS3Operator
Operator 的任务添加 Operator 链接。
为现有 Operator 添加 Operator 链接 plugins/extra_link.py
from airflow.sdk import BaseOperator, BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
class S3LogLink(BaseOperatorLink):
name = "S3"
# Add list of all the operators to which you want to add this OperatorLinks
# Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
operators = [GCSToS3Operator]
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
# Invalid bucket name because upper case letters and underscores are used
# This will not be a valid bucket in any region
bucket_name = "Invalid_Bucket_Name"
return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
bucket_name=bucket_name,
dag_id=operator.dag_id,
task_id=operator.task_id,
run_id=ti_key.run_id,
)
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
S3LogLink(),
]
覆盖现有 Operator 的 Operator 链接:
也可以通过插件替换 Operator 的内置链接。例如,BigQueryExecuteQueryOperator
包含一个指向 Google Cloud Console 的链接,但如果我们想更改该链接,我们可以这样做:
from airflow.sdk import BaseOperator, BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"
class BigQueryDatasetLink(BaseGoogleLink):
"""
Helper class for constructing BigQuery Dataset Link.
"""
name = "BigQuery Dataset"
key = "bigquery_dataset"
format_str = BIGQUERY_DATASET_LINK
@staticmethod
def persist(
context: Context,
task_instance: BaseOperator,
dataset_id: str,
project_id: str,
):
task_instance.xcom_push(
context,
key=BigQueryDatasetLink.key,
value={"dataset_id": dataset_id, "project_id": project_id},
)
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
BigQueryDatasetLink(),
]
通过 Provider 添加 Operator 链接
正如 Provider 中所解释的,当您创建自己的 Airflow Provider 时,可以指定提供额外链接功能的 Operator 列表。这通过在存储在 Provider 包元数据中的 provider-info
信息中包含 Operator 类名来实现。
Provider-info 字典中所需的示例元数据(这是 apache-airflow-providers-google
Provider 当前返回的元数据的一部分)
extra-links:
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
- airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
您可以包含任意数量带有额外链接的 Operator。