airflow.providers.amazon.aws.sensors.sqs

从 SQS 队列中读取消息,然后删除消息。

模块内容

SqsSensor

从 Amazon SQS 队列中获取消息,然后从队列中删除消息。

class airflow.providers.amazon.aws.sensors.sqs.SqsSensor(*, sqs_queue, max_messages=5, num_batches=1, wait_time_seconds=1, visibility_timeout=None, message_filtering=None, message_filtering_match_values=None, message_filtering_config=None, delete_message_on_reception=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.sqs.SqsHook]

从 Amazon SQS 队列中获取消息,然后从队列中删除消息。

如果删除消息失败,则会抛出 AirflowException。 否则,消息将通过 XCom 推送,键为 messages

默认情况下,传感器每次轮询只执行一次 SQS 调用,这会将结果限制为最多 10 条消息。 但是,每次轮询的 SQS API 调用总数可以通过 num_batches 参数来控制。

另请参阅

有关如何使用此传感器的更多信息,请查看以下指南: 从 Amazon SQS 队列读取消息

参数
  • sqs_queue – SQS 队列 URL(已模板化)

  • max_messages (int) – 每次轮询要检索的最大消息数(已模板化)

  • num_batches (int) – 传感器将调用 SQS API 以接收消息的次数(默认值:1)

  • wait_time_seconds (int) – 等待接收消息的时间(以秒为单位)(默认值:1 秒)

  • visibility_timeout (int | None) – 可见性超时,在此期间,Amazon SQS 会阻止其他使用者接收和处理消息。

  • message_filtering (airflow.providers.amazon.aws.utils.sqs.MessageFilteringType | None) – 指定应如何过滤接收到的消息。 支持的选项有:None(不进行过滤,默认值)、‘literal’(消息正文的字面匹配)、‘jsonpath’(使用 JSONPath 表达式过滤消息正文)或 ‘jsonpath-ext’(类似于 ‘jsonpath’,但具有扩展的查询语法)。 您可以通过覆盖相关的类方法来添加更多方法。

  • message_filtering_match_values (Any) – 用于消息过滤器匹配的可选值。 例如,对于字面匹配,如果消息正文与任何指定的值匹配,则会包含该消息。 对于 JSONPath 匹配,将使用 JSONPath 表达式的结果,并且可以与任何指定的值匹配。

  • message_filtering_config (Any) – 要传递给消息过滤器的其他配置。 例如,使用 JSONPath 过滤,您可以在此处传递 JSONPath 表达式字符串,例如 ‘foo[*].baz’。 将忽略正文不匹配的消息。

  • delete_message_on_reception (bool) – 默认为 True,消息在被使用后会立即从队列中删除。 否则,消息在被使用后仍保留在队列中,应手动删除。

  • deferrable (bool) – 如果为 True,则传感器将以可延迟模式运行。 此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)

  • aws_conn_id – 用于 AWS 凭据的 Airflow 连接。 如果此值为 None 或为空,则使用默认的 boto3 行为。 如果在分布式环境中运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

  • region_name – AWS region_name。 如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。 请参阅: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • botocore_config – botocore 客户端的配置字典(键值对)。 请参阅: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html

aws_hook_class[源代码]
template_fields: collections.abc.Sequence[str][源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[源代码]
poll_sqs(sqs_conn)[源代码]

轮询 SQS 队列以检索消息。

参数

sqs_conn (airflow.providers.amazon.aws.hooks.base_aws.BaseAwsConnection) – SQS 连接

返回

从 SQS 检索的消息列表

返回类型

collections.abc.Collection

poke(context)[来源]

检查订阅的队列是否有消息,并将消息写入 xcom,键为 messages

参数

context (airflow.utils.context.Context) – 上下文对象

返回

如果有消息可用则返回 True,否则返回 False

此条目是否有帮助?