@task.snowpark
¶
使用 @task.snowpark
装饰器在 Snowflake 数据库中运行 Snowpark Python 代码。
警告
Snowpark 尚不支持 Python 3.12。
目前,此装饰器不支持 Snowpark pandas API,因为 Airflow 中使用了冲突的 pandas 版本。请考虑使用 Snowpark pandas API 配合其他 Snowpark 装饰器或操作符。
前置任务¶
要使用此装饰器,您必须完成以下几项任务
通过 pip 安装 provider package 包。
pip install 'apache-airflow-providers-snowflake'有关详细信息,请参阅安装。
使用操作符¶
使用 snowflake_conn_id
参数指定使用的连接。如果未指定,将使用 snowflake_default
。
`@task.snowpark` 装饰器的示例用法如下
tests/system/snowflake/example_snowpark_decorator.py
@task.snowpark
def setup_data(session: Session):
# The Snowpark session object is injected as an argument
data = [
(1, 0, 5, "Product 1", "prod-1", 1, 10),
(2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
(3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
(4, 0, 10, "Product 2", "prod-2", 2, 40),
(5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
(6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
(7, 0, 20, "Product 3", "prod-3", 3, 70),
(8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
(9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
(10, 0, 50, "Product 4", "prod-4", 4, 100),
(11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
(12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
]
columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
df = session.create_dataframe(data, schema=columns)
table_name = "sample_product_data"
df.write.save_as_table(table_name, mode="overwrite")
return table_name
table_name = setup_data() # type: ignore[call-arg, misc]
@task.snowpark
def check_num_rows(table_name: str):
# Alternatively, retrieve the Snowpark session object using `get_active_session`
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table(table_name)
assert df.count() == 12
check_num_rows(table_name)
如示例所示,在 Python 函数中有两种使用 Snowpark session 对象的方法
将 Snowpark session 对象作为名为
session
的关键字参数传递给函数。Snowpark session 将被自动注入到函数中,您可以像往常一样使用它。使用来自 Snowpark 的 get_active_session 函数在函数内部检索 Snowpark session 对象。
注意
可以传递给装饰器的参数将优先于 Airflow 连接元数据中已有的参数(例如 schema
、role
、database
等)。