SQL 操作符¶
这些操作符对 SQL 数据库执行各种查询,包括列级和表级的数据质量检查。
执行 SQL 查询¶
使用 SQLExecuteQueryOperator
对不同的数据库运行 SQL 查询。操作符的参数有:
sql
- 单个字符串、字符串列表或指向要执行的模板文件的字符串;autocommit
(可选) 如果为 True,则每个命令会自动提交 (默认值:False);parameters
(可选) 用于渲染 SQL 查询的参数。handler
(可选) 将应用于游标的函数。如果它是None
,则不会返回结果 (默认值:fetch_all_handler)。split_statements
(可选) 是否将单个 SQL 字符串拆分为语句并单独运行 (默认值:False)。return_last
(可选) 取决于split_statements
,如果它是True
,则此参数用于仅返回最后一条语句的结果还是所有拆分语句的结果 (默认值:True)。
下面的示例展示了如何实例化 SQLExecuteQueryOperator 任务。
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
split_statements=True,
return_last=False,
)
检查 SQL 表列¶
使用 SQLColumnCheckOperator
对给定表的列运行数据质量检查。除了连接 ID 和表之外,还必须提供描述列和要运行的测试之间关系的 column_mapping。一个示例列映射是一组三个嵌套字典,如下所示:
column_mapping = {
"col_name": {
"null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
"min": {
"greater_than": 5,
"leq_to": 10,
"tolerance": 0.2,
},
"max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
}
}
其中 col_name 是要运行检查的列的名称,其字典中的每个条目都是一个检查。有效的检查是:
null_check: 检查列中 NULL 值的数量
distinct_check: 检查列中不同值的 COUNT
unique_check: 检查列中不同值的数量与行数是否一致
min: 检查列中的最小值
max: 检查列中的最大值
检查字典中的每个条目要么是检查成功的条件、容差,要么是分区子句。成功的条件是:
greater_than
geq_to
less_than
leq_to
equal_to
指定条件时,equal_to 与其他条件不兼容。可以在同一检查中指定下限和上限条件。容差是指结果可能超出范围但仍被视为成功的百分比。
分区子句可以在操作符级别作为参数给出(此时它将对所有检查进行分区),可以在列映射中的列级别给出(此时它将对该列的所有检查进行分区),也可以在列的检查级别给出(此时它只对该检查进行分区)。
如果未使用所提供连接中的数据库,也可以指定数据库。
accept_none 参数(默认为 true)会将查询返回的 None 值转换为 0,从而允许空表返回有效的整数。
下面的示例演示了如何实例化 SQLColumnCheckOperator 任务。
column_check = SQLColumnCheckOperator(
task_id="column_check",
table=AIRFLOW_DB_METADATA_TABLE,
column_mapping={
"id": {
"null_check": {
"equal_to": 0,
"tolerance": 0,
},
"distinct_check": {
"equal_to": 1,
},
}
},
)
检查 SQL 表值¶
使用 SQLTableCheckOperator
对给定表运行数据质量检查。除了连接 ID 和表之外,还必须提供描述表和要运行的测试之间关系的 checks 字典。一个示例 checks 参数是一组两个嵌套字典,如下所示:
checks = (
{
"row_count_check": {
"check_statement": "COUNT(*) = 1000",
},
"column_sum_check": {
"check_statement": "col_a + col_b < col_c",
"partition_clause": "col_a IS NOT NULL",
},
},
)
第一组键是检查名称,这些名称在操作符构建的模板化查询中引用。检查名称下的字典键必须包含 check_statement,并且该值是一个解析为布尔值的 SQL 语句(这可以是任何解析为 airflow.operators.sql.parse_boolean 中的布尔值的字符串或整数)。要提供的另一个可能的键是 partition_clause,这是一个检查级语句,它将使用该检查的 WHERE 子句对表中的数据进行分区。此语句与参数 partition_clause 兼容,后者会在所有检查中进行筛选。
下面的示例演示了如何实例化 SQLTableCheckOperator 任务。
row_count_check = SQLTableCheckOperator(
task_id="row_count_check",
table=AIRFLOW_DB_METADATA_TABLE,
checks={
"row_count_check": {
"check_statement": "COUNT(*) = 1",
}
},
)
根据阈值检查值¶
使用 SQLThresholdCheckOperator
将特定的 SQL 查询结果与定义的最小和最大阈值进行比较。这两个阈值都可以是数值或计算结果为数值的另一个 SQL 查询。此操作符需要一个连接 ID,以及要执行的 SQL 查询,并且允许选择指定数据库(如果应该覆盖连接_id 中的数据库)。参数包括:- sql
- 要执行的 sql 查询,作为模板化字符串。- min_threshold
- 要检查的最小阈值。可以作为数值或模板化 sql 查询。- max_threshold
- 要检查的最大阈值。可以作为数值或模板化 sql 查询。- conn_id
(可选) 用于连接到数据库的连接 ID。- database
(可选) 覆盖连接中的数据库名称。
下面的示例演示了如何实例化 SQLThresholdCheckOperator 任务。
threshhold_check = SQLThresholdCheckOperator(
task_id="threshhold_check",
conn_id="sales_db",
sql="SELECT count(distinct(customer_id)) FROM sales;",
min_threshold=1,
max_threshold=1000,
)
如果查询返回的值在阈值范围内,则任务通过。否则,它将失败。