airflow.providers.databricks.operators.databricks¶
此模块包含 Databricks 操作器。
属性¶
类¶
构建一个用于监控 Databricks 作业运行的链接。 |
|
使用 API 端点创建(或重置)一个 Databricks 作业。 |
|
使用 api/2.1/jobs/runs/submit API 端点向 Databricks 提交一个 Spark 作业运行。 |
|
使用 api/2.1/jobs/run-now API 端点在 Databricks 上运行一个现有的 Spark 作业运行。 |
|
使用 api/2.0/sql/statements/ API 端点向 Databricks 提交一个 Databricks SQL 语句。 |
|
作为 Databricks 作业任务或 Databricks 工作流内任务运行的操作器的基类。 |
|
使用 Airflow 操作器在 Databricks 上运行一个 Notebook。 |
|
使用 Airflow 操作器在 Databricks 上运行一个任务。 |
函数¶
|
检查修复原因是否与运行状态消息匹配。 |
|
更新作业设置(部分)以修复所有失败任务的运行。 |
模块内容¶
- airflow.providers.databricks.operators.databricks.is_repair_reason_match_exist(operator, run_state)[source]¶
检查修复原因是否与运行状态消息匹配。
- 参数:
operator (Any) – 正在处理的 Databricks 操作器
run_state (airflow.providers.databricks.hooks.databricks.RunState) – Databricks 作业的运行状态
- 返回:
如果修复原因与运行状态消息匹配则返回 True,否则返回 False
- 返回类型:
- airflow.providers.databricks.operators.databricks.update_job_for_repair(operator, hook, job_id, run_state)[source]¶
更新作业设置(部分)以修复所有失败任务的运行。
- 参数:
operator (Any) – 正在处理的 Databricks 操作器
hook (Any) – Databricks hook
job_id (int) – Databricks 作业 ID
run_state (airflow.providers.databricks.hooks.databricks.RunState) – Databricks 作业的运行状态
- class airflow.providers.databricks.operators.databricks.DatabricksJobRunLink[source]¶
基类:
airflow.sdk.BaseOperatorLink
构建一个用于监控 Databricks 作业运行的链接。
- get_link(operator, *, ti_key)[source]¶
外部系统的链接。
- 参数:
operator (airflow.models.BaseOperator) – 此链接关联的 Airflow 操作器对象。
ti_key (airflow.models.taskinstancekey.TaskInstanceKey) – 要返回链接的任务实例 ID。
- 返回:
外部系统的链接
- 返回类型:
- class airflow.providers.databricks.operators.databricks.DatabricksCreateJobsOperator(*, json=None, name=None, description=None, tags=None, tasks=None, job_clusters=None, email_notifications=None, webhook_notifications=None, notification_settings=None, timeout_seconds=None, schedule=None, max_concurrent_runs=None, git_source=None, access_control_list=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
使用 API 端点创建(或重置)一个 Databricks 作业。
另请参阅
https://docs.databricks.com/api/workspace/jobs/create https://docs.databricks.com/api/workspace/jobs/reset
- 参数:
json (Any | None) –
一个包含 API 参数的 JSON 对象,这些参数将直接传递给
api/2.1/jobs/create
端点。如果提供了此操作器的其他命名参数(即name
、tags
、tasks
等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(可模板化)另请参阅
有关模板的更多信息,请参阅Jinja 模板。
name (str | None) – 作业的可选名称。
description (str | None) – 作业的可选描述。
tasks (list[dict] | None) – 将由该作业执行的任务规范列表。对象数组 (JobTaskSettings)。
job_clusters (list[dict] | None) – 可由该作业的任务共享和重用的作业集群规范列表。对象数组 (JobCluster)。
email_notifications (dict | None) – 对象 (JobEmailNotifications)。
webhook_notifications (dict | None) – 对象 (WebhookNotifications)。
notification_settings (dict | None) – 可选的通知设置。
timeout_seconds (int | None) – 应用于此作业每次运行的可选超时。
schedule (dict | None) – 对象 (CronSchedule)。
max_concurrent_runs (int | None) – 作业最大允许的并发运行数量。
git_source (dict | None) – 包含此作业的 Notebook 任务所使用的 Notebook 的远程仓库的可选规范。对象 (GitSource)。
access_control_list (list[dict] | None) –
要在此作业上设置的权限列表。对象数组 (AccessControlRequestForUser) 或对象 (AccessControlRequestForGroup) 或对象 (AccessControlRequestForServicePrincipal)。
另请参阅
这仅在创建时使用。要重置 ACL,请考虑使用 Databricks UI。
databricks_conn_id (str) – 对 Databricks 连接 的引用。(可模板化)
polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,操作器将每 30 秒轮询一次。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。
databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。
- template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]¶
- class airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator(*, json=None, tasks=None, spark_jar_task=None, notebook_task=None, spark_python_task=None, spark_submit_task=None, pipeline_task=None, dbt_task=None, new_cluster=None, existing_cluster_id=None, libraries=None, run_name=None, timeout_seconds=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, idempotency_token=None, access_control_list=None, wait_for_termination=True, git_source=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
使用 api/2.1/jobs/runs/submit API 端点向 Databricks 提交一个 Spark 作业运行。
参阅: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
有三种实例化此操作器的方法。
另请参阅
有关如何使用此操作器的更多信息,请参阅指南:DatabricksSubmitRunOperator
- 参数:
对象数组 (RunSubmitTaskSettings),项目数 <= 100。
json (Any | None) –
一个包含 API 参数的 JSON 对象,这些参数将直接传递给
api/2.1/jobs/runs/submit
端点。如果提供了此操作器的其他命名参数(即spark_jar_task
、notebook_task
等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(可模板化)spark_jar_task (dict[str, str] | None) –
JAR 任务的主类和参数。请注意,实际的 JAR 在
libraries
中指定。必须指定spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。notebook_task (dict[str, str] | None) –
Notebook 任务的 Notebook 路径和参数。必须指定
spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。spark_python_task (dict[str, str | list[str]] | None) –
运行 Python 文件所需的 Python 文件路径和参数。必须指定
spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。spark_submit_task (dict[str, list[str]] | None) –
运行 spark-submit 命令所需的参数。必须指定
spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。pipeline_task (dict[str, str] | None) –
执行 Delta Live Tables pipeline 任务所需的参数。提供的字典必须至少包含
pipeline_id
字段!必须指定spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。dbt_task (dict[str, str | list[str]] | None) – 执行 dbt 任务所需的参数。提供的字典必须至少包含
commands
字段,并且还需要设置git_source
参数。必须指定spark_jar_task
、notebook_task
、spark_python_task
、spark_submit_task
、pipeline_task
或dbt_task
**其中之一**。此字段将是可模板化的。new_cluster (dict[str, object] | None) –
用于运行此任务的新集群的规范。 必须 指定
new_cluster
或existing_cluster_id
(使用pipeline_task
时除外)。此字段将进行模板化。existing_cluster_id (str | None) – 用于运行此任务的现有集群的 ID。 必须 指定
new_cluster
或existing_cluster_id
(使用pipeline_task
时除外)。此字段将进行模板化。libraries (list[dict[str, Any]] | None) –
此运行将使用的库。此字段将进行模板化。
run_name (str | None) – 用于此任务的运行名称。默认情况下,此名称将设置为 Airflow
task_id
。此task_id
是超类BaseOperator
的必需参数。此字段将进行模板化。idempotency_token (str | None) – 一个可选令牌,可用于保证作业运行请求的幂等性。如果已存在使用提供的令牌的运行,则该请求不会创建新的运行,而是返回现有运行的 ID。此令牌最多必须包含 64 个字符。
access_control_list (list[dict[str, str]] | None) – 可选的字典列表,表示给定作业运行的访问控制列表 (ACL)。每个字典包含以下字段 - 特定主题(用户使用
user_name
,组使用group_name
)以及该主题的permission_level
。有关更多详细信息,请参阅作业 API 文档。wait_for_termination (bool) – 是否应等待作业运行终止。默认为
True
。timeout_seconds (int | None) – 此运行的超时时间。默认为 0,表示没有超时。此字段将进行模板化。
databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是
databricks_default
。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键token
,并创建键host
并将host
字段留空。(模板化)polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,操作器将每 30 秒轮询一次。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。
databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。do_xcom_push (bool) – 是否应将 run_id 和 run_page_url 推送到 xcom。
git_source (dict[str, str] | None) – 可选地指定一个远程 Git 存储库,从中检索支持的任务类型。
deferrable (bool) –
在可延迟模式下运行 operator。
- template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]¶
- class airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator(*, job_id=None, job_name=None, job_parameters=None, json=None, dbt_commands=None, notebook_params=None, python_params=None, jar_params=None, spark_submit_params=None, python_named_params=None, idempotency_token=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, wait_for_termination=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), repair_run=False, databricks_repair_reason_new_settings=None, cancel_previous_runs=False, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
使用 api/2.1/jobs/run-now API 端点在 Databricks 上运行一个现有的 Spark 作业运行。
参见: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
实例化此 operator 有两种方式。
第一种方式是,您可以直接将通常用于调用
api/2.1/jobs/run-now
端点的 JSON 负载通过json
参数传递给我们的DatabricksRunNowOperator
。例如json = { "job_id": 42, "job_parameters": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}, } notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)
另一种实现相同目的的方式是直接使用
DatabricksRunNowOperator
的命名参数。请注意,对于run-now
端点中的每个顶级参数,恰好有一个对应的命名参数。使用此方法时,您的代码将如下所示job_id = 42 dbt_commands = ["dbt deps", "dbt seed", "dbt run"] notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"} python_params = ["douglas adams", "42"] jar_params = ["douglas adams", "42"] spark_submit_params = ["--class", "org.apache.spark.examples.SparkPi"] notebook_run = DatabricksRunNowOperator( job_id=job_id, dbt_commands=dbt_commands, notebook_params=notebook_params, python_params=python_params, jar_params=jar_params, spark_submit_params=spark_submit_params, )
如果同时提供了 json 参数 和 命名参数,它们将被合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶级
json
键。- 目前
DatabricksRunNowOperator
支持的命名参数有 job_id
job_name
job_parameters
json
dbt_commands
notebook_params
python_params
python_named_parameters
jar_params
spark_submit_params
idempotency_token
repair_run
databricks_repair_reason_new_settings
cancel_previous_runs
- 参数:
job_id (str | None) –
现有 Databricks 作业的 job_id。此字段将进行模板化。
job_name (str | None) – 现有 Databricks 作业的名称。必须只存在一个具有指定名称的作业。
job_id
和job_name
是互斥的。此字段将进行模板化。job_parameters (dict[str, str] | None) –
一个从键到值的字典,用于在此运行中覆盖或增强作业的参数。作业参数将被传递给作业中任何接受键值参数的任务。作业参数优先于
notebook_params
、python_params
、python_named_parameters
、jar_params
和spark_submit_params
,且不能与它们组合使用。此字段将进行模板化。json (Any | None) –
一个包含 API 参数的 JSON 对象,这些参数将直接传递给
api/2.1/jobs/run-now
端点。如果提供了其他命名参数(例如notebook_params
、spark_submit_params
等),它们将与此 json 字典合并。如果在合并过程中发生冲突,命名参数将优先并覆盖顶层 json 键。(模板化)另请参阅
有关模板化的更多信息,请参阅 Jinja 模板化。 https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
dbt_commands (list[str] | None) –
一个列表,包含使用 dbt 命令行界面运行的 dbt 命令。此字段将进行模板化。
notebook_params (dict[str, str] | None) –
包含键值对的字典,用于包含 notebook 任务的作业,例如:“notebook_params”:{“name”:“john doe”,“age”:“35”}。该映射将传递给 notebook,并且可以通过 dbutils.widgets.get 函数访问。有关更多信息,请参阅 Widgets。如果在 run-now 时未指定,则触发的运行将使用作业的基本参数。notebook_params 不能与 jar_params 同时指定。此字段的 json 表示(即 {“notebook_params”:{“name”:“john doe”,“age”:“35”}})不能超过 10,000 字节。此字段将进行模板化。
python_params (list[str] | None) –
一个参数列表,用于包含 python 任务的作业,例如:“python_params”:[“john doe”,“35”]。这些参数将作为命令行参数传递给 python 文件。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示(即 {“python_params”:[“john doe”,“35”]})不能超过 10,000 字节。此字段将进行模板化。
python_named_params (dict[str, str] | None) –
一个命名参数列表,用于包含 python wheel 任务的作业,例如:“python_named_params”:{“name”:“john doe”,“age”:“35”}。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段将进行模板化。
jar_params (list[str] | None) –
一个参数列表,用于包含 JAR 任务的作业,例如:“jar_params”:[“john doe”,“35”]。这些参数将作为命令行参数传递给 JAR 文件。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示(即 {“jar_params”:[“john doe”,“35”]})不能超过 10,000 字节。此字段将进行模板化。
spark_submit_params (list[str] | None) –
一个参数列表,用于包含 spark submit 任务的作业,例如:“spark_submit_params”:[”–class”,“org.apache.spark.examples.SparkPi”]。这些参数将作为命令行参数传递给 spark-submit 脚本。如果在 run-now 时指定,它将覆盖作业设置中指定的参数。此字段的 json 表示不能超过 10,000 字节。此字段将进行模板化。
idempotency_token (str | None) – 一个可选令牌,可用于保证作业运行请求的幂等性。如果已存在使用提供的令牌的运行,则该请求不会创建新的运行,而是返回现有运行的 ID。此令牌最多必须包含 64 个字符。
databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是
databricks_default
。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键token
,并创建键host
并将host
字段留空。(模板化)polling_period_seconds (int) – 控制我们轮询此运行结果的速率。默认情况下,operator 每 30 秒轮询一次。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。
databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。do_xcom_push (bool) – 是否应将 run_id 和 run_page_url 推送到 xcom。
wait_for_termination (bool) – 是否应等待作业运行终止。默认为
True
。deferrable (bool) – 在可延迟模式下运行 operator。
repair_run (bool) – 如果 Databricks 运行失败,则修复它。
databricks_repair_reason_new_settings (dict[str, Any] | None) – 一个字典,包含用于修复运行的原因和 new_settings JSON 对象。默认为 None。 None 表示在所有情况下使用现有作业设置进行修复,否则检查 RunState 的 state_message 是否包含原因,并使用 Databricks 部分作业更新端点 (https://docs.databricks.com/api/workspace/jobs/update) 根据 new_settings 更新作业设置。如果没有任何匹配项,则不会触发修复。
cancel_previous_runs (bool) – 在提交新作业之前取消所有现有的正在运行的作业。
- template_fields: collections.abc.Sequence[str] = ('json', 'databricks_conn_id')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]¶
- 目前
- class airflow.providers.databricks.operators.databricks.DatabricksSQLStatementsOperator(statement, warehouse_id, *, catalog=None, schema=None, parameters=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, databricks_retry_args=None, do_xcom_push=True, wait_for_termination=True, timeout=3600, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
使用 api/2.0/sql/statements/ API 端点向 Databricks 提交一个 Databricks SQL 语句。
请参阅:https://docs.databricks.com/api/workspace/statementexecution
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:DatabricksSQLStatementsOperator
- 参数:
statement (str) – 要执行的 SQL 语句。该语句可以选择参数化,请参阅 parameters。
warehouse_id (str) – 执行语句的仓库。
catalog (str | None) – 为语句执行设置默认目录,类似于 SQL 中的 USE CATALOG。
schema (str | None) – 为语句执行设置默认模式,类似于 SQL 中的 USE SCHEMA。
parameters (list[dict[str, Any]] | None) –
要传递到包含参数标记的 SQL 语句中的参数列表。
wait_for_termination (bool) – 我们是否应该等待语句执行终止。默认为
True
。databricks_conn_id (str) – Databricks 连接的引用。默认情况下,通常会是
databricks_default
。要使用基于令牌的身份验证,请在连接的 extra 字段中提供键token
,并创建键host
并将host
字段留空。(模板化)polling_period_seconds (int) – 控制我们轮询此语句结果的频率。默认情况下,Operator 将每 30 秒轮询一次。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。其值必须大于或等于 1。
databricks_retry_delay (int) – 两次重试之间等待的秒数(可能是一个浮点数)。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。do_xcom_push (bool) – 是否应该将 statement_id 推送到 xcom。
timeout (float) – 执行 SQL 语句的 Airflow 任务的超时时间。默认值为 3600 秒。
deferrable (bool) – 在可延迟模式下运行 operator。
- template_fields: collections.abc.Sequence[str] = ('databricks_conn_id',)[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json-tpl',)[source]¶
- class airflow.providers.databricks.operators.databricks.DatabricksTaskBaseOperator(caller='DatabricksTaskBaseOperator', databricks_conn_id='databricks_default', databricks_task_key='', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
,abc.ABC
作为 Databricks 作业任务或 Databricks 工作流内任务运行的操作器的基类。
- 参数:
caller (str) – 用于日志中的调用者 Operator 名称。
databricks_conn_id (str) – 要使用的 Airflow 连接名称。
databricks_task_key (str) – 一个可选的 `task_key`,用于 Databricks API 引用任务。默认情况下,这将设置为
dag_id + task_id
的哈希值。databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。databricks_retry_delay (int) – 两次重试之间等待的秒数。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。
deferrable (bool) – 是否以可延迟模式运行 Operator。
existing_cluster_id (str) – 用于运行此任务的现有集群 ID。
job_cluster_key (str) – 作业集群的 key。
notebook_packages – 要安装在运行 Notebook 的集群上的 Python 库列表。
notebook_params – 一个字典,包含要作为可选参数传递给 Notebook 任务的键值对。
polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。
wait_for_termination (bool) – 是否应等待作业运行终止。默认为
True
。workflow_run_metadata (dict[str, Any] | None) – 工作流运行的元数据。当 Operator 在工作流中使用时,会用到此元数据。预期它是一个字典,包含工作流的 `run_id` 和 `conn_id`。
- class airflow.providers.databricks.operators.databricks.DatabricksNotebookOperator(notebook_path, source, databricks_conn_id='databricks_default', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, notebook_packages=None, notebook_params=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[来源]¶
Bases:
DatabricksTaskBaseOperator
使用 Airflow 操作器在 Databricks 上运行一个 Notebook。
DatabricksNotebookOperator 允许用户在 Databricks 上启动和监控笔记本作业运行,将其作为 Airflow 任务。它可以作为 DatabricksWorkflowTaskGroup 的一部分使用,以利用作业集群的优势,这使得用户可以在更便宜且可在任务之间共享的集群上运行其任务。
另请参阅
关于如何使用此操作符的更多信息,请参阅指南: DatabricksNotebookOperator
- 参数:
notebook_path (str) – Databricks 中笔记本的路径。
source (str) – 笔记本的可选位置类型。当设置为 WORKSPACE 时,将从本地 Databricks 工作区获取笔记本。当设置为 GIT 时,将从 git_source 中定义的 Git 仓库获取笔记本。如果该值为空,如果定义了 git_source,任务将使用 GIT;否则使用 WORKSPACE。更多信息请访问 https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
databricks_conn_id (str) – 要使用的 Airflow 连接名称。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。databricks_retry_delay (int) – 两次重试之间等待的秒数。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。
deferrable (bool) – 是否以可延迟模式运行 Operator。
existing_cluster_id (str) – 用于运行此任务的现有集群 ID。
job_cluster_key (str) – 作业集群的 key。
notebook_packages (list[dict[str, Any]] | None) – 要安装在运行笔记本的集群上的 Python 库列表。
notebook_params (dict | None) – 要作为可选参数传递给笔记本任务的键值对字典。
polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。
wait_for_termination (bool) – 是否应等待作业运行终止。默认为
True
。workflow_run_metadata (dict | None) – 工作流运行的元数据。在工作流中使用操作符时使用。预计它是一个包含工作流的 run_id 和 conn_id 的字典。
- class airflow.providers.databricks.operators.databricks.DatabricksTaskOperator(task_config, databricks_conn_id='databricks_default', databricks_retry_args=None, databricks_retry_delay=1, databricks_retry_limit=3, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), existing_cluster_id='', job_cluster_key='', new_cluster=None, polling_period_seconds=5, wait_for_termination=True, workflow_run_metadata=None, **kwargs)[来源]¶
Bases:
DatabricksTaskBaseOperator
使用 Airflow 操作器在 Databricks 上运行一个任务。
DatabricksTaskOperator 允许用户在 Databricks 上启动和监控任务作业运行,将其作为 Airflow 任务。它可以作为 DatabricksWorkflowTaskGroup 的一部分使用,以利用作业集群的优势,这使得用户可以在更便宜且可在任务之间共享的集群上运行其任务。
另请参阅
关于如何使用此操作符的更多信息,请参阅指南: DatabricksTaskOperator
- 参数:
task_config (dict) – 要在 Databricks 上运行的任务配置。
databricks_conn_id (str) – 要使用的 Airflow 连接名称。
databricks_retry_args (dict[Any, Any] | None) – 包含传递给
tenacity.Retrying
类参数的可选字典。databricks_retry_delay (int) – 两次重试之间等待的秒数。
databricks_retry_limit (int) – 如果 Databricks 后端无法访问,重试的次数。
deferrable (bool) – 是否以可延迟模式运行 Operator。
existing_cluster_id (str) – 用于运行此任务的现有集群 ID。
job_cluster_key (str) – 作业集群的 key。
polling_period_seconds (int) – 控制我们轮询此 Notebook 作业运行结果的频率。
wait_for_termination (bool) – 是否应等待作业运行终止。默认为
True
。