从 1.10 升级到 2¶
Apache Airflow 2 是一个主要版本,本文档的目的是帮助用户从 Airflow 1.10.x 迁移到 Airflow 2
步骤 1:切换到 Python 3¶
Airflow 1.10 是最后一个支持 Python 2 的版本系列。Airflow 2.0.0 需要 Python 3.6+,并且已使用 Python 版本 3.6、3.7 和 3.8 进行测试。从 Airflow 2.1.2 开始添加了 Python 3.9 的支持。
Airflow 2.3.0 放弃了对 Python 3.6 的支持。它已使用 Python 3.7、3.8、3.9、3.10 进行测试。
如果你的特定任务仍然需要 Python 2,则可以使用 @task.virtualenv
、@task.docker
或 @task.kubernetes
装饰器来实现此目的。
有关 Python 2 和 Python 3 之间重大更改的列表,请参阅 CouchBaseDB 团队的此实用博客。
步骤 2:升级到 1.10.15¶
为了最大限度地减少用户从 Airflow 1.10 升级到 Airflow 2.0 及更高版本的摩擦,我们创建了 Airflow 1.10.15,又名“桥接版本”。这是最终的 1.10 功能版本。Airflow 1.10.15 包括对从 Airflow 2.0 反向移植的各种功能的支持,以便用户在升级到 Airflow 2.0 之前可以轻松测试其 Airflow 环境。
我们强烈建议所有升级到 Airflow 2.0 的用户首先升级到 Airflow 1.10.15 并测试其 Airflow 部署,然后再升级到 Airflow 2.0。Airflow 1.10.x 于 2021 年 6 月 17 日达到生命周期终点。不会发布新的 Airflow 1.x 版本。
1.10.15 中的功能包括
1. Airflow 2.0 的大多数破坏性 DAG 和架构更改已反向移植到 Airflow 1.10.15。这种向后兼容性并不意味着 1.10.15 将以与 Airflow 2.0 相同的方式处理这些 DAG。相反,这意味着大多数与 Airflow 2.0 兼容的 DAG 将在 Airflow 1.10.15 中工作。这种反向移植将为用户提供时间来逐步修改其 DAG,而不会造成任何服务中断。
2. 我们还将更新后的 Airflow 2.0 CLI 命令反向移植到了 Airflow 1.10.15,以便用户可以在升级之前修改其脚本以与 Airflow 2.0 兼容。
3. 对于 KubernetesExecutor 的用户,我们反向移植了 KubernetesExecutor 的 pod_template_file
功能,以及一个将根据你的 airflow.cfg
设置生成 pod_template_file
的脚本。要生成此文件,只需运行以下命令
airflow generate_pod_template -o <output file path>
执行此步骤后,只需在 airflow.cfg
的 kubernetes_executor
部分的 pod_template_file
配置中写入此文件的文件路径
注意
在 airflow 版本 2.4.2 之前,kubernetes_executor
部分称为 kubernetes
。
步骤 3:运行升级检查脚本¶
升级到 Airflow 1.10.15 后,我们建议你安装“升级检查”脚本。这些脚本将读取你的 airflow.cfg
和所有 DAG,并提供升级前所需的所有更改的详细报告。我们正在努力测试此脚本,我们的目标是,任何可以通过这些测试的 Airflow 设置都能够顺利升级到 2.0。
pip install apache-airflow-upgrade-check
安装完成后,请运行升级检查脚本。
airflow upgrade_check
有关此过程的更多详细信息,请访问升级检查脚本。
步骤 4:切换到 Backport Providers¶
现在你已经在带有 Python 3.6+ 环境的 Airflow 1.10.15 中设置完毕,你已准备好开始将你的 DAG 移植到 Airflow 2.0 合规性!
此过渡中最重要的一步也是最容易逐步完成的一步。所有 Airflow 2.0 操作符都使用 backport provider 包与 Airflow 1.10 向后兼容。你可以自行决定,通过 pip 通过 PyPI 安装 provider 并更改导入路径来过渡到使用这些 backport-providers。
例如:虽然你可能过去以这种方式导入 DockerOperator
from airflow.operators.docker_operator import DockerOperator
你现在将运行此命令来安装 provider
pip install apache-airflow-backport-providers-docker
然后使用此路径导入操作符
from airflow.providers.docker.operators.docker import DockerOperator
请注意,backport provider 包只是与 Airflow 2.0 兼容的 provider 包的反向移植。例如
pip install 'apache-airflow[docker]'
会自动安装 apache-airflow-providers-docker
包。但是你可以独立于 Airflow 核心管理/升级/删除 provider 包。
升级到 Apache Airflow 2.0 后,当你安装带有 extras 的 Airflow 时,将自动安装这些 provider 包。当你安装 Airflow 时,即使没有 extras,也会自动安装多个 provider(http、ftp、sqlite、imap)。你可以在Provider 包中阅读有关 provider 的更多信息。
步骤 5:升级 Airflow DAG¶
更改模板中未定义变量的处理方式
在 Airflow 2.0 之前,Jinja 模板允许使用未定义的变量。它们将呈现为空字符串,而不会向用户指示使用了未定义的变量。在此版本中,任何涉及未定义变量的模板呈现都将使任务失败,并在呈现时在 UI 中显示错误。
实例化 DAG 时可以还原此行为。
import jinja2
dag = DAG("simple_dag", template_undefined=jinja2.Undefined)
或者,也可以使用 | default
Jinja 过滤器单独覆盖每个 Jinja 模板变量,如下所示。
{{a | default(1)}}
KubernetesPodOperator 的更改
与 KubernetesExecutor
非常相似,KubernetesPodOperator
将不再采用 Airflow 自定义类,而是期望 pod_template yaml 文件或 kubernetes.client.models
对象。
一个值得注意的例外是,我们将继续支持 airflow.providers.cncf.kubernetes.secret.Secret
类。
以前,用户会像这样导入每个单独的类来构建 pod
from airflow.kubernetes.pod import Port
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume_mount import VolumeMount
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
port = Port("http", 80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations,
configmaps=configmaps,
init_containers=[init_container],
priority_class_name="medium",
)
现在,用户可以使用 kubernetes.client.models
类作为创建所有 k8s 对象的单一入口点。
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.secret import Secret
configmaps = ["test-configmap-1", "test-configmap-2"]
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
port = k8s.V1ContainerPort(name="http", container_port=80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
is_delete_operator_pod=True,
hostnetwork=False,
)
我们决定保留 Secret 类,因为用户似乎非常喜欢它,这简化了将 Kubernetes secret 挂载到工作节点的复杂性。
有关 KubernetesPodOperator API 更改的更详细列表,请阅读附录中题为“KubernetesPodOperator 的已更改参数”的部分
更改 dag_run_conf_overrides_params 的默认值
DagRun 配置字典现在默认会覆盖 params 字典。如果您通过 airflow dags backfill -c
或 airflow dags trigger -c
传递一些键值对,这些键值对将覆盖 params 中现有的键值对。您可以通过在 airflow.cfg
中将 dag_run_conf_overrides_params
设置为 False
来恢复此行为。
DAG 发现安全模式现在不区分大小写
当 DAG_DISCOVERY_SAFE_MODE
激活时,Airflow 现在将以不区分大小写的方式过滤所有包含字符串 airflow
和 dag
的文件。此更改是为了更好地支持新的 @dag
装饰器。
权限变更
作为 Airflow 2.0 的一部分,DAG 级别的权限操作 can_dag_read
和 can_dag_edit
已被弃用。它们将被 can_read
和 can_edit
替换。当角色被授予 DAG 级别的访问权限时,资源名称(或 Flask App-Builder 术语中的“视图菜单”)现在将以 DAG:
为前缀。因此,对 example_dag_id
的 can_dag_read
操作现在表示为对 DAG:example_dag_id
的 can_read
。有一个特殊的视图名为 DAGs
(在 1.10.x 版本中称为 all_dags
),它允许角色访问所有 DAG。默认的 Admin
、Viewer
、User
、Op
角色都可以访问 DAGs
视图。
作为运行 ``airflow db migrate`` 的一部分,现有的权限将为您迁移。
当使用设置的 access_control
变量初始化 DAG 时,任何旧权限名称的使用都将在数据库中自动更新,因此这不会是一个重大更改。将引发 DeprecationWarning。
放弃旧版 UI,转而使用 FAB RBAC UI
警告
重大更改
- 以前我们使用两个版本的 UI
非 RBAC UI
Flask App Builder RBAC UI
这很难维护,因为它意味着我们必须在两个地方实现/更新功能。在此版本中,我们删除了旧的 UI,转而使用 Flask App Builder RBAC UI,从而减少了巨大的维护负担。不再需要在配置中显式设置 RBAC UI,因为它是唯一的默认 UI。
如果您以前使用非 RBAC UI,则必须切换到新的 RBAC-UI 并创建用户才能访问 Airflow 的 Web 服务器。有关创建用户的 CLI 的更多详细信息,请参阅 命令行界面和环境变量参考
请注意,自定义身份验证后端需要重写以针对新的基于 FAB 的 UI。
作为此更改的一部分,[webserver]
部分中的一些配置项已被删除,不再适用,包括 authenticate
、filter_by_owner
、owner_mode
和 rbac
。
在升级到此版本之前,我们建议激活新的 FAB RBAC UI。为此,您应该在 airflow.cfg
文件中将 [webserver]
中的 rbac
选项设置为 True
[webserver]
rbac = True
为了登录界面,您需要创建一个管理员帐户。
假设您已经安装了 Airflow 1.10.15,您可以使用 Airflow 2.0 CLI 命令语法 airflow users create
创建用户。您无需更改配置文件,因为 FAB RBAC UI 是唯一支持的 UI。
airflow users create \
--role Admin \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--email [email protected]
OAuth 中的重大更改
注意
当运行多个 Airflow Web 服务器副本时,它们需要共享相同的 *secret_key* 才能访问同一用户会话。通过任何配置机制注入此密钥。1.10.15 桥接版本修改了此功能,使用随机生成的密钥而不是不安全的默认密钥,并且可能会破坏依赖于默认密钥的现有部署。Web 服务器密钥还用于授权对 Celery 工作人员的日志检索请求。使用密钥生成的令牌有一个短暂的过期时间 - 请确保您运行 Airflow 组件的所有计算机上的时间同步(例如使用 ntpd),否则您在访问日志时可能会收到“禁止”错误。
flask-oauthlib
已被 authlib
替换,因为 flask-oauthlib
已被弃用,转而使用 authlib
。已更改的新旧提供程序配置密钥如下
旧密钥 |
新密钥 |
---|---|
|
|
|
|
|
|
|
|
有关更多信息,请访问 https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
Pendulum 支持中的重大更改
Airflow 已从 Pendulum 1.x 升级到 Pendulum 2.x。由于 Pendulum 2.x 中某些方法及其定义已更改或已被删除,因此带来了一些重大更改。
例如,以下代码段现在将引发错误
execution_date.format("YYYY-MM-DD HH:mm:ss", formatter="alternative")
因为 Pendulum 2.x 不支持 formatter
选项,默认使用 alternative
。
有关更多信息,请访问 https://pendulum.eustace.io/blog/pendulum-2.0.0-is-out.html
步骤 6:升级配置设置¶
Airflow 2.0 对配置数据的期望更加严格,并且需要在更多情况下显式指定配置值,而不是默认为通用值。
其中一些在升级检查指南中进行了详细介绍,但一个重要的变化领域是 Kubernetes 执行器。下面为 Kubernetes 执行器的用户进行了说明。
升级 KubernetesExecutor 设置
KubernetesExecutor 将不再从 airflow.cfg 中读取基本 Pod 配置。
在 Airflow 2.0 中,KubernetesExecutor 将需要一个以 yaml 编写的基本 Pod 模板。此文件可以存在于主机上的任何位置,并将使用 airflow.cfg
文件中的 pod_template_file
配置链接。您可以通过运行以下命令创建 pod_template_file
:airflow generate_pod_template
airflow.cfg
仍然接受 worker_container_repository
、worker_container_tag
和默认命名空间的值。
以下 airflow.cfg
值将被弃用
worker_container_image_pull_policy
airflow_configmap
airflow_local_settings_configmap
dags_in_image
dags_volume_subpath
dags_volume_mount_point
dags_volume_claim
logs_volume_subpath
logs_volume_claim
dags_volume_host
logs_volume_host
env_from_configmap_ref
env_from_secret_ref
git_repo
git_branch
git_sync_depth
git_subpath
git_sync_rev
git_user
git_password
git_sync_root
git_sync_dest
git_dags_folder_mount_point
git_ssh_key_secret_name
git_ssh_known_hosts_configmap_name
git_sync_credentials_secret
git_sync_container_repository
git_sync_container_tag
git_sync_init_container_name
git_sync_run_as_user
worker_service_account_name
image_pull_secrets
gcp_service_account_keys
affinity
tolerations
run_as_user
fs_group
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]
现在,当启动任务时,``executor_config`` 将需要一个 ``kubernetes.client.models.V1Pod`` 类
在 Airflow 1.10.x 中,用户可以通过将字典传递给 executor_config
变量,在运行时修改任务 Pod。现在,用户将通过 kubernetes.client.models.V1Pod
完全访问 Kubernetes API。
在弃用版本中,用户将使用以下字典挂载卷
second_task = PythonOperator(
task_id="four_task",
python_callable=test_volume_mount,
executor_config={
"KubernetesExecutor": {
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "example-kubernetes-test-volume",
},
],
}
},
)
在新模型中,用户可以使用 pod_override
键下的以下代码来完成相同的事情
from kubernetes.client import models as k8s
@task(
task_id="four_task",
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/",
name="example-kubernetes-test-volume",
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
)
},
)
def test_volume_mount():
pass
second_task = test_volume_mount()
对于 Airflow 2.0,传统的 executor_config
将继续运行,但会发出弃用警告,并且将在未来的版本中删除。
步骤 7:升级到 Airflow 2¶
在运行如上所述的升级检查、安装向后移植的提供程序、修改 DAG 以使其兼容并更新配置设置后,您应该准备好升级到 Airflow 2.0。
最后一次运行升级检查始终是一个好主意,以确保您没有遗漏任何内容。在此阶段,检测到的问题应为零或最少,您计划在升级 Airflow 版本后修复这些问题。
此时,只需遵循标准的 Airflow 版本升级过程
确保备份 Airflow 元数据库
暂停所有 DAG,并确保没有正在运行的内容
暂停 DAG 的原因是确保在后续步骤中进行的数据库升级期间,不会有任何内容正在写入数据库。
为了更加小心,最好在暂停 DAG 后进行数据库备份。
安装/升级 Airflow 版本到您选择的 2.0 版本
确保安装正确的提供程序
这可以通过使用 Airflow 安装的一部分的“extras”选项来完成,也可以通过单独安装提供程序来完成。
请注意,如果您使用 pip 安装,则可能必须先卸载向后移植的提供程序,然后再安装新的提供程序。如果您使用具有一组指定要求的 Airflow Docker 镜像进行安装,则不适用这种情况,其中更改会自动获得一组新的模块。
您可以在 提供程序软件包 中阅读有关提供程序的更多信息。
使用
airflow db migrate
迁移 Airflow 元数据库。上面的命令可能不太熟悉,因为它使用 Airflow 2.0 CLI 语法显示。
数据库升级可能会根据需要修改数据库模式,并将现有数据映射到符合更新的数据库模式。
注意
数据库升级可能需要一段时间,具体取决于数据库中的 DAG 数量以及数据库中存储的任务历史、xcom 变量等的历史记录量。在我们的测试中,我们发现从 Airflow 1.10.15 到 Airflow 2.0 的 Airflow 数据库升级在 PostgreSQL 上一个包含大约 35,000 个任务实例和 500 个 DAG 的 Airflow 数据库上花费了两到三分钟。为了更快地进行数据库升级并获得更好的整体性能,建议您定期存档不再有价值的旧历史元素。
重新启动 Airflow 调度器、Web 服务器和工作人员
附录¶
KubernetesPodOperator 的已更改参数¶
Port 已从 list[Port] 迁移到 list[V1ContainerPort]
之前
from airflow.kubernetes.pod import Port
port = Port("http", 80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
之后
from kubernetes.client import models as k8s
port = k8s.V1ContainerPort(name="http", container_port=80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
Volume_mounts 已从 list[VolumeMount] 迁移到 list[V1VolumeMount]
之前
from airflow.kubernetes.volume_mount import VolumeMount
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
之后
from kubernetes.client import models as k8s
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
Volume 已从 list[Volume] 迁移到 list[V1Volume]
之前
from airflow.kubernetes.volume import Volume
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
之后
from kubernetes.client import models as k8s
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
env_vars 已从 dict 迁移到 list[V1EnvVar]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars={"ENV1": "val1", "ENV2": "val2"},
task_id="task",
)
之后
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(name="ENV1", value="val1"),
k8s.V1EnvVar(name="ENV2", value="val2"),
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
PodRuntimeInfoEnv 已被移除
PodRuntimeInfoEnv 现在可以作为 V1EnvVarSource
添加到 env_vars
变量中
之前
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
task_id="task",
)
之后
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(
name="ENV3",
value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")),
)
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
configmaps 已被移除
Configmaps 现在可以作为 V1EnvVarSource
添加到 env_from
变量中
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
configmaps=["test-configmap"],
task_id="task",
)
之后
from kubernetes.client import models as k8s
configmap = "test-configmap"
env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_from=env_from,
task_id="task",
)
Resources 已从 Dict 迁移到 V1ResourceRequirements
之前
resources = {
"limit_cpu": 0.25,
"limit_memory": "64Mi",
"limit_ephemeral_storage": "2Gi",
"request_cpu": "250m",
"request_memory": "64Mi",
"request_ephemeral_storage": "1Gi",
}
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
之后
from kubernetes.client import models as k8s
resources = k8s.V1ResourceRequirements(
requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
limits={
"memory": "64Mi",
"cpu": 0.25,
"nvidia.com/gpu": None,
"ephemeral-storage": "2Gi",
},
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test-" + str(random.randint(0, 1000000)),
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
image_pull_secrets 已从 String 迁移到 list[k8s.V1LocalObjectReference]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
name="test",
task_id="task",
image_pull_secrets="fake-secret",
cluster_context="default",
)
之后
quay_k8s = KubernetesPodOperator(
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
name="airflow-private-image-pod",
task_id="task-two",
)
从实验性 API 迁移到稳定 API v1 的迁移指南¶
在 Airflow 2.0 中,我们添加了新的 REST API。实验性 API 仍然有效,但未来可能会放弃支持。
然而,实验性 API 不需要身份验证,因此默认情况下禁用。如果您想使用它,则需要显式启用实验性 API。如果您的应用程序仍在使用实验性 API,您应该**认真**考虑迁移到稳定 API。
稳定 API 公开了许多可通过 Web 服务器访问的端点。以下是两个端点之间的差异,这将有助于您从实验性 REST API 迁移到稳定 REST API。
基本端点
稳定 API v1 的基本端点是 /api/v1/
。您必须将实验性基本端点从 /api/experimental/
更改为 /api/v1/
。下表显示了差异
目的 |
实验性 REST API 端点 |
稳定 REST API 端点 |
---|---|---|
创建 DAGRuns (POST) |
|
|
列出 DAGRuns (GET) |
|
|
检查健康状态 (GET) |
|
|
任务信息 (GET) |
|
|
TaskInstance 公共变量 (GET) |
|
|
暂停 DAG (PATCH) |
|
|
暂停的 DAG 的信息 (GET) |
|
|
最新的 DAG 运行 (GET) |
|
|
获取所有池 (GET) |
|
|
创建池 (POST) |
|
|
删除池 (DELETE) |
|
|
DAG 血缘 (GET) |
|
|
此端点 /api/v1/dags/{dag_id}/dagRuns
还允许您使用查询字符串中的 start_date
、end_date
、execution_date
等参数来过滤 dag_runs。因此,以前由此端点执行的操作
/api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>
现在可以使用查询字符串中的筛选参数来处理。可以使用此端点(/api/v1/dags/{dag_id}/dagRuns
)的查询字符串中的筛选器来完成获取有关最新运行的信息。请查看稳定 API 参考文档以获取更多信息
DAG 回调的异常处理变更¶
DAG 回调中的异常曾经会导致 Airflow 调度器崩溃。作为我们努力使调度器更具性能和可靠性的一部分,我们已将此行为更改为记录异常。最重要的是,添加了一个新的 dag.callback_exceptions 计数器指标,以帮助更好地监视回调异常。
迁移到 TaskFlow API¶
Airflow 2.0 引入了 TaskFlow API,以简化 Python 可调用任务的声明。鼓励用户使用其 TaskFlow 修饰符替代项来替换经典运算符。有关详细信息,请参阅使用 TaskFlow。
经典运算符 |
TaskFlow 修饰符 |
---|---|
|
|
|
|
|
|
|
|
|
|
Airflow CLI 2.0 中的变更¶
Airflow CLI 已进行组织,以便将相关命令分组为子命令,这意味着如果您在脚本中使用这些命令,则必须对其进行更改。
本节介绍已进行的更改以及更新脚本所需的操作。从命令行操作用户的功能已更改。airflow create_user
、airflow delete_user
和 airflow list_users
已分组到一个命令 airflow users
中,带有可选标志 create
、list
和 delete
。airflow list_dags
命令现在是 airflow dags list
,airflow pause
是 airflow dags pause
等。
在 Airflow 1.10 和 2.0 中,都有一个 airflow config
命令,但是行为有所不同。在 Airflow 1.10 中,它会打印所有配置选项,而在 Airflow 2.0 中,它是一个命令组。airflow config
现在是 airflow config list
。您可以通过运行命令 airflow config --help
来检查其他选项
有关更新的 CLI 命令的完整列表,请参阅 https://airflow.org.cn/cli.html。
您可以通过运行 airflow --help
来了解有关这些命令的信息。例如,要获得有关 celery
组命令的帮助,您必须运行帮助命令:airflow celery --help
。
旧命令 |
新命令 |
组 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
“users”组的用法示例
要创建新用户
airflow users create --username jondoe --lastname doe --firstname jon --email [email protected] --role Viewer --password test
列出用户
airflow users list
删除用户
airflow users delete --username jondoe
将用户添加到角色
airflow users add-role --username jondoe --role Public
从角色中移除用户
airflow users remove-role --username jondoe --role Public
在 CLI 中,短选项样式更改精确使用单个字符
对于 Airflow 短选项,精确使用一个字符。新命令如下表所示
旧命令 |
新命令 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
对于 Airflow 长选项,使用 kebab-case 而不是 snake_case
旧选项 |
新选项 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
从 CLI 中移除 serve_logs 命令
serve_logs
命令已被删除。此命令应仅由内部应用程序机制运行,并且无需从 CLI 接口访问。
dag_state CLI 命令
如果 DAGRun 是通过传入的 conf 键/值触发的,它们也会在 dag_state CLI 响应中打印,例如 running, {“name”: “bob”},而在之前的版本中,它只打印状态:例如 running
弃用 backfill 命令上的 ignore_first_depends_on_past 并将其默认设置为 True
当使用 depends_on_past
DAG 执行回填时,用户需要传递 --ignore-first-depends-on-past
。我们应该将其默认设置为 true
以避免混淆
Airflow 插件的更改¶
如果您正在使用 Airflow 插件并传递了在非 RBAC UI(基于 flask-admin
的 UI)中使用的 admin_views
和 menu_links
,请将其更新为使用 flask_appbuilder_views
和 flask_appbuilder_menu_links
。
旧:
from airflow.plugins_manager import AirflowPlugin
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
class TestView(BaseView):
@expose("/")
def test(self):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self.render("test_plugin/test.html", content="Hello galaxy!")
v = TestView(category="Test Plugin", name="Test View")
ml = MenuLink(category="Test Plugin", name="Test Menu Link", url="https://airflow.org.cn/")
class AirflowTestPlugin(AirflowPlugin):
admin_views = [v]
menu_links = [ml]
将其更改为:
from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
default_view = "test"
@expose("/")
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
"name": "Test View",
"category": "Test Plugin",
"view": v_appbuilder_view,
}
# Creating a flask appbuilder Menu Item
appbuilder_mitem = {
"name": "Google",
"category": "Search",
"category_icon": "fa-th",
"href": "https://www.google.com",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem]
extras 名称的更改¶
all
extra 已被缩减为仅包含面向用户的依赖项。这意味着此 extra 不包含开发依赖项。如果您正在使用它并依赖于开发包,则应使用 devel_all
。
对 Airflow 1.10.x 版本的支持¶
Airflow 1.10.x 于 2021 年 6 月 17 日到达生命周期结束。不会发布新的 Airflow 1.x 版本。
Backport 提供程序的支持于 2021 年 3 月 17 日结束。不会发布新的 backport 提供程序版本。
我们计划对版本控制和发布过程采用严格的语义版本控制方法。这意味着我们不打算在 2.* 版本中进行任何向后不兼容的更改。任何重大更改,包括删除 Airflow 2.0 中弃用的功能,都将作为 Airflow 3.0 版本的一部分发生。