airflow.providers.snowflake.operators.snowflake¶
类¶
对 Snowflake 执行检查。 |
|
使用 SQL 代码对指定值执行简单检查,允许一定程度的容差。 |
|
检查作为 SQL 表达式给出的指标是否在与 days_back 天前的指标相差的容差范围内。 |
|
实现了 Snowflake SQL API Operator,支持按顺序执行多个 SQL 语句。 |
模块内容¶
- class airflow.providers.snowflake.operators.snowflake.SnowflakeCheckOperator(*, sql, snowflake_conn_id='snowflake_default', parameters=None, autocommit=True, do_xcom_push=True, warehouse=None, database=None, role=None, schema=None, authenticator=None, session_parameters=None, **kwargs)[source]¶
基类:
airflow.providers.common.sql.operators.sql.SQLCheckOperator
对 Snowflake 执行检查。
The
SnowflakeCheckOperator
期望一个将返回单行的 SQL 查询。第一行的每个值都会使用 Python 的bool
强制类型转换进行评估。如果任何一个值返回False
,则检查失败并报错。请注意,Python 的 bool 强制类型转换将以下内容评估为
False
False
0
空字符串(
""
)空列表(
[]
)空字典或集合(
{}
)
给定一个像
SELECT COUNT(*) FROM foo
的查询,仅当计数== 0
时才会失败。您可以构建更复杂的查询,例如检查表与上游源表具有相同的行数,或今天分区的计数大于昨天分区的计数,或一组指标小于 7 天平均值的 3 个标准差。此操作符可用作管道中的数据质量检查。根据您将其放置在 DAG 中的位置,您可以选择停止关键路径以防止发布可疑数据,或者将其放在一旁并在不停止 DAG 进程的情况下接收电子邮件警报。
- 参数:
sql (str) – 要作为单个字符串执行的 SQL 代码,或者 str(SQL 语句)的列表,或者对模板文件的引用。以 ‘.sql’ 结尾的字符串被识别为模板引用
snowflake_conn_id (str) – 对 Snowflake 连接 ID 的引用
autocommit (bool) – 如果为 True,每个命令将自动提交。(默认值:True)
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于渲染 SQL 查询的参数。
warehouse (str | None) – 仓库名称(将覆盖连接的额外 JSON 中定义的任何仓库)
database (str | None) – 数据库名称(将覆盖连接中定义的数据库)
schema (str | None) – Schema 名称(将覆盖连接中定义的 schema)
role (str | None) – 角色名称(将覆盖连接的额外 JSON 中定义的任何角色)
authenticator (str | None) – Snowflake 的身份验证器。‘snowflake’(默认)使用 Snowflake 内部身份验证器,‘externalbrowser’ 使用您的网络浏览器和 Okta、ADFS 或已为您帐户定义的任何其他符合 SAML 2.0 标准的身份提供商 (IdP) 进行身份验证,‘https://<您的 Okta 帐户名称>.okta.com’ 通过原生 Okta 进行身份验证。
session_parameters (dict | None) – 您可以在连接到 Snowflake 时设置会话级参数
- template_fields: collections.abc.Sequence[str][source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶
- class airflow.providers.snowflake.operators.snowflake.SnowflakeValueCheckOperator(*, sql, pass_value, tolerance=None, snowflake_conn_id='snowflake_default', parameters=None, autocommit=True, do_xcom_push=True, warehouse=None, database=None, role=None, schema=None, authenticator=None, session_parameters=None, **kwargs)[source]¶
基类:
airflow.providers.common.sql.operators.sql.SQLValueCheckOperator
使用 SQL 代码对指定值执行简单检查,允许一定程度的容差。
- 参数:
sql (str) – 要执行的 SQL
pass_value (Any) – 要检查的值
tolerance (Any) – (可选) 允许接受查询通过的容差
snowflake_conn_id (str) – 对 Snowflake 连接 ID 的引用
autocommit (bool) – 如果为 True,每个命令将自动提交。(默认值:True)
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于渲染 SQL 查询的参数。
warehouse (str | None) – 仓库名称(将覆盖连接的额外 JSON 中定义的任何仓库)
database (str | None) – 数据库名称(将覆盖连接中定义的数据库)
schema (str | None) – Schema 名称(将覆盖连接中定义的 schema)
role (str | None) – 角色名称(将覆盖连接的额外 JSON 中定义的任何角色)
authenticator (str | None) – Snowflake 的身份验证器。‘snowflake’(默认)使用 Snowflake 内部身份验证器,‘externalbrowser’ 使用您的网络浏览器和 Okta、ADFS 或已为您帐户定义的任何其他符合 SAML 2.0 标准的身份提供商 (IdP) 进行身份验证,‘https://<您的 Okta 帐户名称>.okta.com’ 通过原生 Okta 进行身份验证。
session_parameters (dict | None) – 您可以在连接到 Snowflake 时设置会话级参数
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.snowflake.operators.snowflake.SnowflakeIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, snowflake_conn_id='snowflake_default', parameters=None, autocommit=True, do_xcom_push=True, warehouse=None, database=None, role=None, schema=None, authenticator=None, session_parameters=None, **kwargs)[source]¶
基类:
airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator
检查作为 SQL 表达式给出的指标是否在与 days_back 天前的指标相差的容差范围内。
此方法构造一个如下所示的查询
SELECT {metrics_threshold_dict_key} FROM {table} WHERE {date_filter_column}=<date>
- 参数:
table (str) – 表名
days_back (SupportsAbs[int]) – ds 与要检查的天数之间的天数。默认为 7 天
metrics_thresholds (dict) – 按指标索引的比例字典,例如 ‘COUNT(*)’: 1.5 将要求当前日期与之前的 days_back 天之间的差异不超过 50%。
snowflake_conn_id (str) – 对 Snowflake 连接 ID 的引用
autocommit (bool) – 如果为 True,每个命令将自动提交。(默认值:True)
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可选) 用于渲染 SQL 查询的参数。
warehouse (str | None) – 仓库名称(将覆盖连接的额外 JSON 中定义的任何仓库)
database (str | None) – 数据库名称(将覆盖连接中定义的数据库)
schema (str | None) – Schema 名称(将覆盖连接中定义的 schema)
role (str | None) – 角色名称(将覆盖连接的额外 JSON 中定义的任何角色)
authenticator (str | None) – Snowflake 的身份验证器。‘snowflake’(默认)使用 Snowflake 内部身份验证器,‘externalbrowser’ 使用您的网络浏览器和 Okta、ADFS 或已为您帐户定义的任何其他符合 SAML 2.0 标准的身份提供商 (IdP) 进行身份验证,‘https://<您的 Okta 帐户名称>.okta.com’ 通过原生 Okta 进行身份验证。
session_parameters (dict | None) – 您可以在连接到 Snowflake 时设置会话级参数
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator(*, snowflake_conn_id='snowflake_default', warehouse=None, database=None, role=None, schema=None, authenticator=None, session_parameters=None, poll_interval=5, statement_count=0, token_life_time=LIFETIME, token_renewal_delta=RENEWAL_DELTA, bindings=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator
实现了 Snowflake SQL API Operator,支持按顺序执行多个 SQL 语句。
这是 SQLExecuteQueryOperator 的行为,Snowflake SQL API 允许在单个请求中提交多个 SQL 语句。它通过发出 POST 请求来提交 SQL 语句进行执行,轮询以检查语句的执行状态,并同时获取查询结果。此 Operator 当前使用密钥对身份验证,因此除了其他详细信息外,您需要在 Snowflake 连接中提供原始私钥内容或私钥文件路径。
- 此操作符适用于哪些场景?
在单个请求中执行多个 SQL 语句
异步执行 SQL 语句,并执行标准查询以及大多数 DDL 和 DML 语句
开发执行查询的自定义应用程序和集成
用于创建、配置用户和角色,创建表等
- 不支持以下命令
PUT 命令 (在 Snowflake SQL 中)
GET 命令 (在 Snowflake SQL 中)
CALL 命令与返回表的存储过程(带有 RETURNS TABLE 子句的存储过程)。
- 参数:
snowflake_conn_id (str) – Snowflake 连接 ID 的引用
sql – 要执行的 SQL 代码。(templated)
autocommit – 如果为 True,每个命令将自动提交。(默认值:True)
parameters – (可选) 用于渲染 SQL 查询的参数。
warehouse (str | None) – 仓库名称(将覆盖连接的额外 JSON 中定义的任何仓库)
database (str | None) – 数据库名称(将覆盖连接中定义的数据库)
schema (str | None) – Schema 名称(将覆盖连接中定义的 schema)
role (str | None) – 角色名称(将覆盖连接的额外 JSON 中定义的任何角色)
authenticator (str | None) – Snowflake 的身份验证器。‘snowflake’(默认)使用 Snowflake 内部身份验证器,‘externalbrowser’ 使用您的网络浏览器和 Okta、ADFS 或已为您帐户定义的任何其他符合 SAML 2.0 标准的身份提供商 (IdP) 进行身份验证,‘https://<您的 Okta 帐户名称>.okta.com’ 通过原生 Okta 进行身份验证。
session_parameters (dict[str, Any] | None) – 您可以在连接到 Snowflake 时设置会话级参数
poll_interval (int) – 轮询查询的间隔(秒)
statement_count (int) – 要执行的 SQL 语句数量
token_life_time (datetime.timedelta) – JWT Token 的生命周期
token_renewal_delta (datetime.timedelta) – JWT Token 的续订增量
bindings (dict[str, Any] | None) – (可选) SQL 语句中绑定变量的值。执行语句时,Snowflake 会将语句中的占位符(? 和 :name)替换为这些指定的值。
deferrable (bool) – 在可推迟模式下运行操作符。
- template_fields: collections.abc.Sequence[str][source]¶