tests.system.amazon.aws.utils

包内容

变量

存储要为 AWS 系统测试获取的变量的元数据。

SystemTestContextBuilder

此构建器类最终会构建一个 TaskFlow 任务,该任务将在

函数

fetch_variable(key[, default_value, test_name, optional])

给定一个参数名称:首先检查是否已存在环境变量,

set_env_id()

检索或生成一个环境 ID,验证其是否合适,

all_tasks_passed(ti)

prune_logs(logs[, force_delete, retry, retry_times, ti])

如果此 dagrun 中的所有任务都已成功,则删除关联的日志。

split_string(string)

属性

ENV_ID_ENVIRON_KEY

ENV_ID_KEY

DEFAULT_ENV_ID_PREFIX

DEFAULT_ENV_ID_LEN

DEFAULT_ENV_ID

PURGE_LOGS_INTERVAL_PERIOD

TEST_FILE_IDENTIFIERS

INVALID_ENV_ID_MSG

LOWERCASE_ENV_ID_MSG

NO_VALUE_MSG

log

tests.system.amazon.aws.utils.ENV_ID_ENVIRON_KEY: str = 'SYSTEM_TESTS_ENV_ID'[源代码]
tests.system.amazon.aws.utils.ENV_ID_KEY: str = 'ENV_ID'[源代码]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID_PREFIX: str = 'env'[源代码]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID_LEN: int = 8[源代码]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID: str[源代码]
tests.system.amazon.aws.utils.PURGE_LOGS_INTERVAL_PERIOD = 5[源代码]
tests.system.amazon.aws.utils.TEST_FILE_IDENTIFIERS: list[str] = ['example_', 'test_'][源代码]
tests.system.amazon.aws.utils.INVALID_ENV_ID_MSG: str = '为了最大程度地提高兼容性,SYSTEM_TESTS_ENV_ID 必须是一个以...开头的字母数字字符串'[源代码]
tests.system.amazon.aws.utils.LOWERCASE_ENV_ID_MSG: str = '提供的环境 ID 包含大写字母,并且将为了...转换为小写'[源代码]
tests.system.amazon.aws.utils.NO_VALUE_MSG: str = '未找到值: 找不到变量 {key},也没有提供默认值。'[源代码]
tests.system.amazon.aws.utils.log[源代码]
class tests.system.amazon.aws.utils.Variable(name, to_split=False, delimiter=None, test_name=None, optional=False)[源代码]

存储要为 AWS 系统测试获取的变量的元数据。

参数
  • name (str) – 要获取的变量的名称。

  • to_split (bool) – 如果为 True,则输入为字符串格式的列表,需要进行拆分。默认为 False。

  • delimiter (str | None) – 如果 to_split 为 true,则将使用此分隔符拆分字符串。默认为 ‘,’。

  • test_name (str | None) – 与变量关联的系统测试的名称。

get_value()[源代码]
set_default(default)[源代码]
class tests.system.amazon.aws.utils.SystemTestContextBuilder[源代码]

这个构建器类最终构建一个 TaskFlow 任务,该任务在运行时(任务执行时)运行。此任务生成并存储测试 ENV_ID 以及任何请求的外部资源(例如,IAM 角色、VPC 等)

add_variable(variable_name, split_string=False, delimiter=None, optional=False, **kwargs)[源代码]

注册一个从环境变量或云参数存储中获取的变量

build()[源代码]

构建并返回一个 TaskFlow 任务,该任务将创建 env_id 并获取请求的变量。将所有内容存储在 xcom 中供下游任务使用。

tests.system.amazon.aws.utils.fetch_variable(key, default_value=None, test_name=None, optional=False)[源代码]

给定一个参数名称:首先检查是否存在环境变量,然后检查 SSM 中的值。如果两者都不可用,则回退到可选的默认值。

参数
  • key (str) – 要获取值的参数的名称。

  • default_value (str | None) – 如果找不到值,则使用的默认值。

  • test_name (str | None) – 系统测试名称。

  • optional (bool) – 变量是否可选。如果为 True,则如果变量不存在,则不会引发 ValueError

返回

参数的值。

返回类型

str | None

tests.system.amazon.aws.utils.set_env_id()[源代码]

检索或生成环境 ID,验证其是否适合,将其导出为环境变量,然后返回。

如果已生成环境 ID,则使用该 ID。否则,尝试获取它并将其导出为环境变量。如果没有可获取的环境 ID,则生成一个并将其导出为环境变量。

返回

有效的系统测试环境 ID。

返回类型

str

tests.system.amazon.aws.utils.all_tasks_passed(ti)[源代码]
tests.system.amazon.aws.utils.prune_logs(logs, force_delete=False, retry=False, retry_times=3, ti=None)[源代码]

如果此 dagrun 中的所有任务都已成功,则删除关联的日志。否则,将日志附加保留策略。这允许将日志用于故障排除,但确保它们不会无限期地累积。

参数
  • logs (list[tuple[str, str | None]]) – 要删除的 log_group/stream_prefix 元组的列表。

  • force_delete (bool) – 是否在删除前检查日志组内的日志流。如果为 True,则删除日志组及其内部的所有日志流。

  • retry (bool) – 如果找不到日志组/流,是否重试。在某些情况下,日志组/流是在创建主资源后几秒钟创建的。默认情况下,它会重试 3 次,等待时间为 5 秒。

  • retry_times (int) – 重试次数。

  • ti – 用于检查任务的状态。这是从 DAG 的上下文中提取的,不需要手动传递。

tests.system.amazon.aws.utils.split_string(string)[源代码]

此条目是否有帮助?