airflow.providers.amazon.aws.triggers.sqs

模块内容

SqsSensorTrigger

从 Amazon SQS 队列异步获取消息,然后从队列中删除消息。

class airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger(sqs_queue, aws_conn_id='aws_default', max_messages=5, num_batches=1, wait_time_seconds=1, visibility_timeout=None, message_filtering=None, message_filtering_match_values=None, message_filtering_config=None, delete_message_on_reception=True, waiter_delay=60, region_name=None, verify=None, botocore_config=None)[源代码]

基类:airflow.triggers.base.BaseTrigger

从 Amazon SQS 队列异步获取消息,然后从队列中删除消息。

参数
  • sqs_queue (str) – SQS 队列 URL

  • aws_conn_id (str | None) – AWS 连接 ID

  • max_messages (int) – 每次轮询检索的最大消息数(可使用模板)

  • num_batches (int) – 传感器将调用 SQS API 来接收消息的次数(默认值:1)

  • wait_time_seconds (int) – 等待接收消息的时间(以秒为单位)(默认值:1 秒)

  • visibility_timeout (int | None) – 可见性超时,在此期间,Amazon SQS 会阻止其他消费者接收和处理消息。

  • message_filtering (airflow.providers.amazon.aws.utils.sqs.MessageFilteringType | None) – 指定应如何过滤接收的消息。 支持的选项包括: None (不进行过滤,默认值)、 ‘literal’ (消息正文字符串匹配)或 ‘jsonpath’ (使用 JSONPath 表达式过滤消息正文)。 您可以通过重写相关的类方法来添加更多方法。

  • message_filtering_match_values (Any) – 消息过滤器要匹配的可选值。 例如,对于文字匹配,如果消息正文匹配任何指定的值,则将其包含在内。 对于 JSONPath 匹配,将使用 JSONPath 表达式的结果,并且可以匹配任何指定的值。

  • message_filtering_config (Any) – 要传递给消息过滤器的其他配置。 例如,使用 JSONPath 筛选时,可以在此处传递 JSONPath 表达式字符串,例如 ‘foo[*].baz’ 。 不匹配的消息正文将被忽略。

  • delete_message_on_reception (bool) – 默认值为 True,消息在被使用后会立即从队列中删除。 否则,消息会在使用后保留在队列中,应手动删除。

  • waiter_delay (int) – 调用 SQS API 来接收消息之间的等待时间(以秒为单位)。

property hook: airflow.providers.amazon.aws.hooks.sqs.SqsHook[源代码]
serialize()[源代码]

返回重建此触发器所需的信息。

返回

元组(类路径,重新实例化所需的关键字参数)。

返回类型

tuple[str, dict[str, Any]]

async poll_sqs(client)[源代码]

异步轮询 SQS 队列以检索消息。

参数

client (airflow.providers.amazon.aws.hooks.base_aws.BaseAwsConnection) – SQS 连接

返回

从 SQS 检索的消息列表

返回类型

collections.abc.Collection

async poke(client)[源代码]
async run()[源代码]

在异步上下文中运行触发器。

触发器应在想要触发事件时生成一个 Event,如果已完成,则返回 None。因此,单事件触发器应生成,然后立即返回。

如果它生成,则很可能会很快恢复,但也有可能不会恢复(例如,如果工作负载正在转移到另一个触发器进程,或者多事件触发器用于单事件任务延迟)。

在任何一种情况下,Trigger 类都应假设它们将被持久化,然后依赖于在不再需要它们时调用 cleanup()。

此条目是否有帮助?