airflow.providers.databricks.operators.databricks

此模块包含 Databricks 操作器。

属性

DEFER_METHOD_NAME

XCOM_RUN_ID_KEY

XCOM_JOB_ID_KEY

XCOM_RUN_PAGE_URL_KEY

XCOM_STATEMENT_ID_KEY

DatabricksJobRunLink

构建一个用于监控 Databricks 作业运行的链接。

DatabricksCreateJobsOperator

使用 API 端点创建(或重置)一个 Databricks 作业。

DatabricksSubmitRunOperator

使用 api/2.1/jobs/runs/submit API 端点向 Databricks 提交一个 Spark 作业运行。

DatabricksRunNowOperator

使用 api/2.1/jobs/run-now API 端点在 Databricks 上运行一个现有的 Spark 作业运行。

DatabricksSQLStatementsOperator

使用 api/2.0/sql/statements/ API 端点向 Databricks 提交一个 Databricks SQL 语句。

DatabricksTaskBaseOperator

作为 Databricks 作业任务或 Databricks 工作流内任务运行的操作器的基类。

DatabricksNotebookOperator

使用 Airflow 操作器在 Databricks 上运行一个 Notebook。

DatabricksTaskOperator

使用 Airflow 操作器在 Databricks 上运行一个任务。

函数

is_repair_reason_match_exist(operator, run_state)

检查修复原因是否与运行状态消息匹配。

update_job_for_repair(operator, hook, job_id, run_state)

更新作业设置(部分)以修复所有失败任务的运行。

模块内容

airflow.providers.databricks.operators.databricks.DEFER_METHOD_NAME = 'execute_complete'[source]
airflow.providers.databricks.operators.databricks.XCOM_RUN_ID_KEY = 'run_id'[source]
airflow.providers.databricks.operators.databricks.XCOM_JOB_ID_KEY = 'job_id'[source]
airflow.providers.databricks.operators.databricks.XCOM_RUN_PAGE_URL_KEY = 'run_page_url'[source]
airflow.providers.databricks.operators.databricks.XCOM_STATEMENT_ID_KEY = 'statement_id'[source]
airflow.providers.databricks.operators.databricks.is_repair_reason_match_exist(operator, run_state)[source]

检查修复原因是否与运行状态消息匹配。

参数:
返回:

如果修复原因与运行状态消息匹配则返回 True,否则返回 False

返回类型:

bool

airflow.providers.databricks.operators.databricks.update_job_for_repair(operator, hook, job_id, run_state)[source]

更新作业设置(部分)以修复所有失败任务的运行。

参数:

基类: airflow.sdk.BaseOperatorLink

构建一个用于监控 Databricks 作业运行的链接。

name = 'See Databricks Job Run'[source]

链接的名称。这将是任务 UI 上的按钮名称。

外部系统的链接。

参数:
返回:

外部系统的链接

返回类型:

str

class airflow.providers.databricks.operators.databricks.DatabricksCreateJobsOperator(*, json=None, name=None, description=None, tags=None, tasks=None, job_clusters=None, email_notifications=None, webhook_notifications=None, notification_settings=None, timeout_seconds=None, schedule=None, max_concurrent_runs=None, git_source=None, access_control_list=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, **kwargs)[source]

基类: airflow.models.BaseOperator

使用 API 端点创建(或重置)一个 Databricks 作业。

参数:
  • json (Any | None) –

    一个包含 API 参数的 JSON 对象,这些参数将直接传递给 api/2.1/jobs/create 端点。如果提供了此操作器的其他命名参数(即 nametagstasks 等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(可模板化)

    另请参阅

    有关模板的更多信息,请参阅Jinja 模板

  • name (str | None) – 作业的可选名称。

  • description (str | None) – 作业的可选描述。

  • tags (dict[str, str] | None) – 与作业关联的标签映射。

  • tasks (list[dict] | None) – 将由该作业执行的任务规范列表。对象数组 (JobTaskSettings)。

  • job_clusters (list[dict] | None) – 可由该作业的任务共享和重用的作业集群规范列表。对象数组 (JobCluster)。

  • email_notifications (dict | None) – 对象 (JobEmailNotifications)。

  • webhook_notifications (dict | None) – 对象 (WebhookNotifications)。

  • notification_settings (dict | None) – 可选的通知设置。

  • timeout_seconds (int | None) – 应用于此作业每次运行的可选超时。

  • schedule (dict | None) – 对象 (CronSchedule)。

  • max_concurrent_runs (int | None) – 作业最大允许的并发运行数量。

  • git_source (dict | None) – 包含此作业的 Notebook 任务所使用的 Notebook 的远程仓库的可选规范。对象 (GitSource)。

  • access_control_list (list[dict] | None) –

    要在此作业上设置的权限列表。对象数组 (AccessControlRequestForUser) 或对象 (AccessControlRequestForGroup) 或对象 (AccessControlRequestForServicePrincipal)。

    另请参阅

    这仅在创建时使用。要重置 ACL,请考虑使用 Databricks UI。

  • databricks_conn_id (str) – 对 Databricks 连接 的引用。(可模板化)

  • polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,操作器将每 30 秒轮询一次。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]
ui_color = '#1CB1C2'[source]
ui_fgcolor = '#fff'[source]
json[source]
databricks_conn_id = 'databricks_default'[source]
polling_period_seconds = 30[source]
databricks_retry_limit = 3[source]
databricks_retry_delay = 1[source]
databricks_retry_args = None[source]
execute(context)[source]

创建操作器时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator(*, json=None, tasks=None, spark_jar_task=None, notebook_task=None, spark_python_task=None, spark_submit_task=None, pipeline_task=None, dbt_task=None, new_cluster=None, existing_cluster_id=None, libraries=None, run_name=None, timeout_seconds=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, idempotency_token=None, access_control_list=None, wait_for_termination=True, git_source=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.models.BaseOperator

使用 api/2.1/jobs/runs/submit API 端点向 Databricks 提交一个 Spark 作业运行。

参阅: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit

有三种实例化此操作器的方法。

另请参阅

有关如何使用此操作器的更多信息,请参阅指南:DatabricksSubmitRunOperator

参数:
  • tasks (list[object] | None) –

    对象数组 (RunSubmitTaskSettings),项目数 <= 100。

  • json (Any | None) –

    一个包含 API 参数的 JSON 对象,这些参数将直接传递给 api/2.1/jobs/runs/submit 端点。如果提供了此操作器的其他命名参数(即 spark_jar_tasknotebook_task 等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(可模板化)

  • spark_jar_task (dict[str, str] | None) –

    JAR 任务的主类和参数。请注意,实际的 JAR 在 libraries 中指定。必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • notebook_task (dict[str, str] | None) –

    Notebook 任务的 Notebook 路径和参数。必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • spark_python_task (dict[str, str | list[str]] | None) –

    运行 Python 文件所需的 Python 文件路径和参数。必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • spark_submit_task (dict[str, list[str]] | None) –

    运行 spark-submit 命令所需的参数。必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • pipeline_task (dict[str, str] | None) –

    执行 Delta Live Tables pipeline 任务所需的参数。提供的字典必须至少包含 pipeline_id 字段!必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • dbt_task (dict[str, str | list[str]] | None) – 执行 dbt 任务所需的参数。提供的字典必须至少包含 commands 字段,并且还需要设置 git_source 参数。必须指定 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_task **其中之一**。此字段将是可模板化的。

  • new_cluster (dict[str, object] | None) –

    用于运行此任务的新集群的规范。 必须 指定 new_cluster existing_cluster_id (使用 pipeline_task 时除外)。此字段将进行模板化。

  • existing_cluster_id (str | None) – 用于运行此任务的现有集群的 ID。 必须 指定 new_cluster existing_cluster_id (使用 pipeline_task 时除外)。此字段将进行模板化。

  • libraries (list[dict[str, Any]] | None) –

    此运行将使用的库。此字段将进行模板化。

  • run_name (str | None) – 用于此任务的运行名称。默认情况下,此名称将设置为 Airflow task_id。此 task_id 是超类 BaseOperator 的必需参数。此字段将进行模板化。

  • idempotency_token (str | None) – 一个可选令牌,可用于保证作业运行请求的幂等性。如果已存在使用提供的令牌的运行,则该请求不会创建新的运行,而是返回现有运行的 ID。此令牌最多必须包含 64 个字符。

  • access_control_list (list[dict[str, str]] | None) – 可选的字典列表,表示给定作业运行的访问控制列表 (ACL)。每个字典包含以下字段 - 特定主题(用户使用 user_name,组使用 group_name)以及该主题的 permission_level。有关更多详细信息,请参阅作业 API 文档。

  • wait_for_termination (bool) – 是否应等待作业运行终止。默认为 True

  • timeout_seconds (int | None) – 此运行的超时时间。默认为 0,表示没有超时。此字段将进行模板化。

  • databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是 databricks_default。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键 token,并创建键 host 并将 host 字段留空。(模板化)

  • polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,操作器将每 30 秒轮询一次。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • do_xcom_push (bool) – 是否应将 run_id 和 run_page_url 推送到 xcom。

  • git_source (dict[str, str] | None) – 可选地指定一个远程 Git 存储库,从中检索支持的任务类型。

  • deferrable (bool) –

    在可延迟模式下运行 operator。

template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]
template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]
ui_color = '#1CB1C2'[source]
ui_fgcolor = '#fff'[source]
json[source]
databricks_conn_id = 'databricks_default'[source]
polling_period_seconds = 30[source]
databricks_retry_limit = 3[source]
databricks_retry_delay = 1[source]
databricks_retry_args = None[source]
wait_for_termination = True[source]
deferrable = True[source]
run_id: int | None = None[source]
do_xcom_push = True[source]
execute(context)[source]

创建操作器时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

on_kill()[source]

覆盖此方法以在任务实例被终止时清理子进程。

operator 中使用的任何 threading、subprocess 或 multiprocessing 模块都需要进行清理,否则会留下僵尸进程。

execute_complete(context, event)[source]
class airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator(*, job_id=None, job_name=None, job_parameters=None, json=None, dbt_commands=None, notebook_params=None, python_params=None, jar_params=None, spark_submit_params=None, python_named_params=None, idempotency_token=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, wait_for_termination=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), repair_run=False, databricks_repair_reason_new_settings=None, cancel_previous_runs=False, **kwargs)[source]

基类: airflow.models.BaseOperator

使用 api/2.1/jobs/run-now API 端点在 Databricks 上运行一个现有的 Spark 作业运行。

参见: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow

实例化此 operator 有两种方式。

第一种方式是,您可以直接将通常用于调用 api/2.1/jobs/run-now 端点的 JSON 负载通过 json 参数传递给我们的 DatabricksRunNowOperator。例如

json = {
    "job_id": 42,
    "job_parameters": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
}

notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)

另一种实现相同目的的方式是直接使用 DatabricksRunNowOperator 的命名参数。请注意,对于 run-now 端点中的每个顶级参数,恰好有一个对应的命名参数。使用此方法时,您的代码将如下所示

job_id = 42

dbt_commands = ["dbt deps", "dbt seed", "dbt run"]

notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}

python_params = ["douglas adams", "42"]

jar_params = ["douglas adams", "42"]

spark_submit_params = ["--class", "org.apache.spark.examples.SparkPi"]

notebook_run = DatabricksRunNowOperator(
    job_id=job_id,
    dbt_commands=dbt_commands,
    notebook_params=notebook_params,
    python_params=python_params,
    jar_params=jar_params,
    spark_submit_params=spark_submit_params,
)

如果同时提供了 json 参数 命名参数,它们将被合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶级 json 键。

目前 DatabricksRunNowOperator 支持的命名参数有
  • job_id

  • job_name

  • job_parameters

  • json

  • dbt_commands

  • notebook_params

  • python_params

  • python_named_parameters

  • jar_params

  • spark_submit_params

  • idempotency_token

  • repair_run

  • databricks_repair_reason_new_settings

  • cancel_previous_runs

参数:
  • job_id (str | None) –

    现有 Databricks 作业的 job_id。此字段将进行模板化。

  • job_name (str | None) – 现有 Databricks 作业的名称。必须只存在一个具有指定名称的作业。job_idjob_name 是互斥的。此字段将进行模板化。

  • job_parameters (dict[str, str] | None) –

    一个从键到值的字典,用于在此运行中覆盖或增强作业的参数。作业参数将被传递给作业中任何接受键值参数的任务。作业参数优先于 notebook_paramspython_paramspython_named_parametersjar_paramsspark_submit_params,且不能与它们组合使用。此字段将进行模板化。

  • json (Any | None) –

    一个包含 API 参数的 JSON 对象,这些参数将直接传递给 api/2.1/jobs/run-now 端点。如果提供了其他命名参数(例如 notebook_paramsspark_submit_params 等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(模板化)

  • dbt_commands (list[str] | None) –

    一个列表,包含使用 dbt 命令行界面运行的 dbt 命令。此字段将进行模板化。

  • notebook_params (dict[str, str] | None) –

    包含键值对的字典,用于包含 notebook 任务的作业,例如:“notebook_params”:{“name”:“john doe”,“age”:“35”}。该映射将传递给 notebook,并且可以通过 dbutils.widgets.get 函数访问。有关更多信息,请参阅 Widgets。如果在 run-now 时未指定,则触发的运行将使用作业的基本参数。notebook_params 不能与 jar_params 同时指定。此字段的 json 表示(即 {“notebook_params”:{“name”:“john doe”,“age”:“35”}})不能超过 10,000 字节。此字段将进行模板化。

  • python_params (list[str] | None) –

    一个参数列表,用于包含 python 任务的作业,例如:“python_params”:[“john doe”,“35”]。这些参数将作为命令行参数传递给 python 文件。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示(即 {“python_params”:[“john doe”,“35”]})不能超过 10,000 字节。此字段将进行模板化。

  • python_named_params (dict[str, str] | None) –

    一个命名参数列表,用于包含 python wheel 任务的作业,例如:“python_named_params”:{“name”:“john doe”,“age”:“35”}。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段将进行模板化。

  • jar_params (list[str] | None) –

    一个参数列表,用于包含 JAR 任务的作业,例如:“jar_params”:[“john doe”,“35”]。这些参数将作为命令行参数传递给 JAR 文件。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示(即 {“jar_params”:[“john doe”,“35”]})不能超过 10,000 字节。此字段将进行模板化。

  • spark_submit_params (list[str] | None) –

    一个参数列表,用于包含 spark submit 任务的作业,例如:“spark_submit_params”:[”–class”,“org.apache.spark.examples.SparkPi”]。这些参数将作为命令行参数传递给 spark-submit 脚本。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示不能超过 10,000 字节。此字段将进行模板化。

  • idempotency_token (str | None) – 一个可选令牌,可用于保证作业运行请求的幂等性。如果已存在使用提供的令牌的运行,则该请求不会创建新的运行,而是返回现有运行的 ID。此令牌最多必须包含 64 个字符。

  • databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是 databricks_default。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键 token,并创建键 host 并将 host 字段留空。(模板化)

  • polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,operator 每 30 秒轮询一次。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • do_xcom_push (bool) – 是否应将 run_id 和 run_page_url 推送到 xcom。

  • wait_for_termination (bool) – 是否应等待作业运行终止。默认为 True

  • deferrable (bool) – 在可延迟模式下运行 operator。

  • repair_run (bool) – 如果 Databricks 运行失败,则修复它。

  • databricks_repair_reason_new_settings (dict[str, Any] | None) – 一个字典,包含用于修复运行的原因和 new_settings JSON 对象。默认为 NoneNone 表示在所有情况下使用现有作业设置进行修复,否则检查 RunState 的 state_message 是否包含原因,并使用 Databricks 部分作业更新端点 (https://docs.databricks.com/api/workspace/jobs/update) 根据 new_settings 更新作业设置。如果没有任何匹配项,则不会触发修复。

  • cancel_previous_runs (bool) – 在提交新作业之前取消所有现有的正在运行的作业。

template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]
template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]
ui_color = '#1CB1C2'[source]
ui_fgcolor = '#fff'[source]
json[source]
databricks_conn_id = 'databricks_default'[source]
polling_period_seconds = 30[source]
databricks_retry_limit = 3[source]
databricks_retry_delay = 1[source]
databricks_retry_args = None[source]
wait_for_termination = True[source]
deferrable = True[source]
repair_run = False[source]
databricks_repair_reason_new_settings[source]
cancel_previous_runs = False[source]
run_id: int | None = None[source]
do_xcom_push = True[source]
execute(context)[source]

创建操作器时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[source]
on_kill()[source]

覆盖此方法以在任务实例被终止时清理子进程。

operator 中使用的任何 threading、subprocess 或 multiprocessing 模块都需要进行清理,否则会留下僵尸进程。

class airflow.providers.databricks.operators.databricks.DatabricksSQLStatementsOperator(statement, warehouse_id, *, catalog=None, schema=None, parameters=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, wait_for_termination=True, timeout=3600, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.models.BaseOperator

使用 api/2.0/sql/statements/ API 端点向 Databricks 提交一个 Databricks SQL 语句。

请参阅:https://docs.databricks.com/api/workspace/statementexecution

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:DatabricksSQLStatementsOperator

参数:
  • statement (str) – 要执行的 SQL 语句。该语句可以选择参数化,请参阅 parameters。

  • warehouse_id (str) – 执行语句的仓库。

  • catalog (str | None) – 为语句执行设置默认目录,类似于 SQL 中的 USE CATALOG。

  • schema (str | None) – 为语句执行设置默认模式,类似于 SQL 中的 USE SCHEMA。

  • parameters (list[dict[str, Any]] | None) –

    要传递到包含参数标记的 SQL 语句中的参数列表。

  • wait_for_termination (bool) – 我们是否应该等待语句执行终止。默认为 True

  • databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是 databricks_default。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键 token,并创建键 host 并将 host 字段留空。(模板化)

  • polling_period_seconds (int) – 控制我们轮询此语句结果的频率。默认情况下,Operator 将每 30 秒轮询一次。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • do_xcom_push (bool) – 是否应该将 statement_id 推送到 xcom。

  • timeout (float) – 执行 SQL 语句的 Airflow 任务的超时时间。默认值为 3600 秒。

  • deferrable (bool) – 在可延迟模式下运行 operator。

template_fields: collections.abc.Sequence[str] = ('databricks_conn_id',)[source]
template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]
ui_color = '#1CB1C2'[source]
ui_fgcolor = '#fff'[source]
statement[source]
warehouse_id[source]
catalog = None[source]
schema = None[source]
parameters = None[source]
databricks_conn_id = 'databricks_default'[source]
polling_period_seconds = 30[source]
databricks_retry_limit = 3[source]
databricks_retry_delay = 1[source]
databricks_retry_args = None[source]
wait_for_termination = True[source]
deferrable = True[source]
statement_id: str | None = None[source]
timeout = 3600[source]
do_xcom_push = True[source]
execute(context)[source]

创建操作器时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

on_kill()[source]

覆盖此方法以在任务实例被终止时清理子进程。

operator 中使用的任何 threading、subprocess 或 multiprocessing 模块都需要进行清理,否则会留下僵尸进程。

execute_complete(context, event)[source]
class airflow.providers.databricks.operators.databricks.DatabricksTaskBaseOperator(caller='DatabricksTaskBaseOperator', databricks_conn_id='databricks_default', databricks_task_key='', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[source]

基类:airflow.models.BaseOperatorabc.ABC

作为 Databricks 作业任务或 Databricks 工作流内任务运行的操作器的基类。

参数:
  • caller (str) – 用于日志中的调用者 Operator 名称。

  • databricks_conn_id (str) – 要使用的 Airflow 连接名称。

  • databricks_task_key (str) – 一个可选的 `task_key`,用于 Databricks API 引用任务。默认情况下,这将设置为 dag_id + task_id 的哈希值。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。

  • deferrable (bool) – 是否以可延迟模式运行 Operator。

  • existing_cluster_id (str) – 用于运行此任务的现有集群 ID。

  • job_cluster_key (str) – 作业集群的 key。

  • new_cluster (dict[str, Any] | None) – 用于运行此任务的新集群规范。

  • notebook_packages – 要安装在运行 Notebook 的集群上的 Python 库列表。

  • notebook_params – 一个字典,包含要作为可选参数传递给 Notebook 任务的键值对。

  • polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。

  • wait_for_termination (bool) – 是否应等待作业运行终止。默认为 True

  • workflow_run_metadata (dict[str, Any] | None) – 工作流运行的元数据。当 Operator 在工作流中使用时,会用到此元数据。预期它是一个字典,包含工作流的 `run_id` 和 `conn_id`。

caller = 'DatabricksTaskBaseOperator'[source]
databricks_conn_id = 'databricks_default'[source]
databricks_retry_args = None[source]
databricks_retry_delay = 1[source]
databricks_retry_limit = 3[source]
deferrable = True[source]
existing_cluster_id = ''[source]
job_cluster_key = ''[source]
new_cluster[source]
polling_period_seconds = 5[source]
wait_for_termination = True[source]
workflow_run_metadata = None[source]
databricks_run_id: int | None = None[source]
property databricks_task_key: str[source]
monitor_databricks_job()[source]

监控 Databricks 作业。

等待作业终止。如果是可延迟模式,则延迟任务。

execute(context)[source]

执行 Operator。如果 wait_for_termination 设置为 True,则启动作业并监控它。

execute_complete(context, event)[source]
class airflow.providers.databricks.operators.databricks.DatabricksNotebookOperator(notebook_path, source, databricks_conn_id='databricks_default', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, notebook_packages=None, notebook_params=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[来源]

Bases: DatabricksTaskBaseOperator

使用 Airflow 操作器在 Databricks 上运行一个 Notebook。

DatabricksNotebookOperator 允许用户在 Databricks 上启动和监控笔记本作业运行,将其作为 Airflow 任务。它可以作为 DatabricksWorkflowTaskGroup 的一部分使用,以利用作业集群的优势,这使得用户可以在更便宜且可在任务之间共享的集群上运行其任务。

另请参阅

关于如何使用此操作符的更多信息,请参阅指南: DatabricksNotebookOperator

参数:
  • notebook_path (str) – Databricks 中笔记本的路径。

  • source (str) – 笔记本的可选位置类型。当设置为 WORKSPACE 时,将从本地 Databricks 工作区获取笔记本。当设置为 GIT 时,将从 git_source 中定义的 Git 仓库获取笔记本。如果该值为空,如果定义了 git_source,任务将使用 GIT;否则使用 WORKSPACE。更多信息请访问 https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate

  • databricks_conn_id (str) – 要使用的 Airflow 连接名称。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。

  • deferrable (bool) – 是否以可延迟模式运行 Operator。

  • existing_cluster_id (str) – 用于运行此任务的现有集群 ID。

  • job_cluster_key (str) – 作业集群的 key。

  • new_cluster (dict[str, Any] | None) – 用于运行此任务的新集群规范。

  • notebook_packages (list[dict[str, Any]] | None) – 要安装在运行笔记本的集群上的 Python 库列表。

  • notebook_params (dict | None) – 要作为可选参数传递给笔记本任务的键值对字典。

  • polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。

  • wait_for_termination (bool) – 是否应等待作业运行终止。默认为 True

  • workflow_run_metadata (dict | None) – 工作流运行的元数据。在工作流中使用操作符时使用。预计它是一个包含工作流的 run_id 和 conn_id 的字典。

template_fields = ('notebook_params', 'workflow_run_metadata')[来源]
CALLER = 'DatabricksNotebookOperator'[来源]
notebook_path[来源]
source[来源]
notebook_packages = [][来源]
notebook_params[来源]
class airflow.providers.databricks.operators.databricks.DatabricksTaskOperator(task_config, databricks_conn_id='databricks_default', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[来源]

Bases: DatabricksTaskBaseOperator

使用 Airflow 操作器在 Databricks 上运行一个任务。

DatabricksTaskOperator 允许用户在 Databricks 上启动和监控任务作业运行,将其作为 Airflow 任务。它可以作为 DatabricksWorkflowTaskGroup 的一部分使用,以利用作业集群的优势,这使得用户可以在更便宜且可在任务之间共享的集群上运行其任务。

另请参阅

关于如何使用此操作符的更多信息,请参阅指南: DatabricksTaskOperator

参数:
  • task_config (dict) – 要在 Databricks 上运行的任务配置。

  • databricks_conn_id (str) – 要使用的 Airflow 连接名称。

  • databricks_retry_args (dict[Any, Any] | None) – 包含传递给 tenacity.Retrying 类参数的可选字典。

  • databricks_retry_delay (int) – 两次重试之间等待的秒数。

  • databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。

  • deferrable (bool) – 是否以可延迟模式运行 Operator。

  • existing_cluster_id (str) – 用于运行此任务的现有集群 ID。

  • job_cluster_key (str) – 作业集群的 key。

  • new_cluster (dict[str, Any] | None) – 用于运行此任务的新集群规范。

  • polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。

  • wait_for_termination (bool) – 是否应等待作业运行终止。默认为 True

CALLER = 'DatabricksTaskOperator'[来源]
template_fields = ('workflow_run_metadata',)[来源]
task_config[来源]

此条目有帮助吗?