Connect to Trino using SQLExecuteQueryOperator

Use the SQLExecuteQueryOperator to execute SQL commands in a Trino query engine.

Warning

TrinoOperator is deprecated in favor of SQLExecuteQueryOperator. If you are still using TrinoOperator, migrate as soon as possible.
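For many tasks the migration may amount to renaming one keyword argument. The sketch below is a hypothetical helper, not part of Airflow: it assumes TrinoOperator accepted `trino_conn_id` and fell back to the connection id `trino_default` when it was omitted, and that SQLExecuteQueryOperator expects the same value as `conn_id`. Check both assumptions against the provider version you have installed.

```python
from typing import Any

# Assumption: the connection id TrinoOperator used when trino_conn_id was omitted.
ASSUMED_TRINO_DEFAULT_CONN_ID = "trino_default"


def migrate_trino_operator_kwargs(old_kwargs: dict[str, Any]) -> dict[str, Any]:
    """Translate TrinoOperator keyword arguments for SQLExecuteQueryOperator.

    Hypothetical helper: it only renames trino_conn_id to conn_id and
    leaves every other argument (task_id, sql, handler, ...) unchanged.
    """
    new_kwargs = dict(old_kwargs)  # copy so the caller's dict is not mutated
    new_kwargs["conn_id"] = new_kwargs.pop(
        "trino_conn_id", ASSUMED_TRINO_DEFAULT_CONN_ID
    )
    return new_kwargs


def build_migrated_task(old_kwargs: dict[str, Any]):
    """Create the replacement task; call this inside a DAG context."""
    # Imported lazily so the helper above can be used without Airflow installed.
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    return SQLExecuteQueryOperator(**migrate_trino_operator_kwargs(old_kwargs))
```

In a DAG file you would normally write the SQLExecuteQueryOperator call directly with `conn_id=...`; the helper only makes the rename explicit.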

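Using the Operator

Use the conn_id argument of SQLExecuteQueryOperator to select the Airflow connection for your Trino instance. (The deprecated TrinoOperator called this argument trino_conn_id.)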
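The connection id has to resolve to a connection that Airflow knows about. One possible way to provide it, sketched below, is an environment variable named `AIRFLOW_CONN_<CONN_ID in upper case>` holding a JSON description of the connection (the JSON form needs Airflow 2.3 or later). The host, port, and login values are placeholders, and `trino_default` is only an assumed connection id.

```python
import json
import os

# Assumption: the connection id your tasks pass as conn_id.
TRINO_CONN_ID = "trino_default"


def connection_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for a connection id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Placeholder connection details; replace them with your own coordinator.
trino_connection = {
    "conn_type": "trino",
    "host": "trino.example.internal",  # hypothetical host name
    "port": 8080,                      # hypothetical port
    "login": "airflow",                # hypothetical user
}

# Export the connection so Airflow processes started from this environment see it.
os.environ[connection_env_var(TRINO_CONN_ID)] = json.dumps(trino_connection)
```

Connections created in the Airflow UI or through a secrets backend work just as well; the environment variable is simply the easiest form to show in a few lines.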

An example usage of the SQLExecuteQueryOperator to connect to Trino is as follows:

tests/system/trino/example_trino.py


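from datetime import datetime

from airflow import models
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Placeholder names (assumptions); adjust them to your own catalog and schema.
SCHEMA = "hive.cities"
TABLE = "city"
TABLE1 = "city1"
TABLE2 = "city2"

with models.DAG(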
    dag_id="example_trino",
    schedule="@once",  # Override to match your needs
    start_date=datetime(2025, 2, 24),
    catchup=False,
    tags=["example"],
) as dag:
    trino_create_schema = SQLExecuteQueryOperator(
        task_id="trino_create_schema",
        sql=f" CREATE SCHEMA IF NOT EXISTS {SCHEMA} WITH (location = 's3://irisbkt/cities/') ",
        handler=list,
    )
    trino_create_table = SQLExecuteQueryOperator(
        task_id="trino_create_table",
        sql=f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}( cityid bigint, cityname varchar) ",
        handler=list,
    )
    trino_insert = SQLExecuteQueryOperator(
        task_id="trino_insert",
        sql=f" INSERT INTO {SCHEMA}.{TABLE} VALUES (1, 'San Francisco') ",
        handler=list,
        requires_result_fetch=True,
    )
    trino_multiple_queries = SQLExecuteQueryOperator(
        task_id="trino_multiple_queries",
        sql=[
            f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE1}(cityid bigint,cityname varchar) ",
            f" INSERT INTO {SCHEMA}.{TABLE1} VALUES (2, 'San Jose') ",
            f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE2}(cityid bigint,cityname varchar) ",
            f" INSERT INTO {SCHEMA}.{TABLE2} VALUES (3, 'San Diego') ",
        ],
        handler=list,
        requires_result_fetch=True,
    )
    trino_templated_query = SQLExecuteQueryOperator(
        task_id="trino_templated_query",
        sql="SELECT * FROM {{ params.SCHEMA }}.{{ params.TABLE }}",
        handler=list,
        params={"SCHEMA": SCHEMA, "TABLE": TABLE1},
    )
    trino_parameterized_query = SQLExecuteQueryOperator(
        task_id="trino_parameterized_query",
        sql=f" SELECT * FROM {SCHEMA}.{TABLE2} WHERE cityname = ?",
        parameters=("San Diego",),
        handler=list,
    )

    (
        trino_create_schema
        >> trino_create_table
        >> trino_insert
        >> trino_multiple_queries
        >> trino_templated_query
        >> trino_parameterized_query
    )

Note

This Operator can be used to run any syntactically correct Trino query, and multiple queries can be passed either as a list or as a string.
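The two ways of passing several queries can be sketched as follows. The table name is a placeholder and `trino_default` is an assumed connection id. For the single-string form the sketch sets `split_statements=True`, which asks the operator to split the string into individual statements; whether that flag is required depends on the defaults of your provider version.

```python
# Placeholder statements; my_catalog.my_schema.city1 is a hypothetical table.
STATEMENTS = [
    "CREATE TABLE IF NOT EXISTS my_catalog.my_schema.city1 (cityid bigint, cityname varchar)",
    "INSERT INTO my_catalog.my_schema.city1 VALUES (2, 'San Jose')",
]

# Form 1: a list with one statement per element.
SQL_AS_LIST = STATEMENTS

# Form 2: one string with the statements separated by semicolons.
SQL_AS_STRING = ";\n".join(STATEMENTS) + ";"


def build_tasks():
    """Create one task per form; call this inside a DAG context."""
    # Imported lazily so the constants above can be inspected without Airflow.
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    as_list = SQLExecuteQueryOperator(
        task_id="trino_queries_as_list",
        conn_id="trino_default",  # assumed connection id
        sql=SQL_AS_LIST,
    )
    as_string = SQLExecuteQueryOperator(
        task_id="trino_queries_as_string",
        conn_id="trino_default",  # assumed connection id
        sql=SQL_AS_STRING,
        split_statements=True,  # split the string into separate statements
    )
    return as_list, as_string
```

The list form keeps statement boundaries explicit and avoids relying on SQL splitting, which is why the example DAG above uses it.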
