airflow.providers.common.sql.operators.sql

模块内容

BaseSQLOperator

这是一个通用的 SQL 操作符的基类,用于获取 DB Hook。

SQLExecuteQueryOperator

在特定的数据库中执行 SQL 代码。

SQLColumnCheckOperator

在 column_checks 字典中执行一个或多个模板化检查。

SQLTableCheckOperator

执行 checks 字典中提供的一个或多个检查。

SQLCheckOperator

对数据库执行检查。

SQLValueCheckOperator

使用 sql 代码执行简单的值检查。

SQLIntervalCheckOperator

检查作为 SQL 表达式给出的指标是否在 days_back 之前的容差范围内。

SQLThresholdCheckOperator

使用 sql 代码针对最小阈值和最大阈值执行值检查。

BranchSQLOperator

允许 DAG 基于 SQL 查询的结果“分支”或遵循指定的路径。

函数

default_output_processor(results, descriptions)

airflow.providers.common.sql.operators.sql.default_output_processor(results, descriptions)[源代码]
class airflow.providers.common.sql.operators.sql.BaseSQLOperator(*, conn_id=None, database=None, hook_params=None, retry_on_failure=True, **kwargs)[源代码]

基类: airflow.models.BaseOperator

这是一个通用的 SQL 操作符的基类,用于获取 DB Hook。

提供的方法是 .get_db_hook()。默认行为会尝试根据连接类型检索 DB hook。你可以通过重写 .get_db_hook() 方法来自定义该行为。

参数

conn_id (str | None) – 对特定数据库的引用

conn_id_field = 'conn_id'[源代码]
template_fields: collections.abc.Sequence[str] = ('conn_id', 'database', 'hook_params')[源代码]
classmethod get_hook(conn_id, hook_params=None)[源代码]

返回此连接 ID 的默认 hook。

参数
  • conn_id (str) – 连接 ID

  • hook_params (dict | None) – hook 参数

返回

此连接的默认 hook

返回类型

airflow.hooks.base.BaseHook

get_db_hook()[源代码]

获取连接的数据库 hook。

返回

数据库 hook 对象。

返回类型

airflow.providers.common.sql.hooks.sql.DbApiHook

class airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator(*, sql, autocommit=False, parameters=None, handler=fetch_all_handler, output_processor=None, conn_id=None, database=None, split_statements=None, return_last=True, show_return_value_in_logs=False, **kwargs)[源代码]

基类: BaseSQLOperator

在特定的数据库中执行 SQL 代码。

当实现特定的操作符时,你还可以在 hook 中实现 _process_output 方法,以便对你的 DB Hook 返回的值执行额外的处理。例如,你可以将从语句的游标检索到的描述与返回的值连接起来,或将操作符的输出保存到文件中。

参数
  • sql (str | list[str]) – 要执行的 SQL 代码或指向模板文件的字符串(已模板化)。文件必须具有 '.sql' 扩展名。

  • autocommit (bool) – (可选)如果为 True,则每个命令都会自动提交(默认值:False)。

  • parameters (collections.abc.Mapping | collections.abc.Iterable | None) – (可选)用于呈现 SQL 查询的参数。

  • handler (Callable[[Any], list[tuple] | None]) – (可选)将应用于游标的函数(默认值:fetch_all_handler)。

  • output_processor (Callable[[list[Any], list[collections.abc.Sequence[collections.abc.Sequence] | None]], list[Any] | tuple[list[collections.abc.Sequence[collections.abc.Sequence] | None], list] | None]) – (可选)将应用于结果的函数(默认值:default_output_processor)。

  • split_statements (bool | None) – (可选)如果将单个 SQL 字符串拆分为语句。默认情况下,会延迟到配置的 hook 的 run 方法中的默认值。

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

  • return_last (bool) – (可选) 仅返回最后一个语句的结果 (默认值: True)。

  • show_return_value_in_logs (bool) – (可选) 如果为 true,则操作符的输出将打印到任务日志中。谨慎使用。不建议将大型数据集转储到日志中。(默认值: False)。

另请参阅

有关如何使用此操作符的更多信息,请查看指南: 执行 SQL 查询

template_fields: collections.abc.Sequence[str] = ('sql', 'parameters')[source]
template_ext: collections.abc.Sequence[str] = ('.sql', '.json')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#cdaaed'[source]
execute(context)[source]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

prepare_template()[source]

解析属性参数的模板文件。

get_openlineage_facets_on_start()[source]
get_openlineage_facets_on_complete(task_instance)[source]
class airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator(*, table, column_mapping, partition_clause=None, conn_id=None, database=None, accept_none=True, **kwargs)[source]

基类: BaseSQLOperator

在 column_checks 字典中执行一个或多个模板化检查。

检查是基于 column_mapping 中指定的每个列执行的。

每个检查可以采用以下一个或多个选项

  • equal_to: 一个等于的精确值,不能与其他比较选项一起使用

  • greater_than: 结果应严格大于的值

  • less_than: 结果应严格小于的值

  • geq_to: 结果应大于或等于的值

  • leq_to: 结果应小于或等于的值

  • tolerance: 结果可能与预期值相差的百分比

  • partition_clause: 传递到 WHERE 语句中的额外子句,用于对数据进行分区

参数
  • table (str) – 要在其上运行检查的表

  • column_mapping (dict[str, dict[str, Any]]) –

    列及其关联检查的字典,例如

    {
        "col_name": {
            "null_check": {
                "equal_to": 0,
                "partition_clause": "foreign_key IS NOT NULL",
            },
            "min": {
                "greater_than": 5,
                "leq_to": 10,
                "tolerance": 0.2,
            },
            "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
        }
    }
    

  • partition_clause (str | None) –

    添加到操作符构建的查询中的 WHERE 子句的部分 SQL 语句,该语句为要运行检查创建 partition_clauses,例如

    "date = '1970-01-01'"
    

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

  • accept_none (bool) – 是否接受查询返回的 None 值。如果为 true,则将 None 转换为 0。

另请参阅

有关如何使用此操作符的更多信息,请查看指南: 检查 SQL 表列

template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[source]
template_fields_renderers: ClassVar[dict][source]
sql_check_template = Multiline-String[source]
显示值
"""
        SELECT '{column}' AS col_name, '{check}' AS check_type, {column}_{check} AS check_result
        FROM (SELECT {check_statement} AS {column}_{check} FROM {table} {partition_clause}) AS sq
    """
column_checks[source]
execute(context)[source]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.common.sql.operators.sql.SQLTableCheckOperator(*, table, checks, partition_clause=None, conn_id=None, database=None, **kwargs)[source]

基类: BaseSQLOperator

执行 checks 字典中提供的一个或多个检查。

检查应编写为返回布尔值结果。

参数
  • table (str) – 要在其上运行检查的表

  • checks (dict[str, dict[str, Any]]) –

    检查字典,其中检查名称后跟一个字典,该字典至少包含一个检查语句,以及可选的分区子句,例如:

    {
        "row_count_check": {"check_statement": "COUNT(*) = 1000"},
        "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
        "third_check": {"check_statement": "MIN(col) = 1", "partition_clause": "col IS NOT NULL"},
    }
    

  • partition_clause (str | None) –

    添加到操作符构建的查询中的 WHERE 子句的部分 SQL 语句,该语句为要运行检查创建 partition_clauses,例如

    "date = '1970-01-01'"
    

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:检查 SQL 表值

template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql')[source]
template_fields_renderers: ClassVar[dict][source]
sql_check_template = Multiline-String[source]
显示值
"""
    SELECT '{check_name}' AS check_name, MIN({check_name}) AS check_result
    FROM (SELECT CASE WHEN {check_statement} THEN 1 ELSE 0 END AS {check_name}
          FROM {table} {partition_clause}) AS sq
    """
execute(context)[source]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.common.sql.operators.sql.SQLCheckOperator(*, sql, conn_id=None, database=None, parameters=None, **kwargs)[source]

基类: BaseSQLOperator

对数据库执行检查。

SQLCheckOperator 期望一个 SQL 查询,该查询将返回单行。第一行的每个值都使用 Python bool 强制转换进行评估。如果任何值返回 False,则检查失败并报错。如果返回 Python 字典,并且 Python 字典中的任何值是 False,则检查失败并报错。

请注意,Python 布尔值强制转换会将以下内容评估为 False

  • False

  • 0

  • 空字符串 ("")

  • 空列表 ([])

  • 空字典或集合 ({})

  • 值 = False 的字典 ({'DUPLICATE_ID_CHECK': False})

给定一个像 SELECT COUNT(*) FROM foo 这样的查询,只有当计数 == 0 时才会失败。您可以编写更复杂的查询,例如,检查该表的行数是否与上游源表相同,或者今天分区的计数是否大于昨天的分区,或者一组指标是否小于 7 天平均值的 3 个标准差。

此运算符可以用作管道中的数据质量检查,并且根据您将其放置在 DAG 中的位置,您可以选择停止关键路径,防止发布可疑数据,或者在侧面接收电子邮件警报,而不会停止 DAG 的进度。

参数
  • sql (str) – 要执行的 SQL。(已模板化)

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于呈现 SQL 查询的参数。

template_fields: collections.abc.Sequence[str] = ('sql',)[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
execute(context)[source]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.common.sql.operators.sql.SQLValueCheckOperator(*, sql, pass_value, tolerance=None, conn_id=None, database=None, **kwargs)[source]

基类: BaseSQLOperator

使用 sql 代码执行简单的值检查。

参数
  • sql (str) – 要执行的 SQL。(已模板化)

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

__mapper_args__[source]
template_fields: collections.abc.Sequence[str] = ('sql', 'pass_value')[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
check_value(records)[source]
execute(context)[source]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, ratio_formula='max_over_min', ignore_zero=True, conn_id=None, database=None, **kwargs)[source]

基类: BaseSQLOperator

检查作为 SQL 表达式给出的指标是否在 days_back 之前的容差范围内。

参数
  • table (str) – 表名

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,将覆盖连接中定义的数据库名称

  • days_back (SupportsAbs[int]) – ds 和我们要检查的 ds 之间的天数。默认为 7 天

  • date_filter_column (str | None) – 用于筛选日期的列名。默认为 ‘ds’

  • ratio_formula (str | None) –

    用于计算两个指标之间比率的公式。假设 cur 是今天的指标,ref 是今天 - days_back 的指标。默认值:‘max_over_min’

    • max_over_min: 计算 max(cur, ref) / min(cur, ref)

    • relative_diff: 计算 abs(cur-ref) / ref

  • ignore_zero (bool) – 是否应忽略零指标

  • metrics_thresholds (dict[str, int]) – 一个以指标为索引的比率字典

__mapper_args__[source]
template_fields: collections.abc.Sequence[str] = ('sql1', 'sql2')[source]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]
template_fields_renderers: ClassVar[dict][source]
ui_color = '#fff7e6'[source]
ratio_formulas[source]
execute(context)[源代码]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

class airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator(*, sql, min_threshold, max_threshold, conn_id=None, database=None, **kwargs)[源代码]

基类: BaseSQLOperator

使用 sql 代码针对最小阈值和最大阈值执行值检查。

阈值可以是数值形式,也可以是返回数值的 SQL 语句。

参数
  • sql (str) – 要执行的 SQL。(已模板化)

  • conn_id (str | None) – 用于连接数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

  • min_threshold (Any) – 要执行的数值或最小阈值 SQL (使用模板)

  • max_threshold (Any) – 要执行的数值或最大阈值 SQL (使用模板)

另请参阅

有关如何使用此操作符的更多信息,请查看指南:根据阈值检查值

template_fields: collections.abc.Sequence[str] = ('sql', 'min_threshold', 'max_threshold')[源代码]
template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[源代码]
template_fields_renderers: ClassVar[dict][源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

push(meta_data)[源代码]

将数据检查信息和元数据发送到外部数据库。

默认功能将记录元数据。

class airflow.providers.common.sql.operators.sql.BranchSQLOperator(*, sql, follow_task_ids_if_true, follow_task_ids_if_false, conn_id='default_conn_id', database=None, parameters=None, **kwargs)[源代码]

基类:BaseSQLOperator, airflow.models.SkipMixin

允许 DAG 根据 SQL 查询的结果进行“分支”或遵循指定的路径。

参数
  • sql (str) – 要执行的 SQL 代码,应返回 true 或 false(使用模板)。以 ‘.sql’ 结尾的字符串会被识别为模板引用。预期 SQL 查询返回一个布尔值 (True/False),整数 (0 = False, 其他值 = 1) 或字符串 (true/y/yes/1/on/false/n/no/0/off)。

  • follow_task_ids_if_true (list[str]) – 如果查询返回 true,则要遵循的任务 ID 或任务 ID 列表

  • follow_task_ids_if_false (list[str]) – 如果查询返回 false,则要遵循的任务 ID 或任务 ID 列表

  • conn_id (str) – 用于连接到数据库的连接 ID。

  • database (str | None) – 数据库名称,覆盖连接中定义的数据库。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于呈现 SQL 查询的参数。

template_fields: collections.abc.Sequence[str] = ('sql',)[源代码]
template_ext: collections.abc.Sequence[str] = ('.sql',)[源代码]
template_fields_renderers: ClassVar[dict][源代码]
ui_color = '#a22034'[源代码]
ui_fgcolor = '#F7F7F7'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文。

这个条目有帮助吗?