airflow.providers.microsoft.azure.hooks.asb

属性

MessageCallback

BaseAzureServiceBusHook

BaseAzureServiceBusHook 类,用于创建会话并使用连接字符串创建连接。

AdminClientHook

与 ServiceBusAdministrationClient 交互。

MessageHook

与 ServiceBusClient 交互。

模块内容

airflow.providers.microsoft.azure.hooks.asb.MessageCallback[source]
class airflow.providers.microsoft.azure.hooks.asb.BaseAzureServiceBusHook(azure_service_bus_conn_id=default_conn_name)[source]

基类: airflow.hooks.base.BaseHook

BaseAzureServiceBusHook 类,用于创建会话并使用连接字符串创建连接。

参数::

azure_service_bus_conn_id (str) – 引用 Azure Service Bus 连接

conn_name_attr = 'azure_service_bus_conn_id'[source]
default_conn_name = 'azure_service_bus_default'[source]
conn_type = 'azure_service_bus'[source]
hook_name = 'Azure Service Bus'[source]
classmethod get_connection_form_widgets()[source]

返回要添加到连接表单的连接小部件。

classmethod get_ui_field_behaviour()[source]

返回自定义字段行为。

conn_id = 'azure_service_bus_default'[source]
abstract get_conn()[source]

返回 hook 的连接。

class airflow.providers.microsoft.azure.hooks.asb.AdminClientHook(azure_service_bus_conn_id=default_conn_name)[source]

基类: BaseAzureServiceBusHook

与 ServiceBusAdministrationClient 交互。

这可以创建、更新、列出和删除 Service Bus 命名空间的资源。此 hook 使用从基类继承的相同 Azure Service Bus 客户端连接。

get_conn()[source]

创建 ServiceBusAdministrationClient 实例。

这使用连接详细信息中的连接字符串。

create_queue(queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True, enable_batched_operations=True)[source]

通过连接到 Service Bus 管理客户端创建队列,并返回 QueueProperties。

参数::
  • queue_name (str) – 队列的名称或具有名称的 QueueProperties。

  • max_delivery_count (int) – 最大传递计数。消息在此传递次数后会自动进入死信队列。默认值为 10。

  • dead_lettering_on_message_expiration (bool) – 一个值,指示此订阅在消息到期时是否支持死信。

  • enable_batched_operations (bool) – 指示是否启用服务器端批处理操作的值。

delete_queue(queue_name)[source]

通过 service bus 命名空间中的 queue_name 删除队列。

参数::

queue_name (str) – 队列的名称或具有名称的 QueueProperties。

create_topic(topic_name, azure_service_bus_conn_id='azure_service_bus_default', default_message_time_to_live=None, max_size_in_megabytes=None, requires_duplicate_detection=None, duplicate_detection_history_time_window=None, enable_batched_operations=None, size_in_bytes=None, filtering_messages_before_publishing=None, authorization_rules=None, support_ordering=None, auto_delete_on_idle=None, enable_partitioning=None, enable_express=None, user_metadata=None, max_message_size_in_kilobytes=None)[source]

通过连接到 Service Bus 管理客户端创建主题。

参数::
  • topic_name (str) – 主题名称。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 默认消息生存时间跨度值。这是消息发送到 Service Bus 后,消息到期前的持续时间。当消息本身未设置 TimeToLive 时,这是使用的默认值。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • max_size_in_megabytes (int | None) – 主题的最大大小(以兆字节为单位),即为主题分配的内存大小。

  • requires_duplicate_detection (bool | None) – 一个值,指示此主题是否需要重复检测。

  • duplicate_detection_history_time_window (datetime.timedelta | str | None) – ISO 8601 时间跨度结构,定义重复检测历史记录的持续时间。默认值为 10 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

  • size_in_bytes (int | None) – 主题的大小,以字节为单位。

  • filtering_messages_before_publishing (bool | None) – 在发布前过滤消息。

  • authorization_rules (list[azure.servicebus.management.AuthorizationRule] | None) – 资源的授权规则列表。

  • support_ordering (bool | None) – 一个值,指示主题是否支持排序。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 时间跨度空闲间隔,主题在此间隔后会自动删除。最小持续时间为 5 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • enable_partitioning (bool | None) – 一个值,指示主题是否跨多个消息代理进行分区。

  • enable_express (bool | None) – 一个值,指示是否启用了 Express 实体。Express 队列在将消息写入持久存储之前,会暂时将其保存在内存中。

  • user_metadata (str | None) – 与主题关联的元数据。

  • max_message_size_in_kilobytes (int | None) – 队列可以接受的消息负载的最大大小(以千字节为单位)。此功能仅在使用高级命名空间和 Service Bus API 版本“2021-05”或更高版本时可用。允许的最小值为 1024,最大允许值为 102400。默认值为 1024。

create_subscription(topic_name, subscription_name, lock_duration=None, requires_session=None, default_message_time_to_live=None, dead_lettering_on_message_expiration=True, dead_lettering_on_filter_evaluation_exceptions=None, max_delivery_count=10, enable_batched_operations=True, forward_to=None, user_metadata=None, forward_dead_lettered_messages_to=None, auto_delete_on_idle=None, filter_rule=None, filter_rule_name=None)[source]

在主题上创建具有指定名称的订阅,并返回其 SubscriptionProperties。

可以提供可选的 filter_rule,根据消息的属性过滤消息。特别是,可以使用关联 ID 过滤器将回复与请求配对。

参数::
  • topic_name (str) – 将拥有待创建订阅的主题。

  • subscription_name (str) – 需要创建的订阅名称

  • lock_duration (datetime.timedelta | str | None) – ISO 8601 窥视锁(peek-lock)的时间跨度持续时间;即消息被锁定以供其他接收者使用的时间量。LockDuration 的最大值为 5 分钟;默认值为 1 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • requires_session (bool | None) – 一个值,指示队列是否支持会话概念。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 默认消息生存时间跨度值。这是消息发送到 Service Bus 后,消息到期前的持续时间。当消息本身未设置 TimeToLive 时,这是使用的默认值。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • dead_lettering_on_message_expiration (bool | None) – 一个值,指示此订阅在消息到期时是否支持死信。

  • dead_lettering_on_filter_evaluation_exceptions (bool | None) – 一个值,指示此订阅在消息到期时是否支持死信。

  • max_delivery_count (int | None) – 最大传递计数。消息在此传递次数后会自动进入死信队列。默认值为 10。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

  • forward_to (str | None) – 接收实体名称,发送到此订阅的所有消息都将转发到该实体。

  • user_metadata (str | None) – 与订阅关联的元数据。最大字符数为 1024。

  • forward_dead_lettered_messages_to (str | None) – 接收实体名称,发送到此订阅的所有消息都将转发到该实体。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 时间跨度空闲间隔,订阅在此间隔后会自动删除。最小持续时间为 5 分钟。接受 ~datetime.timedelta 类型或 ISO 8601 持续时间格式(如“PT300S”)字符串作为输入值。

  • filter_rule (azure.servicebus.management.CorrelationRuleFilter | azure.servicebus.management.SqlRuleFilter | None) – 应用于消息的可选关联或 SQL 规则过滤器。

  • filter_rule_name (str | None) – 应用于订阅的规则过滤器的可选规则名称

  • azure_service_bus_conn_id – 引用 Azure Service Bus 连接

update_subscription(topic_name, subscription_name, max_delivery_count=None, dead_lettering_on_message_expiration=None, enable_batched_operations=None)[source]

更新 ServiceBus 命名空间下的 Azure ServiceBus 主题订阅。

参数::
  • topic_name (str) – 将拥有待创建订阅的主题。

  • subscription_name (str) – 需要创建的订阅名称。

  • max_delivery_count (int | None) – 最大传递计数。消息在此传递次数后会自动进入死信队列。默认值为 10。

  • dead_lettering_on_message_expiration (bool | None) – 一个值,指示此订阅在消息到期时是否支持死信。

  • enable_batched_operations (bool | None) – 指示是否启用服务器端批处理操作的值。

delete_subscription(subscription_name, topic_name)[source]

删除 ServiceBus 命名空间下的主题订阅实体。

参数::
  • subscription_name (str) – 将在主题中拥有规则的订阅名称

  • topic_name (str) – 将拥有订阅规则的主题。

class airflow.providers.microsoft.azure.hooks.asb.MessageHook(azure_service_bus_conn_id=default_conn_name)[source]

基类: BaseAzureServiceBusHook

与 ServiceBusClient 交互。

这作为获取 ServiceBusSender 和 ServiceBusReceiver 的高级接口。

get_conn()[source]

通过使用连接详细信息中的连接字符串创建并返回 ServiceBusClient。

send_message(queue_name, messages, batch_message_flag=False, message_id=None, reply_to=None, message_headers=None)[source]

使用 ServiceBusClient Send 将消息发送到一个或多个 Service Bus 队列。

通过使用 batch_message_flag,它启用并以批量消息发送消息。

参数::
  • queue_name (str) – 队列的名称或具有名称的 QueueProperties。

  • messages (str | list[str]) – 需要发送到队列的消息。可以是字符串或字符串列表。

  • batch_message_flag (bool) – 布尔标志,如果消息需要作为批量消息发送,则可以设置为 True。

  • message_id (str | None) – 设置在发送到队列的消息上的消息 ID。请注意,message_id 只能在发送单条消息时设置。

  • reply_to (str | None) – 需要发送到队列的回复目标。

  • message_headers (dict[str | bytes, int | float | bytes | bool | str | uuid.UUID] | None) – 要添加到 Azure Service Bus 消息的 application_properties 字段的头部信息。

static send_list_messages(sender, messages, message_creator)[source]
static send_batch_message(sender, messages, message_creator)[source]
receive_message(queue_name, context, max_message_count=1, max_wait_time=None, message_callback=None)[source]

在指定的队列名称中一次接收一批消息。

参数::
  • queue_name (str) – 队列的名称或包含名称的 QueueProperties 对象。

  • max_message_count (int | None) – 批次中消息的最大数量。

  • max_wait_time (float | None) – 等待第一条消息到达的最大时间(秒)。

  • message_callback (MessageCallback | None) – 用于处理每条消息的可选回调函数。如果未提供,消息将被记录并完成处理。如果提供且抛出异常,消息将被放弃以便将来重新投递。

receive_subscription_message(topic_name, subscription_name, context, max_message_count, max_wait_time, message_callback=None)[source]

一次接收一批订阅消息。

如果您希望同时处理多条消息,或者作为一次调用执行即时(ad-hoc)接收,此方法是最佳的。

参数::
  • subscription_name (str) – 将在主题中拥有规则的订阅名称

  • topic_name (str) – 将拥有订阅规则的主题。

  • max_message_count (int | None) – 批次中消息的最大数量。实际返回数量取决于 prefetch_count 和传入流速率。设置为 None 将完全取决于预取(prefetch)配置。默认值为 1。

  • max_wait_time (float | None) – 等待第一条消息到达的最大时间(秒)。如果没有消息到达且未指定超时,此调用将不会返回,直到连接关闭。如果指定了超时,并且在超时期限内没有消息到达,将返回一个空列表。

此条目有帮助吗?