连接 Iceberg

Iceberg 连接类型使用 pyiceberg 连接到 Iceberg REST catalog。该 Hook 提供目录的内省功能(列出命名空间、列出表、读取 schema、检查分区和快照),并为 Spark、Trino、Flink 等外部引擎生成 OAuth2 token。

在你的 Airflow 环境中安装 Iceberg Provider 后,iceberg 对应的连接类型将可用。

默认连接 ID

Iceberg Hook 使用参数 iceberg_conn_id 作为 Connection ID,默认值为参数 iceberg_default。如需在不同环境之间切换,可创建多个连接。

配置连接

Catalog URI(主机)

Iceberg REST catalog 端点的 URL。示例:https://your-catalog.example.com/ws/v1

客户端 ID(登录)

用于对 catalog 进行身份验证的 OAuth2 客户端 ID。对不需要 OAuth2 凭证的 catalog(例如本地 catalog),请留空。

客户端密钥(密码)

用于对 catalog 进行身份验证的 OAuth2 客户端密钥。

额外(可选)

一个 JSON 对象,包含传递给 pyiceberg.catalog.load_catalog() 的额外 catalog 属性。常见属性如下:

{
    "warehouse": "s3://my-warehouse/",
    "s3.endpoint": "https://s3.us-east-1.amazonaws.com",
    "s3.region": "us-east-1",
    "s3.access-key-id": "AKIA...",
    "s3.secret-access-key": "..."
}

对于 AWS/GCP/Azure 部署,推荐使用 IAM 角色或基于环境的凭证,仅在 extra 中传递 warehouse 路径。

从 1.x 迁移

在 2.0.0 版本中,get_conn() 现在返回一个 pyiceberg.catalog.Catalog 实例,而不是 token 字符串。如果你之前使用 get_conn() 获取 OAuth2 token,请改用 get_token()

# Before (1.x)
token = IcebergHook().get_conn()

# After (2.0)
token = IcebergHook().get_token()

get_token_macro() 方法已更新为自动使用 get_token(),因此 Jinja2 模板无需任何更改即可继续工作。

此条目是否有帮助?