airflow.providers.influxdb.hooks.influxdb

此模块允许连接到 InfluxDB 数据库。

InfluxDBHook

与 InfluxDB 交互。

模块内容

class airflow.providers.influxdb.hooks.influxdb.InfluxDBHook(conn_id=default_conn_name, *args, **kwargs)[source]

基类: airflow.providers.common.compat.sdk.BaseHook

与 InfluxDB 交互。

执行与 InfluxDB 的连接并获取客户端。

参数:

influxdb_conn_id – 引用到 InfluxDB 连接 ID。

conn_name_attr = 'influxdb_conn_id'[source]
default_conn_name = 'influxdb_default'[source]
conn_type = 'influxdb'[source]
hook_name = 'Influxdb'[source]
influxdb_conn_id = 'influxdb_default'[source]
connection[source]
client: influxdb_client.InfluxDBClient | None = None[source]
extras: dict[source]
uri = None[source]
classmethod get_connection_form_widgets()[source]

返回用于在连接表单中添加的组件。

get_client(uri, kwargs)[source]
get_uri(conn)[source]

根据 InfluxDB 主机要求向 URI 添加额外参数。

get_conn()[source]

使用 token 和组织名称初始化新的 InfluxDB 连接。

query(query)[source]

执行查询。

注意:查询中应包含 bucket 名称。

参数:

query – InfluxDB 查询

返回:

列表

返回类型:

list[influxdb_client.client.flux_table.FluxTable]

query_to_df(query)[source]

执行查询并返回 pandas DataFrame。

注意:查询中应包含 bucket 名称。

参数:

query – InfluxDB 查询

返回:

pd.DataFrame

返回类型:

pandas.DataFrame

write(bucket_name, point_name, tag_name, tag_value, field_name, field_value, synchronous=False)[source]

向指定的 bucket 写入 Point。

示例: Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

create_organization(name)[source]

创建一个新组织。

delete_organization(org_id)[source]

按 ID 删除组织。

create_bucket(bucket_name, description, org_id, retention_rules=None)[source]

为组织创建 bucket。

find_bucket_id_by_name(bucket_name)[source]

按名称获取 bucket ID。

delete_bucket(bucket_name)[source]

按名称删除 bucket。

此条目是否有帮助?