airflow.providers.common.sql.hooks.sql

属性

T

SQL_PLACEHOLDERS

WARNING_MESSAGE

ConnectorProtocol

数据库连接协议。

DbApiHook

SQL Hook 的抽象基类。

函数

return_single_query_results(sql, return_last, ...)

fetch_all_handler(cursor)

fetch_one_handler(cursor)

resolve_dialects()

模块内容

airflow.providers.common.sql.hooks.sql.T[source]
airflow.providers.common.sql.hooks.sql.SQL_PLACEHOLDERS[source]
airflow.providers.common.sql.hooks.sql.WARNING_MESSAGE = Multiline-String[source]
显示值
"""Import of {} from the 'airflow.providers.common.sql.hooks' module is deprecated and will
be removed in the future. Please import it from 'airflow.providers.common.sql.hooks.handlers'."""
airflow.providers.common.sql.hooks.sql.return_single_query_results(sql, return_last, split_statements)[source]
airflow.providers.common.sql.hooks.sql.fetch_all_handler(cursor)[source]
airflow.providers.common.sql.hooks.sql.fetch_one_handler(cursor)[source]
airflow.providers.common.sql.hooks.sql.resolve_dialects()[source]
class airflow.providers.common.sql.hooks.sql.ConnectorProtocol[source]

基类: Protocol

数据库连接协议。

connect(host, port, username, schema)[source]

连接到数据库。

参数:
  • host (str) – 要连接的数据库主机。

  • port (int) – 要连接的数据库端口。

  • username (str) – 用于身份验证的数据库用户名。

  • schema (str) – 要连接的数据库模式。

返回:

授权连接对象。

返回类型:

Any

class airflow.providers.common.sql.hooks.sql.DbApiHook(*args, schema=None, log_sql=True, **kwargs)[source]

基类: airflow.hooks.base.BaseHook

SQL Hook 的抽象基类。

当子类化时,维护者可以重写 _make_common_data_structure 方法:此方法将 handler 方法(通常是 cursor.fetchall())的结果转换为从该类派生的所有 Hook 共享的通用对象(元组)。大多数情况下,底层 SQL 库已经从其游标返回元组,并且可以忽略 _make_common_data_structure 方法。

参数:
  • schema (str | None) – 可选的数据库模式,它会覆盖连接中指定的模式。请确保如果您在派生 Hook 的构造函数中更改 schema 参数值,此更改应在调用 DBApiHook.__init__() 之前完成。

  • log_sql (bool) – 是否在执行 SQL 查询时记录日志。默认为 True

conn_name_attr: str[source]
default_conn_name = 'default_conn_id'[source]
strip_semicolon = False[source]
supports_autocommit = False[source]
supports_executemany = False[source]
connector: ConnectorProtocol | None = None[source]
log_sql = True[source]
descriptions: list[collections.abc.Sequence[collections.abc.Sequence] | None] = [][source]
get_conn_id()[source]
property placeholder: str[source]

返回 SQL 占位符。

property insert_statement_format: str[source]

返回 INSERT 语句格式。

property replace_statement_format: str[source]

返回 REPLACE 语句格式。

property escape_word_format: str[source]

返回转义词格式。

property escape_column_names: bool[source]

返回转义列名标志。

property connection: airflow.models.Connection[source]
property connection_extra: dict[source]
property connection_extra_lower: dict[source]

connection.extra_dejson,但其中的键已转换为小写。

这在内部用于对额外参数进行不区分大小写的访问。

get_conn()[source]

返回一个连接对象。

get_uri()[source]

从连接中提取 URI。

返回:

提取的 URI。

返回类型:

str

property sqlalchemy_url: sqlalchemy.engine.URL[source]

从连接返回一个 Sqlalchemy.engine.URL 对象。

需要在提供者子类中实现以返回 sqlalchemy.engine.URL 对象。

返回:

提取的 sqlalchemy.engine.URL 对象。

返回类型:

sqlalchemy.engine.URL

get_sqlalchemy_engine(engine_kwargs=None)[source]

获取 sqlalchemy_engine 对象。

参数:

engine_kwargs – 在 create_engine() 中使用的 Kwargs。

返回:

创建的引擎。

property inspector: sqlalchemy.engine.Inspector[source]
property dialect_name: str[source]
property dialect: airflow.providers.common.sql.dialects.dialect.Dialect[source]
property reserved_words: set[str][source]
get_reserved_words(dialect_name)[source]
get_pandas_df(sql, parameters=None, **kwargs)[source]

执行 SQL 并返回一个 pandas DataFrame。

参数:
  • sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表

  • parameters (list | tuple | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数。

  • kwargs – (可选)传递给 pandas.io.sql.read_sql 方法

get_pandas_df_by_chunks(sql, parameters=None, *, chunksize, **kwargs)[source]
get_df(sql, parameters=None, *, df_type='pandas', **kwargs)[source]

执行 SQL 并返回一个 DataFrame。

参数:
  • sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表

  • parameters (list | tuple | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数。

  • df_type (typing_extensions.Literal[pandas, polars]) – 要返回的 DataFrame 类型,可以是 “pandas” 或 “polars”

  • kwargs – (可选)传递给 pandas.io.sql.read_sqlpolars.read_database 方法

get_df_by_chunks(sql, parameters=None, *, chunksize, df_type='pandas', **kwargs)[source]

执行 SQL 并返回一个生成器。

参数:
  • sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表

  • parameters (list | tuple | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数

  • chunksize (int) – 每个块中包含的行数

  • df_type (typing_extensions.Literal[pandas, polars]) – 要返回的 DataFrame 类型,可以是 “pandas” 或 “polars”

  • kwargs – (可选)传递给 pandas.io.sql.read_sqlpolars.read_database 方法

get_records(sql, parameters=None)[source]

执行 SQL 并返回一组记录。

参数:
get_first(sql, parameters=None)[source]

执行 SQL 并返回第一行结果。

参数:
static strip_sql_string(sql)[source]
static split_sql_string(sql, strip_semicolon=False)[source]

将字符串拆分为多个 SQL 表达式。

参数:
  • sql (str) – 可能包含多个表达式的 SQL 字符串

  • strip_semicolon (bool) – 是否从 SQL 字符串中去除分号

返回:

单独表达式的列表

返回类型:

list[str]

property last_description: collections.abc.Sequence[collections.abc.Sequence] | None[source]
run(sql: str | collections.abc.Iterable[str], autocommit: bool = ..., parameters: collections.abc.Iterable | collections.abc.Mapping[str, Any] | None = ..., handler: None = ..., split_statements: bool = ..., return_last: bool = ...) None[source]
run(sql: str | collections.abc.Iterable[str], autocommit: bool = ..., parameters: collections.abc.Iterable | collections.abc.Mapping[str, Any] | None = ..., handler: Callable[[Any], T] = ..., split_statements: bool = ..., return_last: bool = ...) tuple | list[tuple] | list[list[tuple] | tuple] | None

执行一个命令或一系列命令。

将 SQL 语句列表传递给 sql 参数,以便按顺序执行它们。

该方法将返回单个查询结果(通常是行列表)或这些结果的列表,其中列表中的每个元素是一个查询的结果(通常是行列表的列表 :D)。

出于兼容性原因,DBAPIHook 的行为有些令人困惑。在某些情况下,当运行多个查询时,返回值将是结果的可迭代对象(列表)——每个查询对应一个结果。然而,在其他情况下,当运行单个查询时,返回值将是该单个查询的结果,而不会将结果包装在列表中。

单个查询结果不包装在列表中返回的情况如下:

  1. sql 是字符串且 return_last 为 True(无论 split_statements 的值是什么)。

  2. sql 是字符串且 split_statements 为 False

在所有其他情况下,结果都会被包装在一个列表中,即使只有一个语句要处理。特别是,在以下情况下,返回值将是一个查询结果列表:

  1. sql 是字符串语句的可迭代对象时(无论 return_last 的值是什么)。

  2. sql 是字符串,split_statements 为 True 且 return_last 为 False 时。

调用 run 后,您可以访问 hook 对象上的以下属性:

  • descriptions:游标描述数组。如果 return_last 为 True,这将是一个包含最后一个语句的游标 description 的单元素数组。否则,它将包含每个已执行语句的游标描述。

  • last_description:最后一个已执行语句的描述。

请注意,只有提供了 handler 时,查询结果才会被实际返回;如果 handler 为 None,此方法将返回 None。

Handler 是一种将游标(Iterator)中的行处理成适合返回给 XCom 并通常适合内存的值的方式。

您可以使用预定义的 handler(fetch_all_handlerfetch_one_handler)或实现您自己的 handler。

参数:
  • sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表

  • autocommit – 在执行查询之前,将连接的 autocommit 设置为什么值。

  • parameters – 用于渲染 SQL 查询的参数。

  • handler – 结果 handler,它会用每个语句的结果调用。

  • split_statements – 是否将单个 SQL 字符串拆分成多个语句并分别运行。

  • return_last – 是否仅返回最后一个语句的结果,或拆分后所有语句的结果。

返回:

如果提供了 handler,则返回查询结果(结果可能是列表,取决于参数)。

set_autocommit(conn, autocommit)[source]

在连接上设置 autocommit 标志。

get_autocommit(conn)[source]

获取给定连接的 autocommit 设置。

参数:

conn – 用于获取 autocommit 设置的连接。

返回:

连接的 autocommit 设置。如果在连接上将 autocommit 设置为 True,则为 True。如果未设置、设置为 False 或连接不支持自动提交,则为 False。

返回类型:

bool

get_cursor()[source]

返回一个游标。

insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, *, executemany=False, fast_executemany=False, autocommit=False, **kwargs)[source]

将一组元组插入到表中。

行按块插入,每个块(大小为 commit_every)都在一个新事务中完成。

参数:
  • table – 目标表的名称

  • rows – 要插入表中的行

  • target_fields – 要填充表中列的名称

  • commit_every – 在一个事务中插入的最大行数。设置为 0 以在一个事务中插入所有行。

  • replace – 是否替换而不是插入

  • executemany – 如果为 True,则按照 commit_every 参数定义的块一次性插入所有行。这仅适用于所有行具有相同列名数量的情况,但会带来更好的性能。

  • fast_executemany – 如果为 True,则用于 executemany 的游标上将设置 fast_executemany 参数,如果驱动程序支持,这将带来更好的性能。

  • autocommit – 在执行查询之前,将连接的 autocommit 设置为什么值。

abstract bulk_dump(table, tmp_file)[source]

将数据库表转储到制表符分隔的文件中。

参数:
  • table – 源表的名称

  • tmp_file – 目标文件的路径

abstract bulk_load(table, tmp_file)[source]

将制表符分隔的文件加载到数据库表中。

参数:
  • table – 目标表的名称

  • tmp_file – 要加载到表中的文件路径

test_connection()[source]

使用数据库特定的查询测试连接。

get_openlineage_database_info(connection)[source]

返回生成和解析 lineage 元数据所需的数据库特定信息。

这包括有助于构建信息模式查询和创建正确命名空间的信息。

参数:

connection – Airflow 连接,用于减少对 get_connection 方法的调用

get_openlineage_database_dialect(connection)[source]

返回用于 SQL 解析的数据库方言。

查看支持的方言列表:https://openlineage.io/docs/development/sql#sql-dialects

get_openlineage_default_schema()[source]

返回数据库特定的默认模式。

get_openlineage_database_specific_lineage(task_instance)[source]

返回额外的数据库特定 lineage,例如查询执行信息。

此方法仅在任务完成后调用。

参数:

task_instance – 可用于检索在任务运行时收集的附加信息。

static get_openlineage_authority_part(connection, default_port=None)[source]

从 Airflow 连接获取 authority 部分。

authority 表示连接的主机名和端口,并符合 OpenLineage 对许多数据库(例如 MySQL、Postgres、Trino)的命名约定。

参数:

default_port (int | None) – (可选)如果在连接 URI 中未解析出端口,则使用此值。

get_db_log_messages(conn)[source]

记录会话期间发送给客户端的所有数据库消息。

参数:

conn – 连接对象

此条目有帮助吗?