airflow.models.dag

模块内容

DAG

DAG(有向无环图)是具有方向依赖性的任务的集合。

DagTag

每个 DAG 的标签名称,允许在 DAG 视图中快速筛选。

DagOwnerAttributes

定义不同所有者属性的表。

DagModel

包含 DAG 属性的表。

DagContext

当 DAG 用作 ContextManager 时,DAG 上下文用于保持当前的 DAG。

函数

create_timetable(interval, timezone)

schedule_interval 参数创建 Timetable 实例。

get_last_dagrun(dag_id, session[, ...])

返回 DAG 的最后一个 DAG 运行,如果没有则返回 None。

get_dataset_triggered_next_run_info(dag_ids, *, session)

获取 dag_ids 列表的下一个运行信息。

dag([dag_id, description, schedule, ...])

Python DAG 装饰器,将函数包装到 Airflow DAG 中。

属性

log

DEFAULT_VIEW_PRESETS

ORIENTATION_PRESETS

TAG_MAX_LEN

DagStateChangeCallback

ScheduleInterval

ScheduleIntervalArg

ScheduleArg

SLAMissCallback

DEFAULT_SCHEDULE_INTERVAL

DAG_ARGS_EXPECTED_TYPES

airflow.models.dag.log[source]
airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][source]
airflow.models.dag.ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT'][source]
airflow.models.dag.TAG_MAX_LEN = 100[source]
airflow.models.dag.DagStateChangeCallback[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.ScheduleIntervalArg[source]
airflow.models.dag.ScheduleArg[source]
airflow.models.dag.SLAMissCallback[source]
airflow.models.dag.DEFAULT_SCHEDULE_INTERVAL[source]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]

基类: airflow.exceptions.AirflowException

当模型错误地填充数据间隔字段时引发的异常。

数据间隔字段应都为 None (对于在 AIP-39 之前计划的运行) 或都为 datetime (对于在 AIP-39 实现后计划的运行)。如果只有一个字段为 None,则会引发此异常。

__str__()[source]

返回 str(self)。

airflow.models.dag.create_timetable(interval, timezone)[source]

schedule_interval 参数创建 Timetable 实例。

airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]

返回 DAG 的最后一个 DAG 运行,如果没有则返回 None。

最后的 DAG 运行可以是任何类型的运行,例如计划运行或回填运行。被覆盖的 DagRuns 将被忽略。

airflow.models.dag.get_dataset_triggered_next_run_info(dag_ids, *, session)[source]

获取 dag_ids 列表的下一个运行信息。

给定 dag_ids 列表,获取一个字符串,表示任何数据集触发的 DAG 距离下次运行有多近,例如 “1 of 2 datasets updated”。

airflow.models.dag.DAG_ARGS_EXPECTED_TYPES[source]
class airflow.models.dag.DAG(dag_id, description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]

基类: airflow.utils.log.logging_mixin.LoggingMixin

DAG(有向无环图)是具有方向依赖性的任务的集合。

DAG 也有一个计划、一个开始日期和一个结束日期(可选)。对于每个计划(例如每天或每小时),DAG 需要在满足其依赖关系时运行每个单独的任务。某些任务具有依赖于其自身过去执行的属性,这意味着它们必须等到之前的计划(和上游任务)完成后才能运行。

DAG 本质上充当任务的命名空间。一个 task_id 只能添加到 DAG 一次。

请注意,如果您计划使用时区,则所有提供的日期都应为 pendulum 日期。请参阅时区感知 DAG

2.4 版本新增: schedule 参数用于指定基于时间的调度逻辑 (timetable),或数据集驱动的触发器。

自 2.4 版本起已弃用: schedule_intervaltimetable 参数。它们的功能已合并到新的 schedule 参数中。

参数:
  • dag_id (str) – DAG 的 ID;必须仅包含字母数字字符、破折号、点号和下划线(所有 ASCII 字符)

  • description (str | None) – DAG 的描述,例如在 Web 服务器上显示

  • schedule (ScheduleArg) – 定义 DAG 运行的调度规则。可以接受 cron 字符串、timedelta 对象、Timetable 或 Dataset 对象列表。如果未提供此参数,DAG 将设置为默认调度 timedelta(days=1)。另请参阅 使用时间表自定义 DAG 调度

  • start_date (datetime.datetime | None) – 调度器将尝试回填的时间戳

  • end_date (datetime.datetime | None) – 超过此日期,您的 DAG 将不会运行,将其保留为 None 以进行开放式调度

  • template_searchpath (str | Iterable[str] | None) – 此文件夹列表(非相对路径)定义 Jinja 将在哪里查找您的模板。顺序很重要。请注意,Jinja/Airflow 默认包含您的 DAG 文件的路径

  • template_undefined (type[jinja2.StrictUndefined]) – 模板未定义类型。

  • user_defined_macros (dict | None) – 一个宏字典,将在您的 Jinja 模板中公开。例如,将 dict(foo='bar') 传递给此参数允许您在与此 DAG 相关的所有 Jinja 模板中使用 {{ foo }}。请注意,您可以在此处传递任何类型的对象。

  • user_defined_filters (dict | None) – 一个过滤器字典,将在您的 Jinja 模板中公开。例如,将 dict(hello=lambda name: 'Hello %s' % name) 传递给此参数允许您在与此 DAG 相关的所有 Jinja 模板中使用 {{ 'world' | hello }}

  • default_args (dict | None) – 一个默认参数字典,在初始化运算符时用作构造函数关键字参数。请注意,运算符具有相同的钩子,并且优先于此处定义的钩子,这意味着如果您的字典包含 ‘depends_on_past’: True,而运算符的 default_args 调用中包含 ‘depends_on_past’: False,则实际值将为 False

  • params (collections.abc.MutableMapping | None) – 一个 DAG 级别参数字典,可以在模板中访问,命名空间在 params 下。这些参数可以在任务级别被覆盖。

  • max_active_tasks (int) – 允许并发运行的任务实例数量

  • max_active_runs (int) – 最大活动 DAG 运行次数,超出此数量的正在运行状态的 DAG 运行,调度程序将不会创建新的活动 DAG 运行

  • max_consecutive_failed_dag_runs (int) – (实验性)最大连续失败的 DAG 运行次数,超过此次数,调度程序将禁用 DAG

  • dagrun_timeout (datetime.timedelta | None) – 指定 DagRun 在超时/失败之前应该运行多长时间,以便可以创建新的 DagRun。

  • sla_miss_callback (None | SLAMissCallback | list[SLAMissCallback]) – 指定在报告 SLA 超时时调用的函数或函数列表。有关传递给回调的函数签名和参数的更多信息,请参阅 sla_miss_callback

  • default_view (str) – 指定 DAG 默认视图(网格、图形、持续时间、甘特图、着陆时间),默认为网格

  • orientation (str) – 指定图形视图中的 DAG 方向(LR、TB、RL、BT),默认为 LR

  • catchup (bool) – 执行调度程序追赶(还是只运行最新的)?默认为 True

  • on_failure_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 当此 dag 的 DagRun 失败时要调用的函数或函数列表。上下文字典作为单个参数传递给此函数。

  • on_success_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 与 on_failure_callback 非常相似,不同之处在于它在 dag 成功时执行。

  • access_control (dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None) – 指定可选的 DAG 级别操作,例如,“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”;或者如果存在 DAGs Run 资源,则可以指定资源名称,例如,“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}}”

  • is_paused_upon_creation (bool | None) – 指定首次创建 DAG 时是否暂停。如果 DAG 已存在,则此标志将被忽略。如果未指定此可选参数,则将使用全局配置设置。

  • jinja_environment_kwargs (dict | None) –

    要传递给 Jinja Environment 以进行模板渲染的其他配置选项

    示例:为了避免 Jinja 从模板字符串中删除尾随换行符

    DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
    

    请参阅Jinja 环境文档

  • render_template_as_native_obj (bool) – 如果为 True,则使用 Jinja NativeEnvironment 将模板渲染为本机 Python 类型。如果为 False,则使用 Jinja Environment 将模板渲染为字符串值。

  • tags (list[str] | None) – 用于在 UI 中帮助过滤 DAG 的标签列表。

  • owner_links (dict[str, str] | None) – 所有者及其链接的字典,这些链接将在 DAG 视图 UI 上可点击。可以用作 HTTP 链接(例如指向您的 Slack 通道的链接)或 mailto 链接。例如:{“dag_owner”: “https://airflow.org.cn/”}

  • auto_register (bool) – 当此 DAG 在 with 代码块中使用时自动注册

  • fail_stop (bool) – 当 DAG 中的任务失败时,使当前正在运行的任务失败。警告:失败停止 DAG 只能具有默认触发规则(“all_success”)的任务。如果失败停止 DAG 中的任何任务具有非默认触发规则,则会抛出异常。

  • dag_display_name (str | None) – DAG 的显示名称,它出现在 UI 上。

property dag_id: str[源代码]
property is_subdag: bool[源代码]
property concurrency: int[源代码]
property max_active_tasks: int[源代码]
属性 access_control[源代码]
属性 dag_display_name: str[源代码]
属性 description: str | None[源代码]
属性 default_view: str[源代码]
属性 pickle_id: int | None[源代码]
属性 tasks: list[airflow.models.operator.Operator][源代码]
属性 task_ids: list[str][源代码]
属性 teardowns: list[airflow.models.operator.Operator][源代码]
属性 tasks_upstream_of_teardowns: list[airflow.models.operator.Operator][源代码]
属性 task_group: airflow.utils.task_group.TaskGroup[源代码]
属性 relative_fileloc: pathlib.Path[源代码]

可导入的 dag“文件”相对于配置的 DAG 文件夹的文件位置。

属性 folder: str[源代码]

DAG 对象实例化的文件夹位置。

属性 owner: str[源代码]

返回 DAG 任务中找到的所有所有者的列表。

返回

DAG 任务中所有者的逗号分隔列表

返回类型

str

属性 allow_future_exec_dates: bool[源代码]
属性 concurrency_reached[源代码]

使用airflow.models.DAG.get_concurrency_reached,此属性已弃用。

属性 is_paused[源代码]

使用airflow.models.DAG.get_is_paused,此属性已弃用。

属性 normalized_schedule_interval: ScheduleInterval[源代码]
属性 latest_execution_date[源代码]

使用airflow.models.DAG.get_latest_execution_date,此属性已弃用。

属性 subdags[源代码]

返回与此 DAG 关联的子 DAG 对象列表。

属性 roots: list[airflow.models.operator.Operator][源代码]

返回没有父节点的节点。这些是首先执行的节点,称为根节点。

属性 leaves: list[airflow.models.operator.Operator][源代码]

返回没有子节点的节点。这些是最后执行的节点,称为叶节点。

属性 task: airflow.decorators.TaskDecoratorCollection[源代码]
fileloc: str[源代码]

需要导入以加载此 DAG 或子 DAG 的文件路径。

如果此 DAG 是从 ZIP 文件或其他 DAG 分发格式加载的,则这可能不是磁盘上的实际文件。

parent_dag: DAG | None[源代码]
get_doc_md(doc_md)[源代码]
validate()[source]

验证 DAG 是否具有一致的设置。

此方法在 DAG 被装包之前由 DAG 包调用。

validate_executor_field()[source]
__repr__()[source]

返回 repr(self)。

__eq__(other)[source]

返回 self==value。

__ne__(other)[source]

返回 self!=value。

__lt__(other)[source]

返回 self<value。

__hash__()[source]

返回 hash(self)。

__enter__()[source]
__exit__(_type, _value, _tb)[source]
date_range(start_date, num=None, end_date=None)[source]
is_fixed_time_schedule()[source]

判断计划是否具有固定时间(例如,每天凌晨 3 点)。

检测是通过“窥视”接下来的两个 cron 触发时间来完成的;如果这两个时间具有相同的分钟和小时值,则该计划是固定的,我们*不需要*执行 DST 修复。

这假设 DST 发生在整分钟更改时(例如 12:59 -> 12:00)。

不要试图理解这实际上意味着什么。这是一个不应在任何地方使用的旧逻辑。

following_schedule(dttm)[source]

计算此 DAG 在 UTC 中的后续计划。

参数:

dttm – utc 时间

返回

utc 时间

previous_schedule(dttm)[source]
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]

获取此 DAG 在 date_last_automated_dagrun 之后的下一个 DagRun 的相关信息。

这会根据 DAG 的时间表、start_date、end_date 等计算下一个 DagRun 应运行的时间间隔(其执行日期)以及何时可以安排它。这不会检查最大活动运行次数或任何其他“max_active_tasks”类型的限制,而仅基于此 DAG 及其任务的各种日期和间隔字段执行计算。

参数:
  • last_automated_dagrun (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 的现有“自动”DagRun(已安排或回填,但不是手动)的 max(execution_date)

  • restricted (bool) – 如果设置为 False (默认值为 True),则忽略 DAG 或任务上指定的 start_dateend_datecatchup

返回

下一个 dagrun 的 DagRunInfo,如果不会安排 dagrun 则为 None。

返回类型

airflow.timetables.base.DagRunInfo | None

next_dagrun_after_date(date_last_automated_dagrun)[source]
iter_dagrun_infos_between(earliest, latest, *, align=True)[source]

在给定间隔之间使用此 DAG 的时间表生成 DagRunInfo。

如果 DagRunInfo 实例的 logical_date 不早于 earliest,也不晚于 latest,则生成这些实例。实例按其 logical_date 从最早到最晚排序。

如果 alignFalse,则第一次运行将立即在 earliest 上发生,即使它没有落在逻辑时间表上。默认值为 True,但子 DAG 将忽略此值,并且为了向后兼容,始终表现得好像已将其设置为 False

示例:一个 DAG 计划在每天午夜运行 (0 0 * * *)。如果 earliest2021-06-03 23:00:00,则如果 align=False,则第一个 DagRunInfo 将为 2021-06-03 23:00:00;如果 align=True,则为 2021-06-04 00:00:00

get_run_dates(start_date, end_date=None)[source]

使用此 DAG 的计划间隔返回接收为参数的间隔之间的日期列表。

返回的日期可用于执行日期。

参数:
  • start_date – 间隔的开始日期。

  • end_date – 间隔的结束日期。默认为 timezone.utcnow()

返回

间隔内遵循 DAG 计划的日期列表。

返回类型

list

normalize_schedule(dttm)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_externally_triggered=True)[source]
param(name, default=NOTSET)[source]

为当前 DAG 返回 DagParam 对象。

参数:
  • name (str) – DAG 参数名称。

  • default (Any) – DAG 参数的后备值。

返回

指定名称和当前 DAG 的 DagParam 实例。

返回类型

airflow.models.param.DagParam

get_concurrency_reached(session=NEW_SESSION)[source]

返回一个布尔值,指示是否已达到此 DAG 的 max_active_tasks 限制。

get_is_active(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否处于活动状态。

get_is_paused(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否已暂停。

static fetch_callback(dag, dag_run_id, success=True, reason=None, *, session=NEW_SESSION)[source]

根据成功的值获取适当的回调。

此方法获取此 DagRun 中单个 TaskInstance 的上下文,并将其与回调列表一起返回。

参数:
  • dag (DAG) – DAG 对象

  • dag_run_id (str) – DAG 运行 ID

  • success (bool) – 标志,用于指定是应调用失败回调还是成功回调

  • reason (str | None) – 完成原因

  • session (sqlalchemy.orm.session.Session) – 数据库会话

handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]

根据需要触发 on_failure_callback 或 on_success_callback。

此方法获取此 DagRun 中单个 TaskInstance 的上下文,并将其与 “reason” 一起传递给可调用对象,主要用于区分 DagRun 失败。

参数:
  • dagrun (airflow.models.dagrun.DagRun) – DagRun 对象

  • success – 标志,用于指定是应调用失败回调还是成功回调

  • reason – 完成原因

  • session – 数据库会话

classmethod execute_callback(callbacks, context, dag_id)[source]

使用给定的上下文触发回调。

参数:
  • callbacks (list[Callable] | None) – 要调用的回调列表

  • context (airflow.models.taskinstance.Context | None) – 要传递给所有回调的上下文

  • dag_id (str) – 要查找的 DAG 的 dag_id。

get_active_runs()[source]

返回当前正在运行的 DAG 运行执行日期的列表。

返回

执行日期列表

get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)[source]

返回活动“正在运行”的 DAG 运行数。

参数:
  • external_trigger – 对于外部触发的活动 DAG 运行,为 True

  • session

返回

活动 DAG 运行的数字大于 0

static fetch_dagrun(dag_id, execution_date=None, run_id=None, session=NEW_SESSION)[source]

如果存在,则返回给定执行日期或 run_id 的 DAG 运行,否则返回 None。

参数:
  • dag_id (str) – 要查找的 DAG 的 dag_id。

  • execution_date (datetime.datetime | None) – 要查找的 DagRun 的执行日期。

  • run_id (str | None) – 要查找的 DagRun 的 run_id。

  • session (sqlalchemy.orm.session.Session) –

返回

如果找到 DagRun,则返回 DagRun,否则返回 None。

返回类型

airflow.models.dagrun.DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic

get_dagrun(execution_date=None, run_id=None, session=NEW_SESSION)[source]
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]

返回 start_date(包含)和 end_date(包含)之间的 dag 运行列表。

参数:
  • start_date – 要查找的 DagRun 的起始执行日期。

  • end_date – 要查找的 DagRun 的结束执行日期。

  • session

返回

找到的 DagRun 列表。

get_latest_execution_date(session=NEW_SESSION)[source]

返回至少存在一个 dag 运行的最新日期。

resolve_template_files()[source]
get_template_env(*, force_sandboxed=False)[source]

构建 Jinja2 环境。

set_dependency(upstream_task_id, downstream_task_id)[source]

设置已使用 add_task() 添加到 DAG 的两个任务之间的依赖关系。

get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]

获取 base_date 之前(包括)的 num 个任务实例。

返回的列表可能包含与任何 DagRunType 对应的正好 num 个任务实例。如果 base_date 之前调度的 DAG 运行次数少于 num,则它可能更少。

get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

设置 TaskInstance 的状态,并清除处于 failed 或 upstream_failed 状态的下游任务。

参数:
  • task_id (str) – TaskInstance 的任务 ID

  • map_indexes (Collection[int] | None) – 仅当其 map_index 匹配时才设置 TaskInstance。如果为 None(默认),则设置该任务的所有映射 TaskInstance。

  • execution_date (datetime.datetime | None) – TaskInstance 的执行日期

  • run_id (str | None) – TaskInstance 的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要将 TaskInstance 设置为的状态

  • upstream (bool) – 包括给定 task_id 的所有上游任务

  • downstream (bool) – 包括给定 task_id 的所有下游任务

  • future (bool) – 包括给定 task_id 的所有未来 TaskInstance

  • commit (bool) – 提交更改

  • past (bool) – 包括给定 task_id 的所有过去 TaskInstance

set_task_group_state(*, group_id, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

将 TaskGroup 设置为给定状态,并清除处于 failed 或 upstream_failed 状态的下游任务。

参数:
  • group_id (str) – TaskGroup 的 group_id

  • execution_date (datetime.datetime | None) – TaskInstance 的执行日期

  • run_id (str | None) – TaskInstance 的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要将 TaskInstance 设置为的状态

  • upstream (bool) – 包括给定 task_id 的所有上游任务

  • downstream (bool) – 包括给定 task_id 的所有下游任务

  • future (bool) – 包括给定 task_id 的所有未来 TaskInstance

  • commit (bool) – 提交更改

  • past (bool) – 包括给定 task_id 的所有过去 TaskInstance

  • session (sqlalchemy.orm.session.Session) – 新会话

topological_sort(include_subdag_tasks=False)[source]

按拓扑顺序对任务进行排序,使任务位于其任何上游依赖项之后。

已弃用,请使用 task_group.topological_sort 代替

set_dag_runs_state(state=DagRunState.RUNNING, session=NEW_SESSION, start_date=None, end_date=None, dag_ids=None)[source]
clear(task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state=DagRunState.QUEUED, dry_run=False, session=NEW_SESSION, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids=frozenset())[source]

清除与当前 DAG 关联的、指定日期范围内的任务实例集合。

参数:
  • task_ids (Collection[str | tuple[str, int]] | None) – 要清除的任务 ID 列表或 (task_id, map_index) 元组。

  • start_date (datetime.datetime | None) – 要清除的最小 execution_date。

  • end_date (datetime.datetime | None) – 要清除的最大 execution_date。

  • only_failed (bool) – 仅清除失败的任务。

  • only_running (bool) – 仅清除正在运行的任务。

  • confirm_prompt (bool) – 请求确认。

  • include_subdags (bool) – 清除子 DAG 中的任务,并清除由 ExternalTaskMarker 指示的外部任务。

  • include_parentdag (bool) – 清除子 DAG 父 DAG 中的任务。

  • dag_run_state (airflow.utils.state.DagRunState) – 设置 DagRun 的状态。如果设置为 False,则不会更改 dagrun 状态。

  • dry_run (bool) – 查找要清除的任务,但不实际清除它们。

  • session (sqlalchemy.orm.session.Session) – 要使用的 sqlalchemy 会话。

  • dag_bag (airflow.models.dagbag.DagBag | None) – 用于查找 DAG 子 DAG 的 DagBag (可选)。

  • exclude_task_ids (frozenset[str] | frozenset[tuple[str, int]] | None) – 不应清除的 task_id 或 (task_id, map_index) 元组的集合。

classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
__deepcopy__(memo)[source]
sub_dag(*args, **kwargs)[source]

请使用 airflow.models.DAG.partial_subset,此方法已弃用。

partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]

根据与一个或多个任务匹配的正则表达式,返回当前 DAG 的子集。

根据应该匹配一个或多个任务的正则表达式,返回当前 DAG 的子集,作为当前 DAG 的深层副本,并根据传递的标志包含上游和下游的相邻任务。

参数:
  • task_ids_or_regex (str | Pattern | Iterable[str]) – 任务 ID 列表,或与任务 ID 匹配的正则表达式(作为字符串或已编译的正则表达式模式)。

  • include_downstream – 除了匹配的任务之外,还包含匹配任务的所有下游任务。

  • include_upstream – 除了匹配的任务之外,还包含匹配任务的所有上游任务。

  • include_direct_upstream – 包含匹配任务和下游任务(如果 include_downstream = True)的所有直接上游任务。

has_task(task_id)[source]
has_task_group(task_group_id)[source]
task_group_dict()[source]
get_task(task_id, include_subdags=False)[source]
pickle_info()[source]
pickle(session=NEW_SESSION)[源代码]
tree_view()[源代码]

打印 DAG 的 ASCII 树状表示。

get_tree_view()[源代码]

返回 DAG 的 ASCII 树状表示。

add_task(task)[源代码]

向 DAG 添加一个任务。

参数:

task (airflow.models.operator.Operator) – 你想要添加的任务

add_tasks(tasks)[源代码]

向 DAG 添加一个任务列表。

参数:

tasks (Iterable[airflow.models.operator.Operator]) – 你想要添加的任务列表

run(start_date=None, end_date=None, mark_success=False, local=False, donot_pickle=airflow_conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False, disable_retry=False)[源代码]

运行 DAG。

参数:
  • start_date – 要运行的范围的开始日期

  • end_date – 要运行的范围的结束日期

  • mark_success – 设置为 True 时,将作业标记为成功而不运行它们

  • local – 设置为 True 时,使用 LocalExecutor 运行任务

  • executor – 用于运行任务的执行器实例

  • donot_pickle – 设置为 True 时,避免序列化 DAG 对象并将其发送给工作节点

  • ignore_task_deps – 设置为 True 时,跳过上游任务

  • ignore_first_depends_on_past – 设置为 True 时,仅对第一组任务忽略 depends_on_past 依赖

  • pool – 要使用的资源池

  • delay_on_limit_secs – 当达到 max_active_runs 限制时,在下次尝试运行 DAG 运行之前等待的时间(秒)

  • verbose – 使日志输出更详细

  • conf – 从 CLI 传递的用户定义的字典

  • rerun_failed_tasks

  • run_backwards

  • run_at_least_once – 如果为 true,即使在时间范围内不存在逻辑运行,也始终至少运行一次 DAG。

cli()[源代码]

公开特定于此 DAG 的 CLI。

test(execution_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[源代码]

为给定的 DAG 和执行日期执行一次 DagRun。

参数:
  • execution_date (datetime.datetime | None) – DAG 运行的执行日期

  • run_conf (dict[str, Any] | None) – 传递给新创建的 dagrun 的配置

  • conn_file_path (str | None) – 连接文件的文件路径,格式为 yaml 或 json

  • variable_file_path (str | None) – 变量文件的文件路径,格式为 yaml 或 json

  • use_executor (bool) – 如果设置,则使用执行器来测试 DAG

  • mark_success_pattern (Pattern | str | None) – 要标记为成功而不是运行的 task_ids 的正则表达式

  • session (sqlalchemy.orm.session.Session) – 数据库连接(可选)

create_dagrun(state, execution_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_hash=None, creating_job_id=None, data_interval=None)[源代码]

从此 DAG 创建一个 dag 运行,包括与此 DAG 关联的任务。

返回 dag 运行。

参数:
  • run_id (str | None) – 定义此 dag 运行的运行 id

  • run_type (airflow.utils.types.DagRunType | None) – DagRun 的类型

  • execution_date (datetime.datetime | None) – 此 dag 运行的执行日期

  • state (airflow.utils.state.DagRunState) – dag 运行的状态

  • start_date (datetime.datetime | None) – 应该评估此 dag 运行的日期

  • external_trigger (bool | None) – 此 dag 运行是否由外部触发

  • conf (dict | None) – 包含要传递给 DAG 的配置/参数的字典

  • creating_job_id (int | None) – 创建此 DagRun 的作业的 id

  • session (sqlalchemy.orm.session.Session) – 数据库会话

  • dag_hash (str | None) – 序列化 DAG 的哈希值

  • data_interval (tuple[datetime.datetime, datetime.datetime] | None) – DagRun 的数据间隔

classmethod bulk_sync_to_db(dags, session=NEW_SESSION)[source]

使用 airflow.models.DAG.bulk_write_to_db,此方法已弃用。

classmethod bulk_write_to_db(dags, processor_subdir=None, session=NEW_SESSION)[source]

确保数据库 dag 表中给定 DAG 的 DagModel 行是最新的。

请注意,此方法可以为 DAG 和 SubDAG 调用。SubDag 实际上是一个 SubDagOperator。

参数:

dags (Collection[DAG]) – 要保存到数据库的 DAG 对象

返回

sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]

将此 DAG 的属性保存到数据库。

请注意,此方法可以为 DAG 和 SubDAG 调用。SubDag 实际上是一个 SubDagOperator。

返回

get_default_view()[source]

允许向后兼容的 jinja2 模板。

static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]

给定已知 DAG 的列表,停用在 ORM 中标记为活动的任何其他 DAG。

参数:

active_dag_ids – 活动 DAG ID 的列表

返回

static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]

停用上次被调度程序触及的时间早于过期日期的任何 DAG。

这些 DAG 很可能已被删除。

参数:

expiration_date – 设置在此时间之前被触及的非活动 DAG

返回

static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]

返回给定 DAG 中任务实例的数量。

参数:
  • session – ORM 会话

  • dag_id – 要获取任务并发的 DAG 的 ID

  • run_id – 要获取任务并发的 DAG 运行的 ID

  • task_ids – 给定 DAG 的有效任务 ID 列表

  • states – 如果提供,则用于筛选的状态列表

返回

正在运行的任务数

返回类型

int

classmethod get_serialized_fields()[source]

字符串化的 DAG 和操作符只包含这些字段。

get_edge_info(upstream_task_id, downstream_task_id)[source]

返回给定任务对的边缘信息,如果没有信息,则返回空边缘。

set_edge_info(upstream_task_id, downstream_task_id, info)[source]

在 DAG 上设置给定的边缘信息。

请注意,这将覆盖现有信息,而不是与之合并。

validate_schedule_and_params()[source]

当 DAG 定义了计划时,验证 Param 值。

如果存在任何无法通过其架构定义解析的 Param,则引发异常。

解析给定的链接,并验证它是否是有效的 URL 或“mailto”链接。

返回无效的 (owner, link) 对的迭代器。

class airflow.models.dag.DagTag(name, doc)[source]

基类:airflow.models.base.Base

每个 DAG 的标签名称,允许在 DAG 视图中快速筛选。

__tablename__ = 'dag_tag'[source]
name[source]
dag_id[source]
__table_args__ = ()[source]
__repr__()[source]

返回 repr(self)。

airflow.models.dag.DagOwnerAttributes(name, doc)[源代码]

基类:airflow.models.base.Base

定义不同所有者属性的表。

例如,所有者的链接将作为超链接传递到“DAGs”视图。

__tablename__ = 'dag_owner_attributes'[源代码]
dag_id[源代码]
owner[源代码]
__repr__()[源代码]

返回 repr(self)。

classmethod get_all(session)[源代码]
airflow.models.dag.DagModel(concurrency=None, **kwargs)[源代码]

基类:airflow.models.base.Base

包含 DAG 属性的表。

property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[源代码]
property timezone[源代码]
property safe_dag_id[源代码]
property relative_fileloc: pathlib.Path | None[源代码]

可导入的 dag“文件”相对于配置的 DAG 文件夹的文件位置。

__tablename__ = 'dag'[源代码]

这些项存储在数据库中,用于与状态相关的信息

dag_id[源代码]
root_dag_id[源代码]
is_paused_at_creation[源代码]
is_paused[源代码]
is_subdag[源代码]
is_active[源代码]
last_parsed_time[源代码]
last_pickled[源代码]
last_expired[源代码]
scheduler_lock[源代码]
pickle_id[源代码]
fileloc[源代码]
processor_subdir[源代码]
owners[源代码]
description[源代码]
default_view[源代码]
schedule_interval[源代码]
timetable_description[源代码]
dataset_expression[源代码]
tags[源代码]
max_active_tasks[源代码]
max_active_runs[源代码]
max_consecutive_failed_dag_runs[源代码]
has_task_concurrency_limits[源代码]
has_import_errors[源代码]
next_dagrun[源代码]
next_dagrun_data_interval_start[源代码]
next_dagrun_data_interval_end[源代码]
next_dagrun_create_after[源代码]
__table_args__ = ()[源代码]
parent_dag[源代码]
schedule_dataset_references[源代码]
schedule_dataset_alias_references[源代码]
schedule_datasets[源代码]
task_outlet_dataset_references[源代码]
NUM_DAGS_PER_DAGRUN_QUERY[源代码]
__repr__()[源代码]

返回 repr(self)。

static get_dagmodel(dag_id, session=NEW_SESSION)[源代码]
classmethod get_current(dag_id, session=NEW_SESSION)[源代码]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[源代码]
get_is_paused(*, session=None)[源代码]

提供与“DAG”的接口兼容性。

get_is_active(*, session=None)[源代码]

提供与“DAG”的接口兼容性。

static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[源代码]

给定一个 dag_ids 列表,获取一组已暂停的 DAG ID。

参数:
  • dag_ids (list[str]) – Dag ID 列表

  • session (sqlalchemy.orm.session.Session) – ORM 会话

返回

已暂停的 Dag_ids

返回类型

set[str]

get_default_view()[源代码]

获取默认 DAG 视图,如果 DagModel 没有值,则返回默认配置值。

set_is_paused(is_paused, including_subdags=True, session=NEW_SESSION)[源代码]

暂停/取消暂停 DAG。

参数:
  • is_paused (bool) – DAG 是否已暂停

  • including_subdags (bool) – 是否包含 DAG 的子 DAG

  • session – 会话

dag_display_name()[源代码]
classmethod deactivate_deleted_dags(alive_dag_filelocs, processor_subdir, session=NEW_SESSION)[源代码]

将 DAG 的 is_active 设置为 False,对于那些 DAG 文件已被删除的 DAG。

参数:
  • alive_dag_filelocs (Container[str]) – 活跃 DAG 的文件路径

  • processor_subdir (str) – DAG 处理器子目录

  • session (sqlalchemy.orm.session.Session) – ORM 会话

classmethod dags_needing_dagruns(session)[源代码]

返回(并锁定)一个 DAG 对象列表,这些对象由于要创建一个新的 DagRun。

这将返回一个行级锁定的行结果集,使用“SELECT ... FOR UPDATE”查询,您应确保在单个事务中做出任何调度决策 - 一旦事务提交,它将被解锁。

calculate_dagrun_date_fields(dag, last_automated_dag_run)[源代码]

计算 next_dagrunnext_dagrun_create_after`

参数:
  • dag (DAG) – DAG 对象

  • last_automated_dag_run (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 最近一次运行的数据区间(或日期时间),如果尚未计划,则为 none。

get_dataset_triggered_next_run_info(*, session=NEW_SESSION)[源代码]
airflow.models.dag.dag(dag_id='', description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]

Python DAG 装饰器,将函数包装到 Airflow DAG 中。

接受操作符 kwarg 的 kwargs。 可以用来参数化 DAG。

参数:
  • dag_args – DAG 对象的参数

  • dag_kwargs – DAG 对象的 Kwargs。

class airflow.models.dag.DagContext[source]

当 DAG 用作 ContextManager 时,DAG 上下文用于保持当前的 DAG。

可以将 DAG 用作上下文

with DAG(
    dag_id="example_dag",
    default_args=default_args,
    schedule="0 0 * * *",
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    ...

如果这样做,上下文将存储 DAG,并且每当创建新任务时,它将使用此存储的 DAG 作为父 DAG。

autoregistered_dags: set[tuple[DAG, types.ModuleType]][source]
current_autoregister_module_name: str | None[source]
classmethod push_context_managed_dag(dag)[source]
classmethod pop_context_managed_dag()[source]
classmethod get_current_dag()[source]

此条目是否对您有帮助?