Define an operator extra link
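If you want to add extra links to operators, you can define them through a plugin or a provider package. Extra links are displayed on the task details page in the Grid view.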
The following code shows how to add extra links to an operator through a plugin:
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin


class GoogleLink(BaseOperatorLink):
    name = "Google"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        return "https://www.google.com"


class MyFirstOperator(BaseOperator):
    operator_extra_links = (GoogleLink(),)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, context):
        self.log.info("Hello World!")


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        GoogleLink(),
    ]
Note
Operator extra links must be registered through an Airflow plugin or a custom Airflow provider to work.
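You can also add a global operator extra link, which is available to all operators, through an Airflow plugin (the plugin's global_operator_extra_links attribute) or through an Airflow provider. You can learn more in the plugin interface and Provider packages documentation.
You can see all the extra links provided by community-managed providers in Extra Links.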
Add or override links to existing operators
You can also add (or override) extra links on existing operators through an Airflow plugin or a custom provider.
For example, the following Airflow plugin adds an operator link to every task that uses the GCSToS3Operator operator.
Adding operator links to existing operators (plugins/extra_link.py):
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


class S3LogLink(BaseOperatorLink):
    name = "S3"

    # List all the operators to which you want to add this operator link
    # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
    operators = [GCSToS3Operator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # Invalid bucket name because upper case letters and underscores are used
        # This will not be a valid bucket in any region
        bucket_name = "Invalid_Bucket_Name"
        return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
            bucket_name=bucket_name,
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            run_id=ti_key.run_id,
        )


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        S3LogLink(),
    ]
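The S3 example interpolates run_id directly into the URL path. Run IDs such as manual__2024-01-01T00:00:00+00:00 contain ":" and "+", so you may prefer to percent-encode each path segment. The following is a minimal pure-Python sketch of that idea that runs without Airflow; FakeTaskInstanceKey, build_s3_log_url, and the sample values are hypothetical stand-ins (the field names are assumed to mirror Airflow's TaskInstanceKey), not part of Airflow's API.

```python
from typing import NamedTuple
from urllib.parse import quote


class FakeTaskInstanceKey(NamedTuple):
    """Hypothetical stand-in using the same field names as Airflow's TaskInstanceKey."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1
    map_index: int = -1


def build_s3_log_url(bucket_name: str, ti_key: FakeTaskInstanceKey) -> str:
    """Build the S3 log URL, percent-encoding every path segment."""
    segments = [bucket_name, ti_key.dag_id, ti_key.task_id, ti_key.run_id]
    # safe="" also encodes "/", so no segment can introduce extra path levels.
    encoded = "/".join(quote(segment, safe="") for segment in segments)
    return "https://s3.amazonaws.com/airflow-logs/" + encoded


key = FakeTaskInstanceKey("example_dag", "copy_task", "manual__2024-01-01T00:00:00+00:00")
print(build_s3_log_url("my-bucket", key))
# prints https://s3.amazonaws.com/airflow-logs/my-bucket/example_dag/copy_task/manual__2024-01-01T00%3A00%3A00%2B00%3A00
```

Inside a real get_link, the same quote call could be applied to ti_key.run_id and the other values before the URL is returned.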
Overriding operator links of existing operators:
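It is also possible to replace a built-in link on an operator through a plugin. For example, BigQueryExecuteQueryOperator includes a link to the Google Cloud Console. To change that link, we could do the following; the plugin link replaces the built-in one because it uses the same name ("BigQuery Console"):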
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"


class BigQueryConsoleLink(BaseOperatorLink):
    """
    Helper class for constructing BigQuery link.
    """

    # Same name as the built-in link, so this plugin link overrides it
    name = "BigQuery Console"
    operators = [BigQueryExecuteQueryOperator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # XCom.get_value accepts a TaskInstanceKey and reads the job ID pushed by the task
        job_id = XCom.get_value(ti_key=ti_key, key="job_id")
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        BigQueryConsoleLink(),
    ]
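Why does a plugin link win over the built-in one? As we understand Airflow 2.x, an operator builds a mapping of its own links keyed by each link's name, then updates that mapping with plugin links whose operators list contains the operator's class, so a plugin link with a matching name replaces the built-in entry. The sketch below models that merge in plain Python; FakeLink, FakeBigQueryOperator, and merge_links are hypothetical simplifications, not Airflow's actual classes or code.

```python
from dataclasses import dataclass, field


@dataclass
class FakeLink:
    """Hypothetical stand-in for an operator link: a name, a URL, and target operators."""

    name: str
    url: str
    operators: list = field(default_factory=list)


class FakeBigQueryOperator:
    """Hypothetical operator class that ships with one built-in link."""

    operator_extra_links = (FakeLink("BigQuery Console", "https://console.cloud.google.com/bigquery"),)


def merge_links(operator_class, plugin_links):
    """Simplified model: plugin links targeting this class override built-ins with the same name."""
    merged = {link.name: link for link in operator_class.operator_extra_links}
    for link in plugin_links:
        # Only plugin links that list this operator class are applied.
        if operator_class in link.operators:
            merged[link.name] = link
    return merged


plugin_links = [FakeLink("BigQuery Console", "http://console.cloud.google.com/bigquery", [FakeBigQueryOperator])]
print(merge_links(FakeBigQueryOperator, plugin_links)["BigQuery Console"].url)
# prints http://console.cloud.google.com/bigquery
```

If the plugin link used a different name, both links would appear side by side instead of one replacing the other.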
Adding operator links via providers
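As described in Provider packages, when you create your own Airflow provider you can declare the operator links it provides. You do this by listing the fully qualified class names of those links in the provider-info information stored in your provider package's metadata.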
Example metadata required in the provider-info dictionary (this is part of the metadata currently returned by the apache-airflow-providers-google provider):
extra-links:
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
  - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
You can include as many extra link classes as you need.
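For a custom provider, the provider-info dictionary usually comes from a get_provider_info() function that the package exposes through an apache_airflow_provider entry point. The sketch below assumes a hypothetical package my-airflow-provider with a hypothetical link class my_airflow_provider.links.MyConsoleLink; check the Provider packages documentation for the exact set of required keys in your Airflow version.

```python
def get_provider_info() -> dict:
    """Return provider-info metadata for a hypothetical custom provider."""
    return {
        "package-name": "my-airflow-provider",  # hypothetical distribution name
        "name": "My Provider",
        "description": "Example provider that registers an extra link.",
        # Fully qualified import paths of BaseOperatorLink subclasses
        "extra-links": [
            "my_airflow_provider.links.MyConsoleLink",  # hypothetical link class
        ],
    }
```

In pyproject.toml, the entry point could then be declared under [project.entry-points.apache_airflow_provider] as provider_info = "my_airflow_provider.get_provider_info:get_provider_info" (the module path is hypothetical).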