airflow.providers.google.cloud.operators.pubsub

此模块包含 Google PubSub 操作符。

PubSubCreateTopicOperator

创建 PubSub 主题。

PubSubCreateSubscriptionOperator

创建 PubSub 订阅。

PubSubDeleteTopicOperator

删除 PubSub 主题。

PubSubDeleteSubscriptionOperator

删除 PubSub 订阅。

PubSubPublishMessageOperator

向 PubSub 主题发布消息。

PubSubPullOperator

从 PubSub 订阅拉取消息并通过 XCom 传递。

模块内容

class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, gcp_conn_id='google_cloud_default', labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[源码]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

创建 PubSub 主题。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 创建 PubSub 主题

默认情况下,如果主题已存在,此操作符不会导致 DAG 失败。

with DAG("successful DAG") as dag:
    create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")
    create_topic_again = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")

    create_topic >> create_topic_again

可以将此操作符配置为在主题已存在时失败。

with DAG("failing DAG") as dag:
    create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic")
    create_topic_again = PubSubCreateTopicOperator(
        project_id="my-project", topic="my_new_topic", fail_if_exists=True
    )

    create_topic >> create_topic_again

project_idtopic 均支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 可选,将创建主题的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • topic (str) – 要创建的主题。不要包含完整的主题路径。换句话说,不要使用 projects/{project}/topics/{topic},只需提供 {topic}。(支持模板化)

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels

  • message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制发布到主题的消息可以存储在哪些 Google Cloud 区域的策略。如果不存在,则没有约束生效。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]

  • kms_key_name (str | None) – 用于保护对发布到此主题的消息的访问的 Cloud KMS CryptoKey 的资源名称。预期格式为 projects/*/locations/*/keyRings/*/cryptoKeys/*

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每次单独的尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[源码]
ui_color = '#0273d4'[源码]
project_id = None[源码]
topic[源码]
fail_if_exists = False[源码]
gcp_conn_id = 'google_cloud_default'[源码]
labels = None[源码]
message_storage_policy = None[源码]
kms_key_name = None[源码]
schema_settings = None[源码]
message_retention_duration = None[源码]
retry[源码]
timeout = None[源码]
metadata = ()[源码]
impersonation_chain = None[源码]
execute(context)[源码]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[源码]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

创建 PubSub 订阅。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 创建 PubSub 订阅

默认情况下,订阅将在 project_id 中创建。如果指定了 subscription_project_id 并且 Google Cloud 凭据允许,则可以在与其主题不同的项目中创建订阅。

默认情况下,如果订阅已存在,此操作符不会导致 DAG 失败。但是,主题必须存在于项目中。

with DAG("successful DAG") as dag:
    create_subscription = PubSubCreateSubscriptionOperator(
        project_id="my-project", topic="my-topic", subscription="my-subscription"
    )
    create_subscription_again = PubSubCreateSubscriptionOperator(
        project_id="my-project", topic="my-topic", subscription="my-subscription"
    )

    create_subscription >> create_subscription_again

可以将此操作符配置为在订阅已存在时失败。

with DAG("failing DAG") as dag:
    create_subscription = PubSubCreateSubscriptionOperator(
        project_id="my-project", topic="my-topic", subscription="my-subscription"
    )
    create_subscription_again = PubSubCreateSubscriptionOperator(
        project_id="my-project", topic="my-topic", subscription="my-subscription", fail_if_exists=True
    )

    create_subscription >> create_subscription_again

最后,subscription 不是必需的。如果未传递,操作符将使用 uuid 模块为订阅的名称生成一个通用唯一标识符。

with DAG("DAG") as dag:
    PubSubCreateSubscriptionOperator(project_id="my-project", topic="my-topic")

project_idtopicsubscriptionsubscription_project_idimpersonation_chain 均支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 可选,主题所在的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • topic (str) – 要创建的主题。不要包含完整的主题路径。换句话说,不要使用 projects/{project}/topics/{topic},只需提供 {topic}。(支持模板化)

  • subscription (str | None) – Pub/Sub 订阅名称。如果为空,将使用 uuid 模块生成一个随机名称

  • subscription_project_id (str | None) – 将创建订阅的 Google Cloud 项目 ID。如果为空,将使用 topic_project

  • ack_deadline_secs (int) – 订阅者确认从订阅拉取的每条消息的时间(秒)

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此订阅使用推送交付(push delivery),则此字段用于配置。空的 pushConfig 表示订阅者将使用 API 方法拉取和确认消息。

  • retain_acked_messages (bool | None) – 指示是否保留已确认的消息。如果为 true,即使消息已被确认,它们也不会从订阅的积压队列中清除,直到它们超出 message_retention_duration 窗口。如果您想 Seek 到某个时间戳,则此值必须为 true。

  • message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 从消息发布时刻起,在订阅的积压队列中保留未确认消息的时间。如果 retain_acked_messages 为 true,则这也配置了已确认消息的保留时间,因此配置了可以回溯进行 Seek 的时间长度。默认为 7 天。不能超过 7 天或少于 10 分钟。

  • labels (dict[str, str] | None) – 客户端分配的标签;请参阅 https://cloud.google.com/pubsub/docs/labels

  • enable_message_ordering (bool) – 如果为 true,则使用相同 ordering_key 发布在 PubsubMessage 中的消息将按照 Pub/Sub 系统接收它们的顺序传递给订阅者。否则,它们可能以任何顺序传递。

  • expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此订阅过期条件的策略。只要任何连接的订阅者成功使用订阅中的消息或对订阅执行操作,订阅就被视为活动状态。如果未设置 expiration_policy,则将使用 ttl 为 31 天的默认策略。expiration_policy.ttl 允许的最小值是 1 天。

  • filter – 使用 Cloud Pub/Sub 过滤语言编写的表达式。如果非空,则只有 attributes 字段与过滤器匹配的 PubsubMessages 会传递到此订阅。如果为空,则不过滤任何消息。

  • dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定此订阅中死信消息条件的策略。如果未设置 dead_letter_policy,则禁用死信功能。

  • retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何为此订阅重试消息传递的策略。如果未设置,则应用默认重试策略。这通常意味着对于健康的订阅者,消息将尽快重试。RetryPolicy 将在给定消息的 NACKs 或确认截止日期超时事件时触发。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每次单独的尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain')[源码]
ui_color = '#0273d4'[源码]
project_id = None[源码]
topic[源码]
subscription = None[源码]
subscription_project_id = None[源码]
ack_deadline_secs = 10[源码]
fail_if_exists = False[源码]
gcp_conn_id = 'google_cloud_default'[源码]
push_config = None[源码]
retain_acked_messages = None[源码]
message_retention_duration = None[源码]
labels = None[源码]
enable_message_ordering = False[源码]
expiration_policy = None[源码]
filter_ = None[源码]
dead_letter_policy = None[源码]
retry_policy = None[source]
retry[source]
timeout = None[source]
metadata = ()[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

删除 PubSub 主题。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 删除 Pub/Sub 主题

默认情况下,如果主题不存在,此操作符不会导致 DAG 失败。

with DAG("successful DAG") as dag:
    PubSubDeleteTopicOperator(project_id="my-project", topic="non_existing_topic")

可以配置此操作符,使其在主题不存在时失败。

with DAG("failing DAG") as dag:
    PubSubDeleteTopicOperator(
        project_id="my-project",
        topic="non_existing_topic",
        fail_if_not_exists=True,
    )

project_idtopic 均支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 可选,进行操作的 Google Cloud 项目 ID(支持模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • topic (str) – 要删除的主题。不要包含完整的主题路径。换句话说,不要使用 projects/{project}/topics/{topic},而只提供 {topic}。(支持模板化)

  • fail_if_not_exists (bool) – 如果为 True 且主题不存在,则任务将失败

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每次单独的尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[source]
ui_color = '#cb4335'[source]
project_id = None[source]
topic[source]
fail_if_not_exists = False[source]
gcp_conn_id = 'google_cloud_default'[source]
retry[source]
timeout = None[source]
metadata = ()[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

删除 PubSub 订阅。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 删除 Pub/Sub 订阅

默认情况下,如果订阅不存在,此操作符不会导致 DAG 失败。

with DAG("successful DAG") as dag:
    PubSubDeleteSubscriptionOperator(project_id="my-project", subscription="non-existing")

可以将此操作符配置为在订阅已存在时失败。

with DAG("failing DAG") as dag:
    PubSubDeleteSubscriptionOperator(
        project_id="my-project",
        subscription="non-existing",
        fail_if_not_exists=True,
    )

project_idsubscription 支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 可选,进行操作的 Google Cloud 项目 ID(支持模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • subscription (str) – 要删除的订阅。不要包含完整的订阅路径。换句话说,不要使用 projects/{project}/subscription/{subscription},而只提供 {subscription}。(支持模板化)

  • fail_if_not_exists (bool) – 如果为 True 且订阅不存在,则任务将失败

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可选)用于重试请求的重试对象。如果指定 None,则不会重试请求。

  • timeout (float | None) – (可选)等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每次单独的尝试。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可选)提供给方法的额外元数据。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]
ui_color = '#cb4335'[source]
project_id = None[source]
subscription[source]
fail_if_not_exists = False[source]
gcp_conn_id = 'google_cloud_default'[source]
retry[source]
timeout = None[source]
metadata = ()[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', enable_message_ordering=False, impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

向 PubSub 主题发布消息。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 发布 Pub/Sub 消息

每个任务会将所有提供的消息发布到同一 Google Cloud 项目中的同一主题。如果主题不存在,此任务将失败。

m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
m2 = {"data": b"Knock, knock"}
m3 = {"attributes": {"foo": ""}}
m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}}

t1 = PubSubPublishMessageOperator(
    project_id="my-project",
    topic="my_topic",
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag,
)

t2 = PubSubPublishMessageOperator(
    project_id="my-project",
    topic="my_topic",
    messages=[m4],
    create_topic=True,
    enable_message_ordering=True,
    dag=dag,
)

project_idtopicmessages 支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 可选,进行操作的 Google Cloud 项目 ID(支持模板化)。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • topic (str) – 要发布到的主题。不要包含完整的主题路径。换句话说,不要使用 projects/{project}/topics/{topic},而只提供 {topic}。(支持模板化)

  • messages (list) – 要发布到主题的消息列表。每条消息都是一个字典,包含以下一个或多个键值映射:* ‘data’: 一个字节字符串 (utf-8 编码) * ‘attributes’: {‘key1’: ‘value1’, …} 每条消息必须至少包含一个非空的 ‘data’ 值或一个包含至少一个键的属性字典(支持模板化)。请参阅 https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • enable_message_ordering (bool) – 如果为 True,则使用相同的 ordering_key 在 PubsubMessage 中发布的消息将按照 Pub/Sub 系统接收到的顺序传递给订阅者。否则,它们可能以任何顺序传递。默认为 False。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'messages', 'enable_message_ordering', 'impersonation_chain')[source]
ui_color = '#0273d4'[source]
project_id = None[source]
topic[source]
messages[source]
gcp_conn_id = 'google_cloud_default'[source]
enable_message_ordering = False[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=False, poll_interval=300, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 PubSub 订阅拉取消息并通过 XCom 传递。

如果队列为空,返回空列表 - 从不等待消息。如果您确实需要等待,请改用 airflow.providers.google.cloud.sensors.PubSubPullSensor

另请参阅

有关如何使用此操作符和 PubSubPullSensor 的更多信息,请参阅指南: 从 Pub/Sub 订阅拉取消息

此操作符将从指定的 Pub/Sub 订阅中拉取最多 max_messages 条消息。当订阅返回消息时,操作符将立即返回这些消息,并通过 XCom 传递给下游任务。

如果 ack_messages 设置为 True,消息将在返回之前立即被确认,否则下游任务将负责确认它们。

project_idsubscription 支持模板化,因此您可以在其值中使用 Jinja 模板。

参数:
  • project_id (str) – 订阅所在的 Google Cloud 项目 ID(支持模板化)

  • subscription (str) – Pub/Sub 订阅名称。不要包含完整的订阅路径。

  • max_messages (int) – 每次 Pub/Sub 拉取请求检索的最大消息数量

  • ack_messages (bool) – 如果为 True,每条消息将立即被确认,而不是由任何下游任务确认

  • gcp_conn_id (str) – 连接到 Google Cloud 时使用的连接 ID。

  • messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可选)用于处理接收到的消息的回调。其返回值将保存到 XCom。如果您正在拉取大量消息,您可能需要提供一个自定义回调。如果未提供,默认实现将使用 google.protobuf.json_format.MessageToDict 函数将 ReceivedMessage 对象转换为 JSON 可序列化的字典。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可选)要使用短期凭据模拟的可选服务账号,或是获取列表中最后一个账号(将在请求中被模拟)的 access_token 所需的账号链式列表。如果设置为字符串,则该账号必须向原始账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号则向原始账号授予此角色。(支持模板化)

  • deferrable (bool) – 如果为 True,在可推迟模式下运行任务。

  • poll_interval (int) – 等待检查任务的两次连续调用之间的时间(秒)。默认值为 300 秒。

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]
gcp_conn_id = 'google_cloud_default'[source]
project_id[source]
subscription[source]
max_messages = 5[source]
ack_messages = False[source]
messages_callback = None[source]
impersonation_chain = None[source]
deferrable = False[source]
poll_interval = 300[source]
execute(context)[source]

创建操作符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

execute_complete(context, event)[source]

如果提供了 messages_callback,则执行它;否则,立即返回触发事件消息。

此条目有帮助吗?