2025 年 Airflow Summit 将于 10 月 07 日至 09 日举行。立即注册获取早鸟票!

airflow.models.dagbag

FileLoadStat

关于单个文件的信息。

DagBag

DagBag 是一个 DAG 的集合,从文件夹树中解析出来,并具有高级配置设置。

DagPriorityParsingRequest

存储文件解析时将被优先处理的 DAG 解析请求的模型。

函数

generate_md5_hash(context)

模块内容

class airflow.models.dagbag.FileLoadStat[source]

基类: NamedTuple

关于单个文件的信息。

参数:
  • file – 加载的文件。

  • duration – 处理文件所花费的时间。

  • dag_num – 此文件中加载的 DAG 总数。

  • task_num – 此文件中加载的任务总数。

  • dags – 此文件中加载的 DAG 名称。

  • warning_num – 处理此文件捕获的警告总数。

file: str[source]
duration: datetime.timedelta[source]
dag_num: int[source]
task_num: int[source]
dags: str[source]
warning_num: int[source]
class airflow.models.dagbag.DagBag(dag_folder=None, include_examples=NOTSET, safe_mode=NOTSET, read_dags_from_db=False, load_op_links=True, collect_dags=True, known_pools=None, bundle_path=None)[source]

基类: airflow.utils.log.logging_mixin.LoggingMixin

DagBag 是一个 DAG 的集合,从文件夹树中解析出来,并具有高级配置设置。

一些可能的设置包括用作后端的数据库以及用于触发任务的执行器。这使得为生产、开发、测试或不同团队或安全配置文件等运行独立环境变得更容易。曾经是系统级别的设置现在变为 DagBag 级别,以便一个系统可以运行多个独立的设置集。

参数:
  • dag_folder (str | pathlib.Path | None) – 扫描以查找 DAG 的文件夹

  • include_examples (bool | airflow.utils.types.ArgNotSet) – 是否包含 Airflow 附带的示例

  • safe_mode (bool | airflow.utils.types.ArgNotSet) – 当 False 时,扫描所有 Python 模块以查找 DAG。当 True 时,使用启发式方法(包含 DAGairflow 字符串的文件)过滤要扫描的 Python 模块以查找 DAG。

  • read_dags_from_db (bool) – 如果传入 True,则从数据库读取 DAG。如果为 False,则从 Python 文件读取 DAG。

  • load_op_links (bool) – 反序列化 DAG 时是否应通过插件加载额外的 Operator 链接?此标志在调度器 (Scheduler) 中设置为 False,以便不加载额外 Operator 链接,以免在调度器中运行用户代码。

  • collect_dags (bool) – 当为 True 时,在类初始化期间收集 DAG。

  • known_pools (set[str] | None) – 如果不是 None,则当 Task 尝试使用未知 Pool 时生成警告。

bundle_path: pathlib.Path | None = None[source]
dag_folder[source]
dags: dict[str, airflow.models.dag.DAG][source]
file_last_changed: dict[str, datetime.datetime][source]
import_errors: dict[str, str][source]
captured_warnings: dict[str, tuple[str, Ellipsis]][source]
has_logged = False[source]
read_dags_from_db = False[source]
dags_last_fetched: dict[str, datetime.datetime][source]
dags_hash: dict[str, str][source]
known_pools = None[source]
dagbag_import_error_tracebacks = True[source]
dagbag_import_error_traceback_depth[source]
size()[source]
返回值:

此 dagbag 中包含的 dag 数量

返回类型:

int

property dag_ids: list[str][source]

获取 DAG ID。

返回值:

此 bag 中的 DAG ID 列表

返回类型:

list[str]

get_dag(dag_id, session=None)[source]

从字典中获取 DAG,如果过期则刷新。

参数:

dag_id – DAG ID

process_file(filepath, only_if_updated=True, safe_mode=True)[source]

给定 Python 模块或 Zip 文件路径,导入模块并在其中查找 DAG 对象。

property dag_warnings: set[airflow.models.dagwarning.DagWarning][source]

获取 DagBag 中 DAG 的 DagWarning 集合。

bag_dag(dag)[source]

将 DAG 添加到 bag 中。

抛出:

如果检测到循环,则抛出 AirflowDagCycleException。

抛出:

如果此 DAG 已存在于 bag 中,则抛出 AirflowDagDuplicatedIdException。

collect_dags(dag_folder=None, only_if_updated=True, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))[source]

在给定路径中查找 Python 模块,导入它们,并将它们添加到 DagBag 集合中。

注意,如果在处理目录时找到 .airflowignore 文件,其行为将非常类似于 .gitignore,忽略与文件中指定任何模式匹配的文件。

注意: .airflowignore 中的模式根据 DAG_IGNORE_FILE_SYNTAX 配置参数,被解释为非锚定正则表达式或类似 gitignore 的 glob 表达式。

collect_dags_from_db()[source]

从数据库收集 DAG。

dagbag_report()[source]

打印关于 DagBag 加载统计信息的报告。

sync_to_db(bundle_name, bundle_version, session=NEW_SESSION)[source]

将 DAG 列表的相关属性保存到数据库。

airflow.models.dagbag.generate_md5_hash(context)[source]
class airflow.models.dagbag.DagPriorityParsingRequest(bundle_name, relative_fileloc)[source]

基类: airflow.models.base.Base

存储文件解析时将被优先处理的 DAG 解析请求的模型。

__tablename__ = 'dag_priority_parsing_request'[source]
id[source]
bundle_name[source]
relative_fileloc[source]
__repr__()[source]

此条目有帮助吗?