airflow.providers.amazon.aws.sensors.sqs

从 SQS 队列中读取消息,然后删除它。

SqsSensor

从 Amazon SQS 队列获取消息,然后从队列中删除这些消息。

模块内容

class airflow.providers.amazon.aws.sensors.sqs.SqsSensor(*, sqs_queue, max_messages=5, num_batches=1, wait_time_seconds=1, visibility_timeout=None, message_filtering=None, message_filtering_match_values=None, message_filtering_config=None, delete_message_on_reception=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.sqs.SqsHook]

从 Amazon SQS 队列获取消息,然后从队列中删除这些消息。

如果消息删除失败,则抛出 AirflowException。否则,消息将通过 XCom 推送,键为 messages

默认情况下,传感器每次 poke 只执行一次 SQS 调用,这将结果限制为最多 10 条消息。然而,每次 poke 的 SQS API 调用总数可以通过 num_batches 参数控制。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:从 Amazon SQS 队列读取消息

参数:
  • sqs_queue – SQS 队列 URL (支持模板)

  • max_messages (int) – 每次 poke 检索的最大消息数 (支持模板)

  • num_batches (int) – 传感器调用 SQS API 接收消息的次数 (默认值: 1)

  • wait_time_seconds (int) – 等待接收消息的时间(秒)(默认值: 1 秒)

  • visibility_timeout (int | None) – 可见性超时,在此期间 Amazon SQS 会阻止其他消费者接收和处理该消息。

  • message_filtering (airflow.providers.amazon.aws.utils.sqs.MessageFilteringType | None) – 指定如何过滤收到的消息。支持的选项有:None (不过滤,默认值)、‘literal’ (消息体精确匹配)、‘jsonpath’ (使用 JSONPath 表达式过滤消息体),或 ‘jsonpath-ext’ (类似于 ‘jsonpath’,但带有扩展的查询语法)。您可以通过覆盖相关的类方法来添加更多方法。

  • message_filtering_match_values (Any) – 消息过滤器的可选匹配值。例如,对于精确匹配,如果消息体与任何指定的值匹配,则包含该消息。对于 JSONPath 匹配,使用 JSONPath 表达式的结果,如果与任何指定的值匹配,则包含该消息。

  • message_filtering_config (Any) – 传递给消息过滤器的附加配置。例如,对于 JSONPath 过滤,您可以在此处传递 JSONPath 表达式字符串,例如 ‘foo[*].baz’。消息体不匹配的消息将被忽略。

  • delete_message_on_reception (bool) – 默认为 True,消息一经消费即从队列中删除。否则,消息消费后仍保留在队列中,需要手动删除。

  • deferrable (bool) – 如果为 True,传感器将以可推迟模式运行。此模式需要安装 aiobotocore 模块。(默认值: False,但可以通过在配置文件中设置 default_deferrable 为 True 来覆盖)

  • aws_conn_id – 用于 AWS 凭据的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果 Airflow 以分布式方式运行且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(且必须在每个工作节点上维护)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。详见:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • botocore_config – botocore 客户端的配置字典(键值对)。详见:https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html

aws_hook_class[source]
template_fields: collections.abc.Sequence[str][source]
sqs_queue[source]
max_messages = 5[source]
num_batches = 1[source]
wait_time_seconds = 1[source]
visibility_timeout = None[source]
message_filtering = None[source]
delete_message_on_reception = True[source]
message_filtering_match_values = None[source]
message_filtering_config = None[source]
deferrable = True[source]
execute(context)[source]

创建操作符时派生。

执行任务的主要方法。Context 是渲染 jinja 模板时使用的相同字典。

有关更多上下文信息,请参阅 get_template_context。

execute_complete(context, event=None)[source]
poll_sqs(sqs_conn)[source]

轮询 SQS 队列以检索消息。

参数:

sqs_conn (airflow.providers.amazon.aws.hooks.base_aws.BaseAwsConnection) – SQS 连接

返回:

从 SQS 检索到的消息列表

返回类型:

collections.abc.Collection

poke(context)[source]

检查订阅队列中的消息,并使用 messages 键将其写入 xcom。

参数:

context (airflow.utils.context.Context) – 上下文对象

返回:

如果消息可用则为 True,否则为 False

此条目有帮助吗?