SQL 操作符

这些操作符对 SQL 数据库执行各种查询,包括列级和表级数据质量检查。

执行 SQL 查询

使用 SQLExecuteQueryOperator 对不同的数据库运行 SQL 查询。该操作符的参数包括:

  • sql - 单个字符串、字符串列表或指向要执行的模板文件的字符串;

  • autocommit (可选) 如果为 True,则每条命令自动提交(默认:False);

  • parameters (可选) 用于渲染 SQL 查询的参数。

  • handler (可选) 将应用于游标的函数。如果为 None,将不返回结果(默认:fetch_all_handler)。

  • split_statements (可选) 是否将单个 SQL 字符串拆分成语句并分别运行(默认:False)。

  • return_last (可选) 取决于 split_statements,如果为 True,此参数用于返回最后一条语句的结果或所有拆分语句的结果(默认:True)。

下面的示例展示了如何实例化 SQLExecuteQueryOperator 任务。

tests/system/common/sql/example_sql_execute_query.py

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)

检查 SQL 表列

使用 SQLColumnCheckOperator 对给定表的列运行数据质量检查。除了连接 ID 和表之外,还必须提供一个 `column_mapping`,它描述了列与要运行的测试之间的关系。一个示例的 `column_mapping` 是一个由三个嵌套字典组成的集合,如下所示:

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

其中 `col_name` 是要运行检查的列的名称,其字典中的每个条目都是一个检查。有效的检查包括:

  • null_check: 检查列中的 NULL 值数量

  • distinct_check: 检查列中不同值的计数

  • unique_check: 将列中不同值的数量与行数进行比较

  • min: 检查列中的最小值

  • max: 检查列中的最大值

检查字典中的每个条目可以是检查成功的条件、容差或分区子句。成功的条件包括:

  • 大于

  • 大于等于

  • 小于

  • 小于等于

  • 等于

指定条件时,`equal_to` 与其他条件不兼容。可以在同一检查中同时指定下限和上限条件。容差是指结果可能超出边界但仍被视为成功的百分比。

分区子句可以在操作符级别作为参数给出,它将对所有检查进行分区;也可以在 `column_mapping` 中的列级别给出,它将对该列的所有检查进行分区;或者在列的检查级别给出,它仅对该检查进行分区。

如果不使用提供的连接中的数据库,也可以指定数据库。

`accept_none` 参数,默认为 true,会将查询返回的 None 值转换为 0,从而允许空表返回有效的整数。

下面的示例演示了如何实例化 SQLColumnCheckOperator 任务。

tests/system/common/sql/example_sql_column_table_check.py

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)

检查 SQL 表值

使用 SQLTableCheckOperator 对给定表运行数据质量检查。除了连接 ID 和表之外,还必须提供一个 `checks` 字典,它描述了表与要运行的测试之间的关系。一个示例的 `checks` 参数是由两个嵌套字典组成的集合,如下所示:

checks = (
    {
        "row_count_check": {
            "check_statement": "COUNT(*) = 1000",
        },
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            "partition_clause": "col_a IS NOT NULL",
        },
    },
)

第一组键是检查名称,在操作符构建的模板化查询中引用。检查名称下的字典键必须包含 `check_statement`,其值是一个解析为布尔值的 SQL 语句(这可以是任何在 airflow.operators.sql.parse_boolean 中解析为布尔值的字符串或整数)。另一个可能提供的键是 `partition_clause`,它是一个检查级别的语句,将使用 WHERE 子句对该检查的表中的数据进行分区。此语句与参数 `partition_clause` 兼容,后者会过滤所有检查。

下面的示例演示了如何实例化 SQLTableCheckOperator 任务。

tests/system/common/sql/example_sql_column_table_check.py

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)

根据阈值检查值

使用 SQLThresholdCheckOperator 将特定 SQL 查询结果与定义的最小和最大阈值进行比较。这两个阈值既可以是数值,也可以是解析为数值的另一个 SQL 查询。此操作符需要一个连接 ID 以及要执行的 SQL 查询,并且允许可选地指定数据库,如果应覆盖 `connection_id` 中的数据库。参数包括:- sql - 要执行的 SQL 查询,作为模板化字符串。- min_threshold - 检查的最小阈值。可以是数值或模板化 SQL 查询。- max_threshold - 检查的最大阈值。可以是数值或模板化 SQL 查询。- conn_id (可选) - 用于连接到数据库的连接 ID。- database (可选) - 数据库名称,将覆盖连接中的数据库。

下面的示例演示了如何实例化 SQLThresholdCheckOperator 任务。

tests/system/common/sql/example_sql_threshold_check.py

threshhold_check = SQLThresholdCheckOperator(
    task_id="threshhold_check",
    conn_id="sales_db",
    sql="SELECT count(distinct(customer_id)) FROM sales;",
    min_threshold=1,
    max_threshold=1000,
)

如果查询返回的值在阈值范围内,则任务通过。否则,任务失败。

此条目是否有用?