Amazon 简单队列服务 (Amazon SQS) 通知操作指南

简介

Amazon SQS 通知器 SqsNotifier 允许用户使用 DAG 级别和任务级别的各种 on_*_callbacks 将消息推送到 Amazon SQS 队列。

您还可以将通知器与 sla_miss_callback 一起使用。

注意

当通知器与 sla_miss_callback 一起使用时,上下文将仅包含传递给回调的值,请参阅 sla_miss_callback

示例代码:

from datetime import datetime, timezone
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.notifications.sqs import send_sqs_notification

dag_failure_sqs_notification = send_sqs_notification(
    aws_conn_id="aws_default",
    queue_url="https://sqs.eu-west-1.amazonaws.com/123456789098/MyQueue",
    message_body="The DAG {{ dag.dag_id }} failed",
)
task_failure_sqs_notification = send_sqs_notification(
    aws_conn_id="aws_default",
    region_name="eu-west-1",
    queue_url="https://sqs.eu-west-1.amazonaws.com/123456789098/MyQueue",
    message_body="The task {{ ti.task_id }} failed",
)

with DAG(
    dag_id="mydag",
    schedule="@once",
    start_date=datetime(2023, 1, 1, tzinfo=timezone.utc),
    on_failure_callback=[dag_failure_sqs_notification],
    catchup=False,
):
    BashOperator(task_id="mytask", on_failure_callback=[task_failure_sqs_notification], bash_command="fail")

此条目有帮助吗?