OdbcOperator

开放数据库连接 (ODBC) 是一种用于访问数据库管理系统 (DBMS) 的标准 API。

先决条件任务

要使用此操作符,您需要

  • 安装 python 模块 pyodbc: .. code-block:: bash

    pip install apache-airflow[odbc]

  • 安装您的数据库的 ODBC 驱动程序。

  • 如果您的数据库需要,配置 ODBC 数据源名称 (DSN)。

满足这些先决条件后,您应该能够运行此 Python 片段(将变量值替换为您驱动程序相关的值)。

如果 pyodbc 模块缺失或驱动程序不可用,其他错误消息将通知您。一个 Connection Refused 错误意味着连接字符串指向的主机上没有数据库正在监听新连接。

import pyodbc

driver = "{ODBC Driver 17 for SQL Server}"
server = "localhost"
database = "testdb"
username = "user"
password = "password"

conn_str = (
    f"DRIVER={driver};" f"SERVER={server};" f"DATABASE={database};" f"UID={username};" f"PWD={password};"
)

conn = pyodbc.connect(conn_str)

用法

使用 SQLExecuteQueryOperator 对可通过 ODBC 驱动程序访问的数据库(或数据存储)执行命令。

ODBC 连接 (ODBC Connection) 必须作为 conn_id 传递。

tests/system/odbc/example_odbc.py


    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            dt VARCHAR(50),
            value VARCHAR(255)
        );
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )

参数 sql 可以接收一个字符串或一个字符串列表。每个字符串可以是 SQL 语句,也可以是模板文件的引用。模板引用以 '.sql' 结尾来识别。

参数 autocommit 如果设置为 True,将在每个命令后执行一次提交(默认为 False)。

模板化

您可以使用 Jinja 模板 来参数化 sql

tests/system/odbc/example_odbc.py


    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        sql="""
        INSERT INTO my_table (dt, value)
        VALUES ('{{ ds }}', 'test_value');
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )

此条目有帮助吗?