Airflow Summit 2025 将于 10 月 07 日至 09 日举行。立即注册获取早鸟票!

构建简单数据管道

欢迎阅读本系列第三个教程!至此,您已经编写了第一个 DAG 并使用了基本 Operator。现在是时候构建一个小而有意义的数据管道了——一个从外部源检索数据、将其加载到数据库并在此过程中进行清洗的数据管道。

本教程将介绍 SQLExecuteQueryOperator,这是一种在 Airflow 中执行 SQL 的灵活且现代的方式。我们将使用它与本地 Postgres 数据库进行交互,我们将在 Airflow UI 中配置该数据库。

通过本教程,您将拥有一个可用的管道,该管道将:

  • 下载 CSV 文件

  • 将数据加载到暂存表中

  • 清洗数据并将其 upsert 到目标表中

在此过程中,您将获得 Airflow UI、连接系统、SQL 执行和 DAG 编写模式的实践经验。

想在学习过程中深入了解吗?这里有两个有用的参考资料:

让我们开始吧!

初始设置

注意

您需要安装 Docker 才能运行本教程。我们将使用 Docker Compose 在本地启动 Airflow。如果您在设置方面需要帮助,请查看Docker Compose 快速入门指南

要运行我们的管道,我们需要一个可用的 Airflow 环境。Docker Compose 使这变得简单安全——无需进行系统范围的安装。只需打开您的终端并运行以下命令:

# Download the docker-compose.yaml file
curl -LfO 'https://airflow.org.cn/docs/apache-airflow/stable/docker-compose.yaml'

# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start up all services
docker compose up

Airflow 启动并运行后,访问 UI 地址 http://localhost:8080

使用以下凭据登录:

  • 用户名: airflow

  • 密码: airflow

您将进入 Airflow 仪表板,在那里您可以触发 DAG、查看日志以及管理您的环境。

创建 Postgres 连接

在我们的管道能够写入 Postgres 之前,我们需要告诉 Airflow 如何连接到它。在 UI 中,打开 Admin > Connections 页面,然后点击 + 按钮添加一个新连接

填写以下详细信息:

  • 连接 ID: tutorial_pg_conn

  • 连接类型: postgres

  • 主机: postgres

  • 数据库: airflow (这是我们容器中的默认数据库)

  • 登录用户: airflow

  • 密码: airflow

  • 端口: 5432

Add Connection form in Airflow's web UI with Postgres details filled in.

保存连接。这会告诉 Airflow 如何访问在您的 Docker 环境中运行的 Postgres 数据库。

接下来,我们将开始构建使用此连接的管道。

创建用于暂存和最终数据的表

让我们从创建表开始。我们将创建两个表:

  • employees_temp: 用于原始数据的暂存表

  • employees: 清洗并去重后的目标表

我们将使用 SQLExecuteQueryOperator 来运行创建这些表所需的 SQL 语句。

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

create_employees_table = SQLExecuteQueryOperator(
    task_id="create_employees_table",
    conn_id="tutorial_pg_conn",
    sql="""
        CREATE TABLE IF NOT EXISTS employees (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

create_employees_temp_table = SQLExecuteQueryOperator(
    task_id="create_employees_temp_table",
    conn_id="tutorial_pg_conn",
    sql="""
        DROP TABLE IF EXISTS employees_temp;
        CREATE TABLE employees_temp (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

您可以选择将这些 SQL 语句放在 dags/ 文件夹中的 .sql 文件中,并将文件路径传递给 sql= 参数。这是一种保持 DAG 代码整洁的好方法。

将数据加载到暂存表中

接下来,我们将下载一个 CSV 文件,将其保存在本地,并使用 PostgresHook 将其加载到 employees_temp 中。

import os
import requests
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_data():
    # NOTE: configure this as appropriate for your airflow environment
    data_path = "/opt/airflow/dags/files/employees.csv"
    os.makedirs(os.path.dirname(data_path), exist_ok=True)

    url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"

    response = requests.request("GET", url)

    with open(data_path, "w") as file:
        file.write(response.text)

    postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
    conn = postgres_hook.get_conn()
    cur = conn.cursor()
    with open(data_path, "r") as file:
        cur.copy_expert(
            "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
            file,
        )
    conn.commit()

此任务让您体验 Airflow 与原生 Python 和 SQL Hook 的结合——这在实际管道中是一种常见模式。

合并和清洗数据

现在让我们对数据进行去重,并将其合并到最终表中。我们将编写一个运行 SQL INSERT … ON CONFLICT DO UPDATE 语句的任务。

from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def merge_data():
    query = """
        INSERT INTO employees
        SELECT *
        FROM (
            SELECT DISTINCT *
            FROM employees_temp
        ) t
        ON CONFLICT ("Serial Number") DO UPDATE
        SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
    """
    try:
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()
        return 0
    except Exception as e:
        return 1

定义 DAG

现在我们已经定义了所有任务,是时候将它们组合成一个 DAG 了。

import datetime
import pendulum
import os

import requests
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag(
    dag_id="process_employees",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = SQLExecuteQueryOperator(
        task_id="create_employees_table",
        conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    create_employees_temp_table = SQLExecuteQueryOperator(
        task_id="create_employees_temp_table",
        conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception as e:
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()

将此 DAG 保存为 dags/process_employees.py。稍后,它将显示在 UI 中。

触发并探索你的 DAG

打开 Airflow UI,在列表中找到 process_employees DAG。使用滑块将其“开启”,然后点击播放按钮触发运行。

您可以在 Grid 视图中查看每个任务的运行情况,并查看每个步骤的日志。

DAG List view showing the ``process_employees`` DAG

DAG Overview page for ``process_employees`` DAG showing the DAG run

成功后,您将拥有一个完全可用的管道,该管道可以集成外部数据、将其加载到 Postgres 中并保持其整洁。

接下来是什么?

干得好!您现在已经使用 Airflow 的核心模式和工具构建了一个真正的管道。以下是一些您可以进一步探索的方向:

  • 尝试替换为不同的 SQL provider,例如 MySQL 或 SQLite。

  • 将您的 DAG 拆分为 TaskGroup 或重构为更易用的模式。

  • 添加告警步骤或在数据处理完成后发送通知。

另请参阅

这篇文章有帮助吗?