构建运行管道¶
我们来看另一个示例:我们需要从在线托管的文件中获取一些数据,并将其插入到我们的本地数据库中。我们还需要在插入时删除重复行。
初始设置¶
我们需要安装 Docker,因为我们将使用 在 Docker 中运行 Airflow 过程来完成此示例。以下步骤应该足够,但有关完整说明,请参阅快速入门文档。
# Download the docker-compose.yaml file
curl -LfO 'https://airflow.org.cn/docs/apache-airflow/stable/docker-compose.yaml'
# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize the database
docker compose up airflow-init
# Start up all services
docker compose up
所有服务启动后,Web UI 将在此处提供:http://localhost:8080
。默认帐户的用户名为 airflow
,密码为 airflow
。
我们还需要创建到 postgres 数据库的 连接。要通过 Web UI 创建一个连接,请从“管理”菜单中选择“连接”,然后单击加号“向连接列表添加新记录”。
按如下所示填写字段。注意连接 ID 值,我们将其作为 postgres_conn_id
kwarg 的参数传递。
连接 ID:tutorial_pg_conn
连接类型:postgres
主机:postgres
架构:airflow
登录:airflow
密码:airflow
端口:5432
测试你的连接,如果测试成功,保存你的连接。
表创建任务¶
我们可以使用 PostgresOperator 来定义在 postgres 数据库中创建表的任务。
我们将创建一个表来促进数据清理步骤 (employees_temp
),另一个表来存储我们的已清理数据 (employees
)。
from airflow.providers.postgres.operators.postgres import PostgresOperator
create_employees_table = PostgresOperator(
task_id="create_employees_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = PostgresOperator(
task_id="create_employees_temp_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
可选:使用文件中的 SQL¶
如果你想从你的 DAG 中抽象出这些 sql 语句,你可以将语句 sql 文件移动到 dags/
目录中的某个位置,并将 sql file_path(相对于 dags/
)传递给 sql
kwarg。例如,对于 employees
,在 dags/
中创建一个 sql
目录,将 employees
DDL 放入 dags/sql/employees_schema.sql
,并将 PostgresOperator() 修改为
create_employees_table = PostgresOperator(
task_id="create_employees_table",
postgres_conn_id="tutorial_pg_conn",
sql="sql/employees_schema.sql",
)
并对 employees_temp
表重复此操作。
数据检索任务¶
在这里,我们检索数据,将其保存到我们的 Airflow 实例上的一个文件,并将该文件中的数据加载到一个中间表中,在那里我们可以执行数据清理步骤。
import os
import requests
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
数据合并任务¶
在此,我们从检索的数据中选择完全唯一的记录,然后检查任何员工序列 编号
是否已在数据库中(如果在,我们使用新数据更新这些记录)。
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
完成我们的 DAG¶
我们已开发任务,现在我们需要将它们包装在 DAG 中,这使我们能够定义任务的运行时间和方式,并说明任务对其他任务的任何依赖关系。以下 DAG 配置为
从 2021 年 1 月 1 日开始每天午夜运行一次,
如果错过某天,仅运行一次,并且
60 分钟后超时
从process_employees
DAG 定义的最后一行中,我们看到
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
merge_data()
任务取决于get_data()
任务,get_data()
取决于create_employees_table
和create_employees_temp_table
任务,并且create_employees_table
和create_employees_temp_table
任务可以独立运行。
将所有部分组合在一起,我们便完成了 DAG。
import datetime
import pendulum
import os
import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
@dag(
dag_id="process_employees",
schedule_interval="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
create_employees_table = PostgresOperator(
task_id="create_employees_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = PostgresOperator(
task_id="create_employees_temp_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
dag = ProcessEmployees()
将此代码保存到/dags
文件夹中的 Python 文件(例如dags/process_employees.py
),(在短暂延迟后),process_employees
DAG 将包含在 Web UI 上的可用 DAG 列表中。
你可以取消暂停process_employees
DAG(通过左侧的滑块)并运行它(通过操作下的运行按钮)来触发它。
在process_employees
DAG 的网格视图中,我们看到所有任务在所有已执行的运行中都成功运行。成功!