Google Cloud Stackdriver 操作符

先决条件任务

要使用这些操作符,您必须完成以下几项操作:

StackdriverListAlertPoliciesOperator

使用 StackdriverListAlertPoliciesOperator 来获取由给定过滤器标识的所有提醒政策。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

list_alert_policies = StackdriverListAlertPoliciesOperator(
    task_id="list-alert-policies",
)

StackdriverEnableAlertPoliciesOperator

使用 StackdriverEnableAlertPoliciesOperator 来启用由给定过滤器标识的提醒政策。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

enable_alert_policy = StackdriverEnableAlertPoliciesOperator(
    task_id="enable-alert-policies",
    filter_=f'(displayName="{ALERT_1_NAME}" OR displayName="{ALERT_2_NAME}")',
)

StackdriverDisableAlertPoliciesOperator

使用 StackdriverDisableAlertPoliciesOperator 来禁用由给定过滤器标识的提醒政策。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_alert_policy = StackdriverDisableAlertPoliciesOperator(
    task_id="disable-alert-policies",
    filter_=f'displayName="{ALERT_1_NAME}"',
)

StackdriverUpsertAlertOperator

使用 StackdriverUpsertAlertOperator 来 upsert (更新或插入) 由给定 JSON 字符串过滤器标识的提醒政策。如果具有给定名称的提醒已存在,则该操作符会更新现有政策;否则,会创建一个新的政策。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

create_alert_policy = StackdriverUpsertAlertOperator(
    task_id="create-alert-policies",
    alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
)

StackdriverDeleteAlertOperator

使用 StackdriverDeleteAlertOperator 来删除由给定名称标识的提醒政策。

使用操作符

要删除的提醒的名称应采用以下格式提供:projects/<PROJECT_NAME>/alertPolicies/<ALERT_NAME>

tests/system/google/cloud/stackdriver/example_stackdriver.py

delete_alert_policy = StackdriverDeleteAlertOperator(
    task_id="delete-alert-policy",
    name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)

StackdriverListNotificationChannelsOperator

使用 StackdriverListNotificationChannelsOperator 来获取由给定过滤器标识的所有 Notification Channels。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

list_notification_channel = StackdriverListNotificationChannelsOperator(
    task_id="list-notification-channel", filter_='type="pubsub"'
)

StackdriverEnableNotificationChannelsOperator

使用 StackdriverEnableNotificationChannelsOperator 来启用由给定过滤器标识的 Notification Channels。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

enable_notification_channel = StackdriverEnableNotificationChannelsOperator(
    task_id="enable-notification-channel", filter_='type="pubsub"'
)

StackdriverDisableNotificationChannelsOperator

使用 StackdriverDisableNotificationChannelsOperator 来禁用由给定过滤器标识的 Notification Channels。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverUpsertNotificationChannelOperator

使用 StackdriverUpsertNotificationChannelOperator 来 upsert (更新或插入) 由给定频道 JSON 字符串标识的 Notification Channels。如果具有给定名称的频道已存在,则该操作符会更新现有频道;否则,会创建一个新的频道。

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverDeleteNotificationChannelOperator

要删除的频道的名称应采用以下格式提供:projects/<PROJECT_NAME>/notificationChannels/<CHANNEL_NAME>

使用操作符

您可以使用此操作符(带或不带项目 ID)来获取所有提醒政策。如果缺少项目 ID,则将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
    task_id="delete-notification-channel",
    name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)

本条目有帮助吗?