airflow.providers.mongo.hooks.mongo

MongoDB 的 Hook。

MongoHook

用于与 MongoDB 交互的 PyMongo 封装器。

模块内容

class airflow.providers.mongo.hooks.mongo.MongoHook(mongo_conn_id=default_conn_name, *args, **kwargs)[source]

基类: airflow.hooks.base.BaseHook

用于与 MongoDB 交互的 PyMongo 封装器。

Mongo 连接文档 https://docs.mongodb.com/manual/reference/connection-string/index.html 您可以在连接的 extra 字段中指定连接字符串选项 https://docs.mongodb.com/manual/reference/connection-string/index.html#connection-string-options

如果您想使用 DNS seedlist,请将 srv 设置为 True。

例如:

{“srv”: true, “replicaSet”: “test”, “ssl”: true, “connectTimeoutMS”: 30000}

要启用 SSL,可以在连接字符串选项(extra 下)中使用 “ssl”: true 选项。在启用 SSL 的情况下,除非指定,否则连接中默认不包含 allow_insecure 选项。这样做是为了确保在处理与 MongoDB 的连接时使用安全介质。

allow_insecure 仅在 ssl 上下文中有效,并且是可配置的,可以在以下场景之一中使用

HTTP (ssl = False) 在此情况下,ssl 被禁用,使用 allow_insecure 没有意义。连接 extra 示例:{“ssl”: false}

HTTPS(不安全)(ssl = True, allow_insecure = True) 在此情况下,ssl 被启用,并且连接允许不安全的连接。连接 extra 示例:{“ssl”: true, “allow_insecure”: true}

HTTPS(安全)(ssl = True, allow_insecure = False - 启用 SSL 时的默认值):在此情况下,ssl 被启用,并且连接不允许不安全的连接(启用 SSL 时的默认行为)。连接 extra 示例:{“ssl”: true} 或 {“ssl”: true, “allow_insecure”: false}

注意:tlsssl 的别名,可以代替 ssl 使用。示例:{“ssl”: false} 或 {“tls”: false}。

参数:

mongo_conn_id (str) – 连接到 MongoDB 时使用的 Mongo 连接 ID

conn_name_attr = 'mongo_conn_id'[source]
default_conn_name = 'mongo_default'[source]
conn_type = 'mongo'[source]
hook_name = 'MongoDB'[source]
classmethod get_connection_form_widgets()[source]

返回要添加到连接表单的连接小部件。

classmethod get_ui_field_behaviour()[source]

返回自定义字段行为。

mongo_conn_id = 'mongo_default'[source]
connection[source]
extras[source]
client: pymongo.MongoClient | None = None[source]
uri[source]
allow_insecure = False[source]
ssl_enabled = False[source]
__enter__()[source]

创建上下文管理器时返回对象。

__exit__(exc_type, exc_val, exc_tb)[source]

退出上下文管理器时关闭 Mongo 连接。

get_conn()[source]

获取 PyMongo 客户端。

close()[source]
get_collection(mongo_collection, mongo_db=None)[source]

获取一个用于查询的 mongo 集合对象。

除非另行指定,否则使用连接模式作为数据库。

aggregate(mongo_collection, aggregate_query, mongo_db=None, **kwargs)[source]

运行聚合管道并返回结果。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.aggregate https://pymongo.readthedocs.io/en/stable/examples/aggregation.html

find(mongo_collection: str, query: dict, find_one: typing_extensions.Literal[False], mongo_db: str | None = None, projection: list | dict | None = None, **kwargs) pymongo.cursor.Cursor[source]
find(mongo_collection: str, query: dict, find_one: typing_extensions.Literal[True], mongo_db: str | None = None, projection: list | dict | None = None, **kwargs) Any | None

运行 mongo find 查询并返回结果。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find

insert_one(mongo_collection, doc, mongo_db=None, **kwargs)[source]

在 mongo 集合中插入一个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one

insert_many(mongo_collection, docs, mongo_db=None, **kwargs)[source]

在 mongo 集合中插入多个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_many

update_one(mongo_collection, filter_doc, update_doc, mongo_db=None, **kwargs)[source]

更新 mongo 集合中的单个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_one

参数:
  • mongo_collection (str) – 要更新的集合的名称。

  • filter_doc (dict) – 匹配要更新的文档的查询。

  • update_doc (dict) – 要应用的修改。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

update_many(mongo_collection, filter_doc, update_doc, mongo_db=None, **kwargs)[source]

更新 mongo 集合中的一个或多个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_many

参数:
  • mongo_collection (str) – 要更新的集合的名称。

  • filter_doc (dict) – 匹配要更新的文档的查询。

  • update_doc (dict) – 要应用的修改。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

replace_one(mongo_collection, doc, filter_doc=None, mongo_db=None, **kwargs)[source]

替换 mongo 集合中的单个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.replace_one

注意

如果没有给出 filter_doc,则假定替换文档包含 _id 字段,该字段将用作过滤器。

参数:
  • mongo_collection (str) – 要更新的集合的名称。

  • doc (dict) – 新文档。

  • filter_doc (dict | None) – 匹配要替换的文档的查询。可以省略;在这种情况下,将使用 doc 中的 _id 字段。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

replace_many(mongo_collection, docs, filter_docs=None, mongo_db=None, upsert=False, collation=None, **kwargs)[source]

替换 mongo 集合中的多个文档。

使用包含多个 ReplaceOne 操作的 bulk_write https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write

注意

如果没有给出 filter_docs,则假定所有替换文档都包含 _id 字段,这些字段将用作过滤器。

参数:
  • mongo_collection (str) – 要更新的集合的名称。

  • docs (list[dict]) – 新文档。

  • filter_docs (list[dict] | None) – 匹配要替换的文档的查询列表。可以省略;在这种情况下,将使用 airflow.docs 中的 _id 字段。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

  • upsert (bool) – 如果为 True,则如果没有文档与替换操作的过滤器匹配,则执行插入。

  • collation (pymongo.collation.Collation | None) – Collation 的实例。此选项仅在 MongoDB 3.4 及更高版本上受支持。

delete_one(mongo_collection, filter_doc, mongo_db=None, **kwargs)[source]

删除 mongo 集合中的单个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_one

参数:
  • mongo_collection (str) – 要从中删除文档的集合的名称。

  • filter_doc (dict) – 匹配要删除的文档的查询。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

delete_many(mongo_collection, filter_doc, mongo_db=None, **kwargs)[source]

删除 mongo 集合中的一个或多个文档。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_many

参数:
  • mongo_collection (str) – 要从中删除文档的集合的名称。

  • filter_doc (dict) – 匹配要删除的文档的查询。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

distinct(mongo_collection, distinct_key, filter_doc=None, mongo_db=None, **kwargs)[source]

返回给定键在整个集合中的唯一值列表。

https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct

参数:
  • mongo_collection (str) – 要执行 distinct 操作的集合的名称。

  • distinct_key (str) – 要返回唯一值的字段。

  • filter_doc (dict | None) – 匹配要获取唯一值的文档的查询。可以省略;在这种情况下,将覆盖整个集合。

  • mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。

此条目有帮助吗?