airflow.providers.amazon.aws.triggers.sqs

SqsSensorTrigger

从 Amazon SQS 队列异步获取消息,然后从队列中删除这些消息。

模块内容

class airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger(sqs_queue, aws_conn_id='aws_default', max_messages=5, num_batches=1, wait_time_seconds=1, visibility_timeout=None, message_filtering=None, message_filtering_match_values=None, message_filtering_config=None, delete_message_on_reception=True, waiter_delay=60, region_name=None, verify=None, botocore_config=None)[source]

基类: airflow.triggers.base.BaseEventTrigger

从 Amazon SQS 队列异步获取消息,然后从队列中删除这些消息。

参数:
  • sqs_queue (str) – SQS 队列 URL

  • aws_conn_id (str | None) – AWS 连接 ID

  • max_messages (int) – 每次 poke 获取的最大消息数(可模板化)

  • num_batches (int) – 传感器调用 SQS API 接收消息的次数(默认值:1)

  • wait_time_seconds (int) – 等待接收消息的时间(秒)(默认值:1 秒)

  • visibility_timeout (int | None) – 可见性超时,在此期间 Amazon SQS 会阻止其他消费者接收和处理消息。

  • message_filtering (airflow.providers.amazon.aws.utils.sqs.MessageFilteringType | None) – 指定应如何过滤接收到的消息。支持的选项有:None(不过滤,默认值)、‘literal’(消息 Body 字面匹配)或 ‘jsonpath’(使用 JSONPath 表达式过滤消息 Body)。您可以通过覆盖相关类方法添加更多方法。

  • message_filtering_match_values (Any) – 用于消息过滤器匹配的可选值。例如,在使用字面匹配时,如果消息 body 匹配任何指定值,则包含该消息。对于 JSONPath 匹配,使用 JSONPath 表达式的结果,并且该结果可以匹配任何指定值。

  • message_filtering_config (Any) – 要传递给消息过滤器的附加配置。例如,对于 JSONPath 过滤,您可以在此处传递 JSONPath 表达式字符串,例如 ‘foo[*].baz’。Body 不匹配的消息将被忽略。

  • delete_message_on_reception (bool) – 默认为 True,消息在被消费后立即从队列中删除。否则,消息在消费后仍保留在队列中,应手动删除。

  • waiter_delay (int) – 两次调用 SQS API 接收消息之间的等待时间(秒)。

sqs_queue[source]
max_messages = 5[source]
num_batches = 1[source]
wait_time_seconds = 1[source]
visibility_timeout = None[source]
message_filtering = None[source]
delete_message_on_reception = True[source]
message_filtering_match_values = None[source]
message_filtering_config = None[source]
waiter_delay = 60[source]
aws_conn_id = 'aws_default'[source]
region_name = None[source]
verify = None[source]
botocore_config = None[source]
serialize()[source]

返回重建此 Trigger 所需的信息。

返回值:

元组,包含(类路径,重新实例化所需的关键字参数)。

返回类型:

tuple[str, dict[str, Any]]

property hook: airflow.providers.amazon.aws.hooks.sqs.SqsHook[source]
async poll_sqs(client)[source]

异步轮询 SQS 队列以检索消息。

参数:

client (airflow.providers.amazon.aws.hooks.base_aws.BaseAwsConnection) – SQS 连接

返回值:

从 SQS 检索到的消息列表

返回类型:

collections.abc.Collection

async poke(client)[source]
async run()[source]

在异步上下文中运行 trigger。

当 trigger 需要触发事件时,应 yield 一个 Event,并在完成后返回 None。单事件 trigger 因此应 yield 然后立即返回。

如果它 yield 了,很可能会非常快地恢复执行,但也可能不会(例如,如果工作负载被移动到另一个 triggerer 进程,或者多事件 trigger 被用于单事件任务延迟)。

在任何情况下,Trigger 类应假定它们将被持久化,并在不再需要时依赖 cleanup() 被调用。

此条目有帮助吗?