airflow.providers.mongo.hooks.mongo¶
MongoDB 的 Hook。
类¶
用于与 MongoDB 交互的 PyMongo 封装器。 |
模块内容¶
- class airflow.providers.mongo.hooks.mongo.MongoHook(mongo_conn_id=default_conn_name, *args, **kwargs)[source]¶
基类:
airflow.hooks.base.BaseHook
用于与 MongoDB 交互的 PyMongo 封装器。
Mongo 连接文档 https://docs.mongodb.com/manual/reference/connection-string/index.html 您可以在连接的 extra 字段中指定连接字符串选项 https://docs.mongodb.com/manual/reference/connection-string/index.html#connection-string-options
如果您想使用 DNS seedlist,请将 srv 设置为 True。
- 例如:
{“srv”: true, “replicaSet”: “test”, “ssl”: true, “connectTimeoutMS”: 30000}
要启用 SSL,可以在连接字符串选项(extra 下)中使用 “ssl”: true 选项。在启用 SSL 的情况下,除非指定,否则连接中默认不包含 allow_insecure 选项。这样做是为了确保在处理与 MongoDB 的连接时使用安全介质。
allow_insecure 仅在 ssl 上下文中有效,并且是可配置的,可以在以下场景之一中使用
HTTP (ssl = False) 在此情况下,ssl 被禁用,使用 allow_insecure 没有意义。连接 extra 示例:{“ssl”: false}
HTTPS(不安全)(ssl = True, allow_insecure = True) 在此情况下,ssl 被启用,并且连接允许不安全的连接。连接 extra 示例:{“ssl”: true, “allow_insecure”: true}
HTTPS(安全)(ssl = True, allow_insecure = False - 启用 SSL 时的默认值):在此情况下,ssl 被启用,并且连接不允许不安全的连接(启用 SSL 时的默认行为)。连接 extra 示例:{“ssl”: true} 或 {“ssl”: true, “allow_insecure”: false}
注意:tls 是 ssl 的别名,可以代替 ssl 使用。示例:{“ssl”: false} 或 {“tls”: false}。
- 参数:
mongo_conn_id (str) – 连接到 MongoDB 时使用的 Mongo 连接 ID。
- get_collection(mongo_collection, mongo_db=None)[source]¶
获取一个用于查询的 mongo 集合对象。
除非另行指定,否则使用连接模式作为数据库。
- aggregate(mongo_collection, aggregate_query, mongo_db=None, **kwargs)[source]¶
运行聚合管道并返回结果。
https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.aggregate https://pymongo.readthedocs.io/en/stable/examples/aggregation.html
- find(mongo_collection: str, query: dict, find_one: typing_extensions.Literal[False], mongo_db: str | None = None, projection: list | dict | None = None, **kwargs) pymongo.cursor.Cursor [source]¶
- find(mongo_collection: str, query: dict, find_one: typing_extensions.Literal[True], mongo_db: str | None = None, projection: list | dict | None = None, **kwargs) Any | None
运行 mongo find 查询并返回结果。
- update_one(mongo_collection, filter_doc, update_doc, mongo_db=None, **kwargs)[source]¶
更新 mongo 集合中的单个文档。
- update_many(mongo_collection, filter_doc, update_doc, mongo_db=None, **kwargs)[source]¶
更新 mongo 集合中的一个或多个文档。
- replace_one(mongo_collection, doc, filter_doc=None, mongo_db=None, **kwargs)[source]¶
替换 mongo 集合中的单个文档。
注意
如果没有给出
filter_doc
,则假定替换文档包含_id
字段,该字段将用作过滤器。
- replace_many(mongo_collection, docs, filter_docs=None, mongo_db=None, upsert=False, collation=None, **kwargs)[source]¶
替换 mongo 集合中的多个文档。
使用包含多个 ReplaceOne 操作的 bulk_write https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write
注意
如果没有给出
filter_docs
,则假定所有替换文档都包含_id
字段,这些字段将用作过滤器。- 参数:
mongo_collection (str) – 要更新的集合的名称。
filter_docs (list[dict] | None) – 匹配要替换的文档的查询列表。可以省略;在这种情况下,将使用 airflow.docs 中的 _id 字段。
mongo_db (str | None) – 要使用的数据库的名称。可以省略;在这种情况下,将使用连接字符串中的数据库。
upsert (bool) – 如果为
True
,则如果没有文档与替换操作的过滤器匹配,则执行插入。collation (pymongo.collation.Collation | None) –
Collation
的实例。此选项仅在 MongoDB 3.4 及更高版本上受支持。