创建通知器¶
您还可以设置 template_fields
属性来指定哪些属性应被渲染为模板。
以下是创建 Notifier 类的一个示例
from airflow.sdk import BaseNotifier
from my_provider import send_message
class MyNotifier(BaseNotifier):
template_fields = ("message",)
def __init__(self, message):
self.message = message
def notify(self, context):
# Send notification here, below is an example
title = f"Task {context['task_instance'].task_id} failed"
send_message(title, self.message)
使用通知器¶
以下是使用上述通知器的一个示例
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from myprovider.notifier import MyNotifier
with DAG(
dag_id="example_notifier",
start_date=datetime(2022, 1, 1),
schedule=None,
on_success_callback=MyNotifier(message="Success!"),
on_failure_callback=MyNotifier(message="Failure!"),
):
task = BashOperator(
task_id="example_task",
bash_command="exit 1",
on_success_callback=MyNotifier(message="Task Succeeded!"),
)
有关社区管理的通知器列表,请参阅 通知。