连接 Iceberg¶
Iceberg 连接类型使用 pyiceberg 连接到 Iceberg REST catalog。该 Hook 提供目录的内省功能(列出命名空间、列出表、读取 schema、检查分区和快照),并为 Spark、Trino、Flink 等外部引擎生成 OAuth2 token。
在你的 Airflow 环境中安装 Iceberg Provider 后,iceberg 对应的连接类型将可用。
默认连接 ID¶
Iceberg Hook 使用参数 iceberg_conn_id 作为 Connection ID,默认值为参数 iceberg_default。如需在不同环境之间切换,可创建多个连接。
配置连接¶
- Catalog URI(主机)
Iceberg REST catalog 端点的 URL。示例:
https://your-catalog.example.com/ws/v1- 客户端 ID(登录)
用于对 catalog 进行身份验证的 OAuth2 客户端 ID。对不需要 OAuth2 凭证的 catalog(例如本地 catalog),请留空。
- 客户端密钥(密码)
用于对 catalog 进行身份验证的 OAuth2 客户端密钥。
- 额外(可选)
一个 JSON 对象,包含传递给
pyiceberg.catalog.load_catalog()的额外 catalog 属性。常见属性如下:{ "warehouse": "s3://my-warehouse/", "s3.endpoint": "https://s3.us-east-1.amazonaws.com", "s3.region": "us-east-1", "s3.access-key-id": "AKIA...", "s3.secret-access-key": "..." }
对于 AWS/GCP/Azure 部署,推荐使用 IAM 角色或基于环境的凭证,仅在 extra 中传递
warehouse路径。
从 1.x 迁移¶
在 2.0.0 版本中,get_conn() 现在返回一个 pyiceberg.catalog.Catalog 实例,而不是 token 字符串。如果你之前使用 get_conn() 获取 OAuth2 token,请改用 get_token()。
# Before (1.x)
token = IcebergHook().get_conn()
# After (2.0)
token = IcebergHook().get_token()
get_token_macro() 方法已更新为自动使用 get_token(),因此 Jinja2 模板无需任何更改即可继续工作。