airflow.providers.microsoft.azure.operators.asb

属性

MessageCallback

AzureServiceBusCreateQueueOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 队列。

AzureServiceBusSendMessageOperator

向 Service Bus 队列发送消息或批量消息。

AzureServiceBusReceiveMessageOperator

在指定的队列名称中一次接收一批消息。

AzureServiceBusDeleteQueueOperator

删除 Azure Service Bus 命名空间中的队列。

AzureServiceBusTopicCreateOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 主题。

AzureServiceBusSubscriptionCreateOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 主题订阅。

AzureServiceBusUpdateSubscriptionOperator

在 Service Bus 命名空间下更新 Azure ServiceBus 主题订阅。

ASBReceiveSubscriptionMessageOperator

从特定主题下的 Service Bus 订阅接收批量消息。

AzureServiceBusSubscriptionDeleteOperator

删除 Azure ServiceBus 命名空间中的主题订阅。

AzureServiceBusTopicDeleteOperator

删除 Azure Service Bus 命名空间中的主题。

模块内容

airflow.providers.microsoft.azure.operators.asb.MessageCallback[source]
class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusCreateQueueOperator(*, queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True, enable_batched_operations=True, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基类: airflow.models.BaseOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 队列。

另请参阅

有关如何使用此 Operator 的更多信息,请查看指南: 创建 Azure Service Bus 队列

参数:
  • queue_name (str) – 队列的名称。应是唯一的。

  • max_delivery_count (int) – 最大投递次数。消息在此次数投递后将自动进入死信队列。默认值为 10。

  • dead_lettering_on_message_expiration (bool) – 一个值,指示此订阅在消息过期时是否支持死信。

  • enable_batched_operations (bool) – 一个值,指示是否启用服务器端批处理操作。

  • azure_service_bus_conn_id (str) – 对 Azure Service Bus 连接 的引用。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
max_delivery_count = 10[source]
dead_lettering_on_message_expiration = True[source]
enable_batched_operations = True[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

通过连接到 hook 中的 Service Bus Admin 客户端,在 Azure Service Bus 命名空间中创建队列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSendMessageOperator(*, queue_name, message, batch=False, azure_service_bus_conn_id='azure_service_bus_default', message_id=None, reply_to=None, message_headers=None, **kwargs)[source]

基类: airflow.models.BaseOperator

向 Service Bus 队列发送消息或批量消息。

另请参阅

有关如何使用此 Operator 的更多信息,请查看指南: 向 Azure Service Bus 队列发送消息

参数:
  • queue_name (str) – 队列的名称。应是唯一的。

  • message (str | list[str]) – 需要发送到队列的消息。可以是字符串或字符串列表。

  • batch (bool) – 布尔标志,默认为 False,如果消息需要作为批处理消息发送,则可设置为 True。

  • azure_service_bus_conn_id (str) – 对 :ref: Azure Service Bus 连接<howto/connection:azure_service_bus> 的引用。

  • message_id (str | None) – 要设置在发送到队列的消息上的消息 ID。请注意,message_id 只能在发送单个消息时设置。

  • reply_to (str | None) – 接收者应回复的队列或主题名称。回复将发送到队列还是主题应在带外确定。

  • message_headers (dict[str | bytes, int | float | bytes | bool | str | uuid.UUID] | None) – 要添加到 Azure Service Bus 消息 application_properties 字段的标头。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
batch = False[source]
message[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
message_id = None[source]
reply_to = None[source]
message_headers = None[source]
execute(context)[source]

将消息发送到 Service Bus 命名空间中的特定队列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusReceiveMessageOperator(*, queue_name, azure_service_bus_conn_id='azure_service_bus_default', max_message_count=10, max_wait_time=5, message_callback=None, **kwargs)[source]

基类: airflow.models.BaseOperator

在指定的队列名称中一次接收一批消息。

另请参阅

有关如何使用此 Operator 的更多信息,请查看指南: 接收 Azure Service Bus 队列消息

参数:
  • queue_name (str) – 队列名称的名称或带有名称的 QueueProperties。

  • max_message_count (int) – 批处理中的最大消息数。

  • max_wait_time (float) – 等待第一条消息到达的最大时间(以秒为单位)。

  • azure_service_bus_conn_id (str) – 对 :ref: Azure Service Bus 连接 <howto/connection:azure_service_bus> 的引用。

  • message_callback (MessageCallback | None) – 用于处理每条消息的可选回调。如果未提供,则消息将被记录并完成。如果提供,并且抛出异常,消息将被放弃以供将来重新投递。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
max_message_count = 10[source]
max_wait_time = 5[source]
message_callback = None[source]
execute(context)[source]

通过连接到 Service Bus 客户端,在 Service Bus 命名空间中的特定队列中接收消息。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusDeleteQueueOperator(*, queue_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基类: airflow.models.BaseOperator

删除 Azure Service Bus 命名空间中的队列。

另请参阅

有关如何使用此 Operator 的更多信息,请查看指南: 删除 Azure Service Bus 队列

参数:
  • queue_name (str) – Service Bus 命名空间中的队列名称。

  • azure_service_bus_conn_id (str) – 对 :ref: Azure Service Bus 连接 <howto/connection:azure_service_bus> 的引用。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端,删除 Service Bus 命名空间中的队列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator(*, topic_name, azure_service_bus_conn_id='azure_service_bus_default', default_message_time_to_live=None, max_size_in_megabytes=None, requires_duplicate_detection=None, duplicate_detection_history_time_window=None, enable_batched_operations=None, size_in_bytes=None, filtering_messages_before_publishing=None, authorization_rules=None, support_ordering=None, auto_delete_on_idle=None, enable_partitioning=None, enable_express=None, user_metadata=None, max_message_size_in_kilobytes=None, **kwargs)[source]

基类: airflow.models.BaseOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 主题。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 创建 Azure Service Bus 主题

参数:
  • topic_name (str) – 主题的名称。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 默认消息存活时间值。这是消息过期前的持续时间,从消息发送到 Service Bus 时开始计算。当消息本身未设置 TimeToLive 时,使用此默认值。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • max_size_in_megabytes (int | None) – 主题的最大大小(以兆字节为单位),这是为主题分配的内存大小。

  • requires_duplicate_detection (bool | None) – 一个值,指示此主题是否需要重复检测。

  • duplicate_detection_history_time_window (datetime.timedelta | str | None) – 定义重复检测历史记录持续时间的 ISO 8601 时间跨度结构。默认值为 10 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

  • size_in_bytes (int | None) – 主题的大小,以字节为单位。

  • filtering_messages_before_publishing (bool | None) – 在发布消息之前对其进行筛选。

  • authorization_rules (list[azure.servicebus.management.AuthorizationRule] | None) – 资源的授权规则列表。

  • support_ordering (bool | None) – 一个值,指示主题是否支持排序。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 时间跨度空闲间隔,之后主题将自动删除。最短持续时间为 5 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • enable_partitioning (bool | None) – 一个值,指示主题是否将跨多个消息代理进行分区。

  • enable_express (bool | None) – 一个值,指示是否启用 Express 实体。快速队列在将消息写入持久存储之前会将其暂时保存在内存中。

  • user_metadata (str | None) – 与主题关联的元数据。

  • max_message_size_in_kilobytes (int | None) – 队列可接受的消息负载的最大大小(以千字节为单位)。此功能仅在使用高级命名空间和 Service Bus API 版本“2021-05”或更高版本时可用。允许的最小值为 1024,允许的最大值为 102400。默认值为 1024。

template_fields: collections.abc.Sequence[str] = ('topic_name',)[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
default_message_time_to_live = None[source]
max_size_in_megabytes = None[source]
requires_duplicate_detection = None[source]
duplicate_detection_history_time_window = None[source]
enable_batched_operations = None[source]
size_in_bytes = None[source]
filtering_messages_before_publishing = None[source]
authorization_rules = None[source]
support_ordering = None[source]
auto_delete_on_idle = None[source]
enable_partitioning = None[source]
enable_express = None[source]
user_metadata = None[source]
max_message_size_in_kilobytes = None[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端,在 Service Bus 命名空间中创建主题。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionCreateOperator(*, topic_name, subscription_name, azure_service_bus_conn_id='azure_service_bus_default', lock_duration=None, requires_session=None, default_message_time_to_live=None, dead_lettering_on_message_expiration=True, dead_lettering_on_filter_evaluation_exceptions=None, max_delivery_count=10, enable_batched_operations=True, forward_to=None, user_metadata=None, forward_dead_lettered_messages_to=None, auto_delete_on_idle=None, filter_rule=None, filter_rule_name=None, **kwargs)[source]

基类: airflow.models.BaseOperator

在 Service Bus 命名空间下创建一个 Azure Service Bus 主题订阅。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 创建 Azure Service Bus 订阅

参数:
  • topic_name (str) – 将拥有待创建订阅的主题。

  • subscription_name (str) – 待创建的订阅名称

  • lock_duration (datetime.timedelta | str | None) – ISO 8601 探查锁定(peek-lock)的时间跨度持续时间;即,消息被锁定以便其他接收方无法接收的时间量。LockDuration 的最大值为 5 分钟;默认值为 1 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • requires_session (bool | None) – 一个值,指示订阅是否支持会话概念。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 默认消息存活时间值。这是消息过期前的持续时间,从消息发送到 Service Bus 时开始计算。当消息本身未设置 TimeToLive 时,使用此默认值。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • dead_lettering_on_message_expiration (bool | None) – 一个值,指示当消息过期时,此订阅是否支持死信队列。

  • dead_lettering_on_filter_evaluation_exceptions (bool | None) – 一个值,指示在筛选器评估异常时此订阅是否支持死信队列。

  • max_delivery_count (int | None) – 最大投递次数。消息在此次数投递后会自动进入死信队列。默认值为 10。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

  • forward_to (str | None) – 所有发送到订阅的消息将被转发到的接收实体名称。

  • user_metadata (str | None) – 与订阅关联的元数据。最大字符数为 1024。

  • forward_dead_lettered_messages_to (str | None) – 订阅中的所有死信消息将被转发到的接收实体名称。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 时间跨度空闲间隔,之后订阅将自动删除。最短持续时间为 5 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)的字符串输入值。

  • filter_rule (azure.servicebus.management.CorrelationRuleFilter | azure.servicebus.management.SqlRuleFilter | None) – 可选的关联规则或 SQL 规则筛选器,应用于消息。

  • filter_rule_name (str | None) – 应用规则筛选器到订阅时使用的可选规则名称

  • azure_service_bus_conn_id (str) – 对 Azure Service Bus 连接 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
lock_duration = None[source]
requires_session = None[source]
default_message_time_to_live = None[source]
dl_on_message_expiration = True[source]
dl_on_filter_evaluation_exceptions = None[source]
max_delivery_count = 10[source]
enable_batched_operations = True[source]
forward_to = None[source]
user_metadata = None[source]
forward_dead_lettered_messages_to = None[source]
auto_delete_on_idle = None[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
filter_rule = None[source]
filter_rule_name = None[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端,在 Service Bus 命名空间中创建订阅。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusUpdateSubscriptionOperator(*, topic_name, subscription_name, max_delivery_count=None, dead_lettering_on_message_expiration=None, enable_batched_operations=None, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基类: airflow.models.BaseOperator

在 Service Bus 命名空间下更新 Azure ServiceBus 主题订阅。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 更新 Azure Service Bus 订阅

参数:
  • topic_name (str) – 将拥有待创建订阅的主题。

  • subscription_name (str) – 需要更新的订阅名称。

  • max_delivery_count (int | None) – 最大投递次数。消息在此次数投递后会自动进入死信队列。默认值为 10。

  • dead_lettering_on_message_expiration (bool | None) – 一个值,指示当消息过期时,此订阅是否支持死信队列。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

  • azure_service_bus_conn_id (str) – 对 Azure Service Bus 连接 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
max_delivery_count = None[source]
dl_on_message_expiration = None[source]
enable_batched_operations = None[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端更新订阅属性。

class airflow.providers.microsoft.azure.operators.asb.ASBReceiveSubscriptionMessageOperator(*, topic_name, subscription_name, max_message_count=1, max_wait_time=5, azure_service_bus_conn_id='azure_service_bus_default', message_callback=None, **kwargs)[source]

基类: airflow.models.BaseOperator

从特定主题下的 Service Bus 订阅接收批量消息。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:接收 Azure Service Bus 订阅消息

参数:
  • subscription_name (str) – 将拥有主题中规则的订阅名称

  • topic_name (str) – 将拥有订阅规则的主题。

  • max_message_count (int | None) – 批处理中消息的最大数量。实际返回数量将取决于 prefetch_count 和传入流速率。设置为 None 将完全依赖于预取配置。默认值为 1。

  • max_wait_time (float | None) – 等待第一条消息到达的最大秒数。如果没有消息到达且未指定超时,则此调用将不会返回,直到连接关闭。如果指定了超时但在此期间没有消息到达,将返回一个空列表。

  • azure_service_bus_conn_id (str) – 对 Azure Service Bus 连接 的引用。

  • message_callback (MessageCallback | None) – 用于处理每条消息的可选回调。如果未提供,则消息将被记录并完成。如果提供,并且抛出异常,消息将被放弃以供将来重新投递。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
max_message_count = 1[source]
max_wait_time = 5[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
message_callback = None[source]
execute(context)[source]

通过连接到 Service Bus 客户端,在 Service Bus 命名空间中的特定队列中接收消息。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionDeleteOperator(*, topic_name, subscription_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基类: airflow.models.BaseOperator

删除 Azure ServiceBus 命名空间中的主题订阅。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:删除 Azure Service Bus 订阅

参数:
  • topic_name (str) – 将拥有待创建订阅的主题。

  • subscription_name (str) – 待创建的订阅名称

  • azure_service_bus_conn_id (str) – 对 Azure Service Bus 连接 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端,删除 Service Bus 命名空间中的主题订阅。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator(*, topic_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基类: airflow.models.BaseOperator

删除 Azure Service Bus 命名空间中的主题。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:删除 Azure Service Bus 主题

参数:
template_fields: collections.abc.Sequence[str] = ('topic_name',)[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

通过连接到 Service Bus Admin 客户端,删除 Service Bus 命名空间中的主题。

此条目有帮助吗?