构建简单数据管道¶
欢迎阅读本系列第三个教程!至此,您已经编写了第一个 DAG 并使用了基本 Operator。现在是时候构建一个小而有意义的数据管道了——一个从外部源检索数据、将其加载到数据库并在此过程中进行清洗的数据管道。
本教程将介绍 SQLExecuteQueryOperator
,这是一种在 Airflow 中执行 SQL 的灵活且现代的方式。我们将使用它与本地 Postgres 数据库进行交互,我们将在 Airflow UI 中配置该数据库。
通过本教程,您将拥有一个可用的管道,该管道将:
下载 CSV 文件
将数据加载到暂存表中
清洗数据并将其 upsert 到目标表中
在此过程中,您将获得 Airflow UI、连接系统、SQL 执行和 DAG 编写模式的实践经验。
想在学习过程中深入了解吗?这里有两个有用的参考资料:
Postgres provider 文档
让我们开始吧!
初始设置¶
注意
您需要安装 Docker 才能运行本教程。我们将使用 Docker Compose 在本地启动 Airflow。如果您在设置方面需要帮助,请查看Docker Compose 快速入门指南。
要运行我们的管道,我们需要一个可用的 Airflow 环境。Docker Compose 使这变得简单安全——无需进行系统范围的安装。只需打开您的终端并运行以下命令:
# Download the docker-compose.yaml file
curl -LfO 'https://airflow.org.cn/docs/apache-airflow/stable/docker-compose.yaml'
# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize the database
docker compose up airflow-init
# Start up all services
docker compose up
Airflow 启动并运行后,访问 UI 地址 http://localhost:8080
。
使用以下凭据登录:
用户名:
airflow
密码:
airflow
您将进入 Airflow 仪表板,在那里您可以触发 DAG、查看日志以及管理您的环境。
创建 Postgres 连接¶
在我们的管道能够写入 Postgres 之前,我们需要告诉 Airflow 如何连接到它。在 UI 中,打开 Admin > Connections 页面,然后点击 + 按钮添加一个新连接。
填写以下详细信息:
连接 ID:
tutorial_pg_conn
连接类型:
postgres
主机:
postgres
数据库:
airflow
(这是我们容器中的默认数据库)登录用户:
airflow
密码:
airflow
端口:
5432

保存连接。这会告诉 Airflow 如何访问在您的 Docker 环境中运行的 Postgres 数据库。
接下来,我们将开始构建使用此连接的管道。
创建用于暂存和最终数据的表¶
让我们从创建表开始。我们将创建两个表:
employees_temp
: 用于原始数据的暂存表employees
: 清洗并去重后的目标表
我们将使用 SQLExecuteQueryOperator
来运行创建这些表所需的 SQL 语句。
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
create_employees_table = SQLExecuteQueryOperator(
task_id="create_employees_table",
conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = SQLExecuteQueryOperator(
task_id="create_employees_temp_table",
conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
您可以选择将这些 SQL 语句放在 dags/
文件夹中的 .sql
文件中,并将文件路径传递给 sql=
参数。这是一种保持 DAG 代码整洁的好方法。
将数据加载到暂存表中¶
接下来,我们将下载一个 CSV 文件,将其保存在本地,并使用 PostgresHook
将其加载到 employees_temp
中。
import os
import requests
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
此任务让您体验 Airflow 与原生 Python 和 SQL Hook 的结合——这在实际管道中是一种常见模式。
合并和清洗数据¶
现在让我们对数据进行去重,并将其合并到最终表中。我们将编写一个运行 SQL INSERT … ON CONFLICT DO UPDATE 语句的任务。
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
定义 DAG¶
现在我们已经定义了所有任务,是时候将它们组合成一个 DAG 了。
import datetime
import pendulum
import os
import requests
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@dag(
dag_id="process_employees",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
create_employees_table = SQLExecuteQueryOperator(
task_id="create_employees_table",
conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = SQLExecuteQueryOperator(
task_id="create_employees_temp_table",
conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
dag = ProcessEmployees()
将此 DAG 保存为 dags/process_employees.py
。稍后,它将显示在 UI 中。
触发并探索你的 DAG¶
打开 Airflow UI,在列表中找到 process_employees
DAG。使用滑块将其“开启”,然后点击播放按钮触发运行。
您可以在 Grid 视图中查看每个任务的运行情况,并查看每个步骤的日志。


成功后,您将拥有一个完全可用的管道,该管道可以集成外部数据、将其加载到 Postgres 中并保持其整洁。
接下来是什么?¶
干得好!您现在已经使用 Airflow 的核心模式和工具构建了一个真正的管道。以下是一些您可以进一步探索的方向:
尝试替换为不同的 SQL provider,例如 MySQL 或 SQLite。
将您的 DAG 拆分为 TaskGroup 或重构为更易用的模式。
添加告警步骤或在数据处理完成后发送通知。
另请参阅
在Airflow 文档中浏览更多操作指南