消息触发器

等待队列中的消息

使用 MessageQueueTrigger 来等待队列中的消息。触发器的参数是

  • queue - 队列标识符

可以根据队列提供者提供附加参数。需要提供相关的默认连接 ID,例如,连接到 AWS SQS 中的队列时,连接 ID 应为 aws_default

下面是一个示例,说明如何配置 Airflow DAG 以由 Amazon SQS 中的消息触发。

tests/system/common/messaging/example_message_queue_trigger.py

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")

# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])

with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")

工作原理

1. 消息队列触发器: MessageQueueTrigger 监听来自外部队列(例如,AWS SQS、Kafka 或其他消息系统)的消息。

2. 资产和监听器: Asset 抽象外部实体,在此示例中即 SQS 队列。AssetWatcher 将触发器与一个名称关联。此名称可帮助您识别哪个触发器与哪个资产关联。

3. 事件驱动的 DAG: DAG 不会按照固定的计划运行,而是在资产接收到更新(例如,队列中的新消息)时执行。

此条目是否有帮助?