airflow.providers.salesforce.hooks.salesforce

连接到您的 Salesforce 实例,从其中检索数据,并将数据写入文件以供其他用途。

注意

此 hook 还依赖于 simple_salesforce 包:https://github.com/simple-salesforce/simple-salesforce

属性

log

SalesforceHook

创建到 Salesforce 的新连接,并允许您从 SFDC 中拉取数据并将其保存到文件。

模块内容

airflow.providers.salesforce.hooks.salesforce.log[source]
class airflow.providers.salesforce.hooks.salesforce.SalesforceHook(salesforce_conn_id=default_conn_name, session_id=None, session=None)[source]

Bases: airflow.hooks.base.BaseHook

创建到 Salesforce 的新连接,并允许您从 SFDC 中拉取数据并将其保存到文件。

然后您可以使用该文件与其他的 Airflow 操作器一起将数据移动到另一个数据源。

参数:
  • conn_id – 包含连接到 Salesforce 所需参数的连接名称。连接类型应为 Salesforce

  • session_id (str | None) – 特定 HTTP 请求会话的访问令牌。

  • session (requests.Session | None) – 自定义的 HTTP 请求会话。这允许使用 simple_salesforce 未暴露的 requests Session 功能。

注意

可以通过几种认证选项创建到 Salesforce 的连接

  • 密码: 提供用户名、密码和安全令牌

  • 直接会话: 提供 session_id 和实例或实例 URL

  • OAuth 2.0 JWT: 提供消费者密钥和私钥或私钥文件路径

  • IP 过滤: 提供用户名、密码和组织 ID

如果在沙盒环境,将域名值输入为“test”。

conn_name_attr = 'salesforce_conn_id'[source]
default_conn_name = 'salesforce_default'[source]
conn_type = 'salesforce'[source]
hook_name = 'Salesforce'[source]
conn_id = 'salesforce_default'[source]
session_id = None[source]
session = None[source]
classmethod get_connection_form_widgets()[source]

返回连接组件以添加到连接表单。

classmethod get_ui_field_behaviour()[source]

返回自定义字段行为。

property conn: simple_salesforce.api.Salesforce[source]

返回一个 Salesforce 实例。(已缓存)。

get_conn()[source]

返回一个 Salesforce 实例。(已缓存)。

make_query(query, include_deleted=False, query_params=None)[source]

向 Salesforce 执行查询。

参数:
  • query (str) – 向 Salesforce 执行的查询。

  • include_deleted (bool) – 如果查询应包含已删除的记录,则为 True。

  • query_params (dict | None) – 其他可选参数

返回:

查询结果。

返回类型:

dict

describe_object(obj)[source]

从 Salesforce 获取对象的描述。

此描述是对象的模式和 Salesforce 为每个对象存储的一些额外元数据。

参数:

obj (str) – 我们正在获取描述的 Salesforce 对象的名称。

返回:

Salesforce 对象的描述。

返回类型:

dict

get_available_fields(obj)[source]

获取对象所有可用字段的列表。

参数:

obj (str) – 我们正在获取描述的 Salesforce 对象的名称。

返回:

字段名称。

返回类型:

list[str]

get_object_from_salesforce(obj, fields)[source]

从 Salesforce 获取对象的所有实例。

对于每个模型,仅获取 fields 中指定的字段。

我们底层实际运行的是

SELECT <fields> FROM <obj>;

参数:
返回:

从 Salesforce 获取对象的所有实例。

返回类型:

dict

write_object_to_file(query_results, filename, fmt='csv', coerce_to_timestamp=False, record_time_added=False)[source]

将查询结果写入文件。

可接受的格式有
  • csv

    逗号分隔值文件。这是默认格式。

  • json

    JSON 数组。数组中的每个元素都是不同的一行。

  • ndjson

    JSON 数组,但每个元素是换行符分隔而不是像 json 那样逗号分隔

这需要大量的清理工作。Pandas 处理 CSV 和 json 输出的方式不统一。对于 datetime 类型尤其如此。Pandas 希望将它们在 CSV 中写入为字符串,但在 json 中写入为毫秒级 Unix 时间戳。

默认情况下,此函数将尝试保留 Salesforce 中所有值的原样表示。您可以使用 coerce_to_timestamp 标志强制所有 datetime 转换为 Unix 时间戳 (UTC)。这可能非常有益,因为它会使所有 datetime 字段看起来一致,并使在其他数据库环境中更容易使用

参数:
  • query_results (list[dict]) – SQL 查询结果

  • filename (str) – 数据应转储到的文件名称

  • fmt (str) – 您希望输出的格式。默认值: 'csv'

  • coerce_to_timestamp (bool) – 如果希望所有 datetime 字段转换为 Unix 时间戳,则为 True。如果希望它们保持与 Salesforce 中相同的格式,则为 False。将该值设置为 False 会导致 datetime 变为字符串。默认值: False

  • record_time_added (bool) – 如果希望在结果数据中添加一个 Unix 时间戳字段来标记从 Salesforce 获取数据的时间,则为 True。默认值: False

返回:

写入文件的 dataframe。

返回类型:

pandas.DataFrame

object_to_df(query_results, coerce_to_timestamp=False, record_time_added=False)[source]

将查询结果导出到 dataframe。

默认情况下,此函数将尝试保留 Salesforce 中所有值的原样表示。您可以使用 coerce_to_timestamp 标志强制所有 datetime 转换为 Unix 时间戳 (UTC)。这可能非常有益,因为它会使所有 datetime 字段看起来一致,并使在其他数据库环境中更容易使用

参数:
  • query_results (list[dict]) – SQL 查询结果

  • coerce_to_timestamp (bool) – 如果希望所有 datetime 字段转换为 Unix 时间戳,则为 True。如果希望它们保持与 Salesforce 中相同的格式,则为 False。将该值设置为 False 会导致 datetime 变为字符串。默认值: False

  • record_time_added (bool) – 如果希望在结果数据中添加一个 Unix 时间戳字段来标记从 Salesforce 获取数据的时间,则为 True。默认值: False

返回:

dataframe。

返回类型:

pandas.DataFrame

test_connection()[source]

测试 Salesforce 连接。

此条目有帮助吗?