airflow.providers.postgres.hooks.postgres

属性

CursorType

PostgresHook

与 Postgres 交互。

模块内容

airflow.providers.postgres.hooks.postgres.CursorType[source]
class airflow.providers.postgres.hooks.postgres.PostgresHook(*args, options=None, enable_log_db_messages=False, **kwargs)[source]

基础类: airflow.providers.common.sql.hooks.sql.DbApiHook

与 Postgres 交互。

您可以在连接的 extra 字段中指定 ssl 参数,例如 {"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}。您也可以选择 cursor,例如 {"cursor": "dictcursor"}。请参考 psycopg2.extras 以了解更多详情。

注意:对于 Redshift,请在 extra 连接参数中使用 keepalives_idle 并将其设置为小于 300 秒的值。

注意:对于 AWS IAM 身份验证,请在 extra 连接参数中使用 iam 并将其设置为 true。密码字段留空。这将使用“aws_default”连接来获取临时令牌,除非您在 extras 中覆盖它。extras 示例: {"iam":true, "aws_conn_id":"my_aws_conn"}

对于 Redshift,也在 extra 连接参数中使用 redshift 并将其设置为 true。cluster-identifier 从 host 字段的开头提取,因此是可选的。但是,可以在 extra 字段中覆盖它。extras 示例: {"iam":true, "redshift":true, "cluster-identifier": "my_cluster_id"}

对于 Redshift Serverless,在 extra 连接参数中使用 redshift-serverless 并将其设置为 true。workgroup-name 从 host 字段的开头提取,因此是可选的。但是,可以在 extra 字段中覆盖它。extras 示例: {"iam":true, "redshift-serverless":true, "workgroup-name": "my_serverless_workgroup"}

参数
  • postgres_conn_id – 指向特定 postgres 数据库的 postgres 连接 ID

  • options (str | None) – 可选。指定在连接开始时发送到服务器的命令行选项。例如,将其设置为 -c search_path=myschema 会将会话的 search_path 值设置为 myschema

  • enable_log_db_messages (bool) – 可选。如果启用,则记录会话期间发送到客户端的数据库消息。为避免内存泄漏,psycopg2 只保存最后 50 条消息。详情请参阅: PostgreSQL 日志配置参数

conn_name_attr = 'postgres_conn_id'[source]
default_conn_name = 'postgres_default'[source]
conn_type = 'postgres'[source]
hook_name = 'Postgres'[source]
supports_autocommit = True[source]
supports_executemany = True[source]
ignored_extra_options[source]
conn: psycopg2.extensions.connection = None[source]
database: str | None[source]
options = None[source]
enable_log_db_messages = False[source]
property sqlalchemy_url: sqlalchemy.engine.URL[source]

从连接返回 Sqlalchemy.engine.URL 对象。

需要在提供者子类中实现以返回 sqlalchemy.engine.URL 对象。

返回

提取的 sqlalchemy.engine.URL 对象。

返回类型

sqlalchemy.engine.URL

property dialect_name: str[source]
property dialect: airflow.providers.common.sql.dialects.dialect.Dialect[source]
get_conn()[source]

建立与 postgres 数据库的连接。

copy_expert(sql, filename)[source]

使用 psycopg2 的 copy_expert 方法执行 SQL。

在没有超级用户权限的情况下执行 COPY 命令是必要的。

注意:如果此方法使用“COPY FROM”语句调用,并且指定输入文件不存在,它会创建一个空文件,不加载数据,但操作会成功。因此,如果用户想知道输入文件何时不存在,他们必须自己检查其是否存在。

get_uri()[source]

从连接中提取 URI。

返回

以 Sqlalchemy URI 格式提取的 URI。

返回类型

str

bulk_load(table, tmp_file)[source]

将制表符分隔的文件加载到数据库表中。

bulk_dump(table, tmp_file)[source]

将数据库表转储到制表符分隔的文件中。

get_iam_token(conn)[source]

获取 IAM 令牌。

这使用 AWSHook 来检索临时密码以连接到 Postgres 或 Redshift。端口是必需的。如果未提供,则使用默认的 5432。

get_table_primary_key(table, schema='public')[source]

获取表的主键。

参数
  • table (str) – 目标表的名称

  • schema (str | None) – 目标模式的名称,默认为 public

返回

主键列列表

返回类型

list[str] | None

get_openlineage_database_info(connection)[source]

为 OpenLineage 返回 Postgres/Redshift 特定信息。

get_openlineage_database_dialect(connection)[source]

返回 postgres/redshift 方言。

get_openlineage_default_schema()[source]

返回当前模式。这通常通过 SEARCH_PATH 参数更改。

classmethod get_ui_field_behaviour()[source]
get_db_log_messages(conn)[source]

记录会话期间发送到客户端的所有数据库消息。

参数

conn – 连接对象

此条目有帮助吗?