Amazon Simple Queue Service (SQS)

Amazon Simple Queue Service (SQS) 是一种完全托管的消息队列服务,使您能够解耦和扩展微服务、分布式系统和无服务器应用程序。SQS 消除了管理和操作面向消息的中间件相关的复杂性和开销,并使开发人员能够专注于差异化工作。使用 SQS,您可以在软件组件之间以任何量级发送、存储和接收消息,而不会丢失消息或要求其他服务可用。

先决任务

要使用这些 operator,您必须做一些事情

通用参数

aws_conn_id

Amazon Web Services 连接 ID 的引用。如果此参数设置为 None,则使用默认的 boto3 行为,不进行连接查找。否则使用连接中存储的凭据。默认值: aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 region_name。否则使用指定的值而不是连接值。默认值: None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 verify。否则使用指定的值而不是连接值。默认值: None

botocore_config

提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。

例如,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则使用指定的值而不是连接值。默认值: None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置。

Operators

向 Amazon SQS 队列发布消息

要向 Amazon SQS 队列发布消息,您可以使用 SqsPublishOperator

在以下示例中,任务 publish_to_queue 将包含任务实例和执行日期的消息发布到默认名称为 Airflow-Example-Queue 的队列中。

tests/system/amazon/aws/example_sqs.py

publish_to_queue_1 = SqsPublishOperator(
    task_id="publish_to_queue_1",
    sqs_queue=sqs_queue,
    message_content="{{ task_instance }}",
)
publish_to_queue_2 = SqsPublishOperator(
    task_id="publish_to_queue_2",
    sqs_queue=sqs_queue,
    message_content="{{ task_instance }}",
)

Sensors

从 Amazon SQS 队列读取消息

要从 Amazon SQS 队列读取消息直到耗尽,可以使用 SqsSensor。通过将 deferrable 参数设置为 True,此 sensor 也可以在可延迟模式下运行。

tests/system/amazon/aws/example_sqs.py

read_from_queue = SqsSensor(
    task_id="read_from_queue",
    sqs_queue=sqs_queue,
)
# Retrieve multiple batches of messages from SQS.
# The SQS API only returns a maximum of 10 messages per poll.
read_from_queue_in_batch = SqsSensor(
    task_id="read_from_queue_in_batch",
    sqs_queue=sqs_queue,
    # Get maximum 10 messages each poll
    max_messages=10,
    # Combine 3 polls before returning results
    num_batches=3,
)

参考

此条目有帮助吗?