Google Cloud Stackdriver 操作符

先决条件任务

要使用这些操作符,您必须执行以下几项操作

StackdriverListAlertPoliciesOperator

使用 StackdriverListAlertPoliciesOperator 获取由给定过滤器标识的所有警报策略。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

list_alert_policies = StackdriverListAlertPoliciesOperator(
    task_id="list-alert-policies",
)

StackdriverEnableAlertPoliciesOperator

使用 StackdriverEnableAlertPoliciesOperator 启用由给定过滤器标识的警报策略。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

enable_alert_policy = StackdriverEnableAlertPoliciesOperator(
    task_id="enable-alert-policies",
    filter_=f'(displayName="{ALERT_1_NAME}" OR displayName="{ALERT_2_NAME}")',
)

StackdriverDisableAlertPoliciesOperator

使用 StackdriverDisableAlertPoliciesOperator 禁用由给定过滤器标识的警报策略。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_alert_policy = StackdriverDisableAlertPoliciesOperator(
    task_id="disable-alert-policies",
    filter_=f'displayName="{ALERT_1_NAME}"',
)

StackdriverUpsertAlertOperator

使用 StackdriverUpsertAlertOperator 使用给定的过滤器 JSON 字符串插入或更新警报策略。如果具有给定名称的警报已存在,则该操作符更新现有策略,否则创建一个新策略。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

create_alert_policy = StackdriverUpsertAlertOperator(
    task_id="create-alert-policies",
    alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
)

StackdriverDeleteAlertOperator

使用 StackdriverDeleteAlertOperator 删除由给定名称标识的警报策略。

使用操作符

要删除的警报名称应以 projects/<PROJECT_NAME>/alertPolicies/<ALERT_NAME> 格式给出

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

delete_alert_policy = StackdriverDeleteAlertOperator(
    task_id="delete-alert-policy",
    name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)

StackdriverListNotificationChannelsOperator

使用 StackdriverListNotificationChannelsOperator 获取由给定过滤器标识的所有通知渠道。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

list_notification_channel = StackdriverListNotificationChannelsOperator(
    task_id="list-notification-channel", filter_='type="pubsub"'
)

StackdriverEnableNotificationChannelsOperator

使用 StackdriverEnableNotificationChannelsOperator 启用由给定过滤器标识的通知渠道。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

enable_notification_channel = StackdriverEnableNotificationChannelsOperator(
    task_id="enable-notification-channel", filter_='type="pubsub"'
)

StackdriverDisableNotificationChannelsOperator

使用 StackdriverDisableNotificationChannelsOperator 禁用由给定过滤器标识的通知渠道。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverUpsertNotificationChannelOperator

使用 StackdriverUpsertNotificationChannelOperator 使用给定的通道 JSON 字符串插入或更新通知渠道。如果具有给定名称的渠道已存在,则该操作符更新现有渠道,否则创建一个新渠道。

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverDeleteNotificationChannelOperator

要删除的警报名称应以 projects/<PROJECT_NAME>/notificationChannels/<CHANNEL_NAME> 格式给出

使用操作符

您可以使用此操作符,也可以不使用项目 ID 来获取所有警报策略。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。

tests/system/google/cloud/stackdriver/example_stackdriver.py[源代码]

delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
    task_id="delete-notification-channel",
    name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)

此条目是否有帮助?