airflow.providers.common.sql.operators.sql¶
属性¶
类¶
这是用于通用 SQL Operator 的基类,用于获取 DB Hook。 |
|
在特定数据库中执行 SQL 代码。 |
|
执行 column_checks 字典中一个或多个模板化的检查。 |
|
执行 checks 字典中提供的一个或多个检查。 |
|
对数据库执行检查。 |
|
使用 SQL 代码执行简单的值检查。 |
|
检查作为 SQL 表达式给出的指标是否在前 days_back 天的指标的容差范围内。 |
|
使用 SQL 代码对最小值阈值和最大值阈值执行值检查。 |
|
允许 DAG 根据 SQL 查询结果进行“分支”或遵循指定路径。 |
函数¶
|
模块内容¶
- airflow.providers.common.sql.operators.sql.parse_boolean[source]¶
- Sphinx-autoapi-skip:
重要提示!!!保留此代码以兼容已发布的 google provider 8.4.0 版本。
遗憾的是,此 provider 使用 _get_failed_checks 和 parse_boolean 作为导入,我们应该保留这些方法以避免 8.4.0 版本出现故障。
- class airflow.providers.common.sql.operators.sql.BaseSQLOperator(*, conn_id=None, database=None, hook_params=None, retry_on_failure=True, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
这是用于通用 SQL Operator 的基类,用于获取 DB Hook。
提供的方法是 .get_db_hook()。默认行为将尝试根据连接类型检索 DB hook。您可以通过覆盖 .get_db_hook() 方法来自定义行为。
- 参数:
conn_id (str | None) – 对特定数据库的引用
- template_fields: collections.abc.Sequence[str] = ('conn_id', 'database', 'hook_params')[source]¶
- classmethod get_hook(conn_id, hook_params=None)[source]¶
返回此连接 ID 的默认 hook。
- 参数:
- 返回:
此连接的默认 hook
- 返回类型:
- class airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator(*, sql, autocommit=False, parameters=None, handler=fetch_all_handler, output_processor=None, conn_id=None, database=None, split_statements=None, return_last=True, show_return_value_in_logs=False, requires_result_fetch=False, **kwargs)[source]¶
基类:
BaseSQLOperator
在特定数据库中执行 SQL 代码。
在实现特定 Operator 时,您还可以在 hook 中实现 _process_output 方法,以对您的 DB Hook 返回的值执行额外的处理。例如,您可以将从语句的游标中检索到的描述与返回值连接起来,或者将 Operator 的输出保存到文件中。
- 参数:
sql (str | list[str]) – 要执行的 SQL 代码或指向模板文件的字符串(模板化)。文件必须具有“.sql”扩展名。
autocommit (bool) – (可选)如果为 True,则每个命令都会自动提交(默认值:False)。
parameters (collections.abc.Mapping | collections.abc.Iterable | None) – (可选)用于渲染 SQL 查询的参数。
handler (Callable[[Any], list[tuple] | None]) – (可选)将应用于游标的函数(默认值:fetch_all_handler)。
output_processor (Callable[[list[Any], list[collections.abc.Sequence[collections.abc.Sequence] | None]], list[Any] | tuple[list[collections.abc.Sequence[collections.abc.Sequence] | None], list]] | None) – (可选)将应用于结果的函数(默认值:default_output_processor)。
split_statements (bool | None) – (可选)是否将单个 SQL 字符串拆分为多个语句。默认情况下,遵循已配置 hook 的
run
方法中的默认值。conn_id (str | None) – 用于连接到数据库的连接 ID
database (str | None) – 数据库名称,将覆盖连接中定义的名称
return_last (bool) – (可选)仅返回最后一条语句的结果(默认值:True)。
show_return_value_in_logs (bool) – (可选)如果为 true,则 operator 输出将打印到任务日志中。请谨慎使用。不建议将大型数据集转储到日志中(默认值:False)。
requires_result_fetch (bool) – (可选)如果为 True,则确保在执行完成之前获取查询结果。如果 do_xcom_push 为 True,则结果会自动获取,使此参数变得冗余(默认值:False)。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:执行 SQL 查询
- template_fields: collections.abc.Sequence[str] = ('sql', 'parameters', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql', '.json')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator(*, table, column_mapping, partition_clause=None, conn_id=None, database=None, accept_none=True, **kwargs)[source]¶
基类:
BaseSQLOperator
执行 column_checks 字典中一个或多个模板化的检查。
检查是根据 column_mapping 指定的每列执行的。
每个检查可以采用以下一个或多个选项
equal_to
: 等于的精确值,不能与其他比较选项一起使用greater_than
: 结果应严格大于的值less_than
: 结果应严格小于的值geq_to
: 结果应大于或等于的值leq_to
: 结果应小于或等于的值tolerance
: 结果可能与期望值相差的百分比partition_clause
: 传递到 WHERE 语句中用于分区数据的额外子句
- 参数:
table (str) – 要运行检查的表
column_mapping (dict[str, dict[str, Any]]) –
列及其相关检查的字典,例如
{ "col_name": { "null_check": { "equal_to": 0, "partition_clause": "foreign_key IS NOT NULL", }, "min": { "greater_than": 5, "leq_to": 10, "tolerance": 0.2, }, "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01}, } }
partition_clause (str | None) –
添加到 operator 构建的查询的 WHERE 子句中的部分 SQL 语句,用于为要运行的检查创建 partition_clauses,例如
"date = '1970-01-01'"
conn_id (str | None) – 用于连接到数据库的连接 ID
database (str | None) – 数据库名称,将覆盖连接中定义的名称
accept_none (bool) – 是否接受查询返回的 None 值。如果为 true,则将 None 转换为 0。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:检查 SQL 表列
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql', 'conn_id', 'database', 'hook_params')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLTableCheckOperator(*, table, checks, partition_clause=None, conn_id=None, database=None, **kwargs)[source]¶
基类:
BaseSQLOperator
执行 checks 字典中提供的一个或多个检查。
检查应该编写为返回布尔结果。
- 参数:
table (str) – 要运行检查的表
checks (dict[str, dict[str, Any]]) –
检查字典,其中检查名称后跟一个字典,该字典至少包含一个检查语句,以及可选的分区子句,例如:
{ "row_count_check": {"check_statement": "COUNT(*) = 1000"}, "column_sum_check": {"check_statement": "col_a + col_b < col_c"}, "third_check": {"check_statement": "MIN(col) = 1", "partition_clause": "col IS NOT NULL"}, }
partition_clause (str | None) –
添加到 operator 构建的查询的 WHERE 子句中的部分 SQL 语句,用于为要运行的检查创建 partition_clauses,例如
"date = '1970-01-01'"
conn_id (str | None) – 用于连接到数据库的连接 ID
database (str | None) – 数据库名称,将覆盖连接中定义的名称
另请参阅
有关如何使用此操作符的更多信息,请查阅指南:检查 SQL 表值
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql', 'conn_id', 'database', 'hook_params')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLCheckOperator(*, sql, conn_id=None, database=None, parameters=None, **kwargs)[source]¶
基类:
BaseSQLOperator
对数据库执行检查。
SQLCheckOperator 需要一个返回单行的 SQL 查询。第一行的每个值都使用 Python 的
bool
类型转换进行评估。如果任何值返回False
,则检查失败并报错。如果返回一个 Python 字典,并且字典中的任何值为False
,则检查失败并报错。请注意,Python 的布尔类型转换会将以下值评估为
False
False
0
空字符串 (
""
)空列表 (
[]
)空字典或集合 (
{}
)值为
False
的字典 ({'DUPLICATE_ID_CHECK': False}
)
例如,给定一个像
SELECT COUNT(*) FROM foo
这样的查询,它仅在计数== 0
时失败。你可以构建更复杂的查询,例如检查表是否与上游源表具有相同的行数,或者今天分区的计数是否大于昨天分区的计数,或者一组指标是否小于 7 天平均值的 3 个标准差。此操作符可用作管道中的数据质量检查。根据它在 DAG 中的位置,你可以选择阻止关键路径,防止发布可疑数据,或者将其放在侧面,接收电子邮件警报而不中断 DAG 的进展。
- 参数:
sql (str) – 要执行的 SQL。(模板化)
conn_id (str | None) – 用于连接数据库的连接 ID。
database (str | None) – 数据库名称,将覆盖连接中定义的名称
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于渲染 SQL 查询的参数。
- template_fields: collections.abc.Sequence[str] = ('sql', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLValueCheckOperator(*, sql, pass_value, tolerance=None, conn_id=None, database=None, **kwargs)[source]¶
基类:
BaseSQLOperator
使用 SQL 代码执行简单的值检查。
- 参数:
- template_fields: collections.abc.Sequence[str] = ('sql', 'pass_value', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, ratio_formula='max_over_min', ignore_zero=True, conn_id=None, database=None, **kwargs)[source]¶
基类:
BaseSQLOperator
检查作为 SQL 表达式给出的指标是否在前 days_back 天的指标的容差范围内。
- 参数:
table (str) – 表名
conn_id (str | None) – 用于连接数据库的连接 ID。
database (str | None) – 数据库名称,它将覆盖连接中定义的名称
days_back (SupportsAbs[int]) –
ds
和我们希望检查的ds
之间的天数。默认为 7 天date_filter_column (str | None) – 用于过滤日期的列名。默认为 ‘ds’
ratio_formula (str | None) –
用于计算两个指标之间比率的公式。假设
cur
是今天的指标,ref
是今天减去days_back
天的指标。默认值:‘max_over_min’max_over_min
: 计算 max(cur, ref) / min(cur, ref)relative_diff
: 计算 abs(cur-ref) / ref
ignore_zero (bool) – 是否应忽略零值指标
- template_fields: collections.abc.Sequence[str] = ('sql1', 'sql2', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator(*, sql, min_threshold, max_threshold, conn_id=None, database=None, **kwargs)[source]¶
基类:
BaseSQLOperator
使用 SQL 代码对最小值阈值和最大值阈值执行值检查。
阈值可以是数值,也可以是返回数值的 SQL 语句。
- 参数:
另请参阅
有关如何使用此运算符的更多信息,请参阅指南: 对照阈值检查值
- template_fields: collections.abc.Sequence[str] = ('sql', 'min_threshold', 'max_threshold', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.BranchSQLOperator(*, sql, follow_task_ids_if_true, follow_task_ids_if_false, conn_id='default_conn_id', database=None, parameters=None, **kwargs)[source]¶
Bases:
BaseSQLOperator
,airflow.models.SkipMixin
允许 DAG 基于 SQL 查询结果进行“分支”或遵循指定的路径。
- 参数:
sql (str) – 要执行的 SQL 代码,应返回 true 或 false (模板化) 模板引用通过以 '.sql' 结尾的字符串识别。预期的 SQL 查询返回布尔值 (True/False)、整数 (0 = False, 其他 = 1) 或字符串 (true/y/yes/1/on/false/n/no/0/off)。
follow_task_ids_if_true (list[str]) – 如果查询返回 true,要遵循的任务 ID 或任务 ID 列表
follow_task_ids_if_false (list[str]) – 如果查询返回 false,要遵循的任务 ID 或任务 ID 列表
conn_id (str) – 用于连接到数据库的连接 ID。
database (str | None) – 数据库名称,将覆盖连接中定义的名称
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于渲染 SQL 查询的参数。
- template_fields: collections.abc.Sequence[str] = ('sql', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶