DatabricksSqlOperator¶
使用 DatabricksSqlOperator
在 Databricks SQL 仓库或 Databricks 集群上执行 SQL。
使用操作符¶
操作符针对配置的仓库执行给定的 SQL 查询。唯一必需的参数是
sql
- 要执行的 SQL 查询。有 3 种指定 SQL 查询的方法包含 SQL 语句的简单字符串。
表示 SQL 语句的字符串列表。
包含 SQL 查询的文件的名称。文件必须具有
.sql
扩展名。每个查询应以;<new_line>
结尾
sql_warehouse_name
(要使用的 Databricks SQL 仓库的名称) 或http_path
(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径) 之一。
其他参数是可选的,可以在类文档中找到。
示例¶
选择数据¶
以下是使用 DatabricksSqlOperator 从表中选择数据的示例用法
# Example of using the Databricks SQL Operator to select data.
select = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="select_data",
sql="select * from default.my_airflow_table",
)
将数据选择到文件中¶
以下是使用 DatabricksSqlOperator 从表中选择数据并将其存储在文件中的示例用法
# Example of using the Databricks SQL Operator to select data into a file with JSONL format.
select_into_file = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="select_data_into_file",
sql="select * from default.my_airflow_table",
output_path="/tmp/1.jsonl",
output_format="jsonl",
)
执行多个语句¶
以下是使用 DatabricksSqlOperator 执行多个 SQL 语句的示例用法
# Example of using the Databricks SQL Operator to perform multiple operations.
create = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="create_and_populate_table",
sql=[
"drop table if exists default.my_airflow_table",
"create table default.my_airflow_table(id int, v string)",
"insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
],
)
从文件执行多个语句¶
以下是使用 DatabricksSqlOperator 从文件执行语句的示例用法
# Example of using the Databricks SQL Operator to select data.
# SQL statements should be in the file with name test.sql
create_file = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="create_and_populate_from_file",
sql="test.sql",
)
DatabricksSqlSensor¶
使用 DatabricksSqlSensor
为可通过 Databricks SQL 仓库或交互式集群访问的表运行传感器。
使用传感器¶
传感器执行用户提供的 SQL 语句。唯一必需的参数是
sql
- 要为传感器执行的 SQL 查询。sql_warehouse_name
(要使用的 Databricks SQL 仓库的名称) 或http_path
(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径) 之一。
其他参数是可选的,可以在类文档中找到。
示例¶
配置要与传感器一起使用的 Databricks 连接。
# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"
使用 SQL 语句探测特定表
# Example of using the Databricks SQL Sensor to check existence of data in a table.
sql_sensor = DatabricksSqlSensor(
databricks_conn_id=connection_id,
sql_warehouse_name=sql_warehouse_name,
catalog="hive_metastore",
task_id="sql_sensor_task",
sql="select * from hive_metastore.temp.sample_table_3 limit 1",
timeout=60 * 2,
)
DatabricksPartitionSensor¶
传感器是一种特殊类型的操作符,旨在执行一项操作——等待某事发生。它可以是基于时间的,或者等待文件,或者外部事件,但它们所做的就是等待直到发生某些事情,然后成功,以便它们的下游任务可以运行。
对于 Databricks 分区传感器,我们检查分区及其相关值是否存在,如果不存在,则等待直到分区值到达。等待时间和检查间隔可以分别在超时和 poke_interval 参数中配置。
使用 DatabricksPartitionSensor
为可通过 Databricks SQL 仓库或交互式集群访问的表运行传感器。
使用传感器¶
传感器接受表名和分区名、来自用户的值,并生成 SQL 查询以检查指定的分区名、值是否存在于指定的表中。
必需的参数是
table_name
(用于分区检查的表名)。partitions
(要检查的分区名)。partition_operator
(分区的比较运算符,用于值范围或限制,例如 partition_name >= partition_value)。 支持Databricks 比较运算符。sql_warehouse_name
(要使用的 Databricks SQL 仓库的名称) 或http_path
(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径) 之一。
其他参数是可选的,可以在类文档中找到。
示例¶
配置要与传感器一起使用的 Databricks 连接。
# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"
探测特定表是否存在数据/分区
# Example of using the Databricks Partition Sensor to check the presence
# of the specified partition(s) in a table.
partition_sensor = DatabricksPartitionSensor(
databricks_conn_id=connection_id,
sql_warehouse_name=sql_warehouse_name,
catalog="hive_metastore",
task_id="partition_sensor_task",
table_name="sample_table_2",
schema="temp",
partitions={"date": "2023-01-03", "name": ["abc", "def"]},
partition_operator="=",
timeout=60 * 2,
)