消息触发器¶
等待队列中的消息¶
使用 MessageQueueTrigger
来等待队列中的消息。触发器的参数是
queue
- 队列标识符
可以根据队列提供者提供附加参数。需要提供相关的默认连接 ID,例如,连接到 AWS SQS 中的队列时,连接 ID 应为 aws_default
。
下面是一个示例,说明如何配置 Airflow DAG 以由 Amazon SQS 中的消息触发。
tests/system/common/messaging/example_message_queue_trigger.py
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher
# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")
# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])
with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
EmptyOperator(task_id="task")
工作原理¶
1. 消息队列触发器: MessageQueueTrigger
监听来自外部队列(例如,AWS SQS、Kafka 或其他消息系统)的消息。
2. 资产和监听器: Asset
抽象外部实体,在此示例中即 SQS 队列。AssetWatcher
将触发器与一个名称关联。此名称可帮助您识别哪个触发器与哪个资产关联。
3. 事件驱动的 DAG: DAG 不会按照固定的计划运行,而是在资产接收到更新(例如,队列中的新消息)时执行。