Google Cloud SQL Operators

Prerequisite Tasks

To use these operators, you must first complete a few prerequisite tasks.

CloudSQLCreateInstanceDatabaseOperator

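Creates a new database inside a Cloud SQL instance.

For parameter definitions, see CloudSQLCreateInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: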

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
    body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)
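The project ID fallback described above can be pictured with a small sketch. The helper `resolve_project_id` and its arguments are hypothetical names introduced for illustration; this is not the provider's actual implementation, only the precedence rule it documents.

```python
from typing import Optional


def resolve_project_id(explicit: Optional[str], connection_default: Optional[str]) -> str:
    """Return the explicit project ID, else the connection's default (hypothetical helper)."""
    if explicit:
        return explicit
    if connection_default:
        return connection_default
    raise ValueError("No project ID on the operator or in the connection")


# An explicit value wins; otherwise the connection default is used.
print(resolve_project_id("my-project", "conn-project"))
print(resolve_project_id(None, "conn-project"))
```

Failing loudly when neither source provides a value is a design choice of this sketch; it keeps a misconfigured task from silently targeting the wrong project.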

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to create a new database inside an instance.

CloudSQLDeleteInstanceDatabaseOperator

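Deletes a database from a Cloud SQL instance.

For parameter definitions, see CloudSQLDeleteInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: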

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to delete a database.

CloudSQLPatchInstanceDatabaseOperator

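Updates a resource containing information about a database inside a Cloud SQL instance, using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definitions, see CloudSQLPatchInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: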

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_patch_task",
)

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to update a database.

CloudSQLDeleteInstanceOperator

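Deletes a Cloud SQL instance in Google Cloud.

It is also used to delete read replicas and failover replicas.

For parameter definitions, see CloudSQLDeleteInstanceOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: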

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
    instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to delete a SQL instance.

CloudSQLExportInstanceOperator

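Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

Note

This operator is idempotent. If it is executed multiple times with the same export file URI, the export file in GCS is simply overwritten.

For parameter definitions, see CloudSQLExportInstanceOperator.

Arguments

Example request bodies defining the export operation (one for the standard task and one for the deferrable task):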

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
export_body_deferrable = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI_DEFERRABLE,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
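Because a repeated export to the same URI overwrites the existing file, a DAG that needs to keep every export has to vary the URI per run. The sketch below builds an export body with a run-specific object name. The helper `build_export_body`, the `exports/` prefix, and the bucket and run ID values are assumptions made for illustration; only the body keys come from the example above.

```python
def build_export_body(bucket: str, run_id: str) -> dict:
    """Build a SQL export request body whose URI is unique per run (hypothetical helper)."""
    uri = f"gs://{bucket}/exports/{run_id}.sql"
    return {
        "exportContext": {
            "fileType": "sql",
            "uri": uri,
            "sqlExportOptions": {"schemaOnly": False},
            "offload": True,
        }
    }


# Two runs produce two distinct export files instead of overwriting one.
first = build_export_body("my-bucket", "2024-01-01")
second = build_export_body("my-bucket", "2024-01-02")
print(first["exportContext"]["uri"])
print(second["exportContext"]["uri"])
```

Since `body` is a templated field, the run-specific part could also come from a template expression rather than a Python helper.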

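Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: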

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_export_task = CloudSQLExportInstanceOperator(
    body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_export_def_task = CloudSQLExportInstanceOperator(
    body=export_body_deferrable,
    instance=INSTANCE_NAME,
    task_id="sql_export_def_task",
    deferrable=True,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

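More information

See the Google Cloud SQL API documentation on how to export data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account the appropriate WRITE permission on the GCS bucket, you can use GCSBucketCreateAclEntryOperator, as shown in the example: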

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="WRITER",
    bucket=file_url_split[1],  # netloc (bucket)
    task_id="sql_gcp_add_bucket_permission_task",
)

CloudSQLImportInstanceOperator

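Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

CSV import:

This operator is NOT idempotent for CSV imports. If the same file is imported multiple times, the imported data is duplicated in the database. In addition, if any unique constraints exist, a repeated import may result in an error.

SQL import:

This operator is idempotent for SQL imports if the SQL file was also exported by Cloud SQL. The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables to be imported.

If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured at the SQL file level.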
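The difference between the two import modes can be reproduced locally. The sketch below uses Python's built-in `sqlite3` as a stand-in for Cloud SQL (an assumption made so the example runs anywhere); the table names and rows are invented. A dump that starts with `DROP TABLE IF EXISTS` can be replayed safely, while CSV-style appends duplicate rows.

```python
import sqlite3

# A dump shaped like a Cloud SQL export: it drops the table before recreating it.
DUMP = """
DROP TABLE IF EXISTS users;
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO users VALUES (1, 'ada'), (2, 'grace');
"""

conn = sqlite3.connect(":memory:")

# Replaying the dump twice leaves exactly the rows of one import.
for _ in range(2):
    conn.executescript(DUMP)
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]

# A CSV-style import only appends rows, so a second run duplicates them.
conn.execute("CREATE TABLE events (name TEXT)")
for _ in range(2):
    conn.executemany("INSERT INTO events VALUES (?)", [("login",), ("logout",)])
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

print(user_count, event_count)  # 2 4
```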

For parameter definitions, see CloudSQLImportInstanceOperator.

Arguments

Example request body defining the import operation:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}

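Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: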

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_import_task = CloudSQLImportInstanceOperator(
    body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

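More information

See the Google Cloud SQL API documentation on how to import data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account the appropriate READ permission on the GCS object, you can use GCSObjectCreateAclEntryOperator, as shown in the example: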

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="READER",
    bucket=file_url_split[1],  # netloc (bucket)
    object_name=file_url_split[2][1:],  # path (strip first '/')
    task_id="sql_gcp_add_object_permission_task",
)
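The snippet above indexes `file_url_split` without showing where it comes from. A minimal sketch, assuming it is the result of `urllib.parse.urlsplit` applied to the `gs://` file URI (the URI value here is hypothetical):

```python
from urllib.parse import urlsplit

FILE_URI = "gs://my-bucket/exports/dump.sql"  # hypothetical export location

# urlsplit returns (scheme, netloc, path, query, fragment).
file_url_split = urlsplit(FILE_URI)

bucket = file_url_split[1]            # netloc: the bucket name
object_name = file_url_split[2][1:]   # path without its leading '/'

print(bucket)
print(object_name)
```

Index 1 (the network location) carries the bucket and index 2 (the path) carries the object name, which is why the operator call strips the first character of the path.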

CloudSQLCreateInstanceOperator

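Creates a new Cloud SQL instance in Google Cloud.

It is also used to create read replicas.

For parameter definitions, see CloudSQLCreateInstanceOperator.

If an instance with the same name exists, no action is taken and the operator succeeds.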
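The create-if-absent behaviour can be sketched with an in-memory registry standing in for the project's instances. The function `create_instance_if_absent` and the registry are hypothetical and only model the documented outcome; they do not call the Cloud SQL API.

```python
from typing import Dict


def create_instance_if_absent(instances: Dict[str, dict], body: dict) -> bool:
    """Create the instance described by ``body`` unless its name is already taken."""
    name = body["name"]
    if name in instances:
        return False  # nothing to do; the call still counts as a success
    instances[name] = body
    return True


registry: Dict[str, dict] = {}  # stand-in for the instances of one project
first = create_instance_if_absent(registry, {"name": "demo", "region": "europe-west4"})
second = create_instance_if_absent(registry, {"name": "demo", "region": "europe-west4"})
print(first, second, len(registry))
```

Note that in this model the second call does not compare the requested body with the existing instance, so differing settings would go unnoticed; changing an existing instance is a patch operation.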

Arguments

Example request body defining the instance:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {"zone": "europe-west4-a"},
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
        "pricingPlan": "PER_USE",
        "storageAutoResize": True,
        "storageAutoResizeLimit": 0,
        "userLabels": {"my-key": "my-value"},
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

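Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: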

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_create_task = CloudSQLCreateInstanceOperator(
    body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to create an instance.

CloudSQLInstancePatchOperator

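Updates the settings of a Cloud SQL instance in Google Cloud (partial update).

For parameter definitions, see CloudSQLInstancePatchOperator.

This is a partial update, so only the settings specified in the request body are set or updated. The rest of the existing instance's configuration remains unchanged.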
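Partial-update semantics can be illustrated with a recursive dictionary merge. This is a simplified local model, not the Cloud SQL API's exact merge rules (how the service treats lists or label maps may differ); `apply_patch` and the sample settings are assumptions made for the sketch.

```python
import copy


def apply_patch(resource: dict, patch: dict) -> dict:
    """Return a copy of ``resource`` with only the keys present in ``patch`` changed."""
    result = copy.deepcopy(resource)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # Descend into nested settings so sibling keys are preserved.
            result[key] = apply_patch(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


current = {"settings": {"tier": "db-n1-standard-1", "dataDiskSizeGb": 30}}
patched = apply_patch(current, {"settings": {"dataDiskSizeGb": 35}})
print(patched)
```

Only `dataDiskSizeGb` changes; `tier` is carried over untouched, and the original `current` dictionary is not modified.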

Arguments

Example request body defining the instance settings to patch:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
        "userLabels": {"my-key-patch": "my-value-patch"},
    },
}

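Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: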

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_patch_task = CloudSQLInstancePatchOperator(
    body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to patch an instance.

CloudSQLCloneInstanceOperator

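Clones a Cloud SQL instance.

For parameter definitions, see CloudSQLCloneInstanceOperator.

Arguments

For the clone_context object attributes, see CloneContext.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits the project ID: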

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_clone = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "destination_instance_name",
    "gcp_conn_id",
    "api_version",
)

More information

See the Google Cloud SQL API documentation on how to clone an instance.

CloudSQLExecuteQueryOperator

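Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported. You can run SELECT queries, but their results are discarded.

You can specify various connectivity methods to connect to a running instance: a plain connection via public IP, a public IP connection with SSL, or TCP and socket connections via Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically as needed by the operator.

There is a gcpcloudsql://* connection type that you should use to define what kind of connectivity the operator should use. This connection is a kind of "meta" connection. It is not used to make an actual connection itself; instead, it determines whether CloudSQLDatabaseHook should start the Cloud SQL Proxy and what kind of database connection (Postgres or MySQL) should be created dynamically to connect to Cloud SQL via a public IP address or via the proxy. CloudSQLDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).

When building the connection, use the connection parameters described in CloudSQLDatabaseHook. Examples of connections for all possible connectivity types are shown below. Such a connection can be reused between different tasks (instances of CloudSQLExecuteQueryOperator). Each task starts its own proxy if needed, with its own TCP or UNIX socket.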
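To make the "meta" connection idea concrete, the sketch below reads a gcpcloudsql:// URI and reports which connectivity method its extras select. It is an illustration built on `urllib.parse` only, not the logic of CloudSQLDatabaseHook; the function name, the transport labels, and the sample URI values are hypothetical, while the extra keys are the ones used in this guide.

```python
from urllib.parse import parse_qs, urlsplit


def describe_connection(uri: str) -> dict:
    """Summarise the connectivity method selected by a gcpcloudsql:// URI (sketch)."""
    parts = urlsplit(uri)
    extras = {key: values[0] for key, values in parse_qs(parts.query).items()}
    if extras.get("use_proxy", "False") == "True":
        # Proxy mode: TCP or UNIX socket, chosen by sql_proxy_use_tcp.
        use_tcp = extras.get("sql_proxy_use_tcp", "False") == "True"
        transport = "proxy-tcp" if use_tcp else "proxy-socket"
    else:
        # Direct public IP mode: with or without SSL.
        use_ssl = extras.get("use_ssl", "False") == "True"
        transport = "public-ssl" if use_ssl else "public-plain"
    return {
        "scheme": parts.scheme,
        "database_type": extras.get("database_type"),
        "transport": transport,
    }


uri = (
    "gcpcloudsql://user:secret@10.0.0.1:5432/mydb?"
    "database_type=postgres&project_id=p&location=europe-west4&"
    "instance=i&use_proxy=True&sql_proxy_use_tcp=False"
)
print(describe_connection(uri))
```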

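For parameter definitions, see CloudSQLExecuteQueryOperator.

Because the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. The SQL query designer should design the queries to be idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements, which can be used to create tables in an idempotent way.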
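The effect of `IF NOT EXISTS` is easy to check locally. The sketch below uses the built-in `sqlite3` module as a stand-in for Postgres or MySQL (an assumption made so it runs without a database server); the table name is invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

IDEMPOTENT_DDL = "CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY, name TEXT)"

# Running the guarded statement twice is harmless, as a task retry would be.
for _ in range(2):
    conn.execute(IDEMPOTENT_DDL)

# The unguarded form fails as soon as the table already exists.
try:
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, name TEXT)")
    plain_create_failed = False
except sqlite3.OperationalError:
    plain_create_failed = True

table_count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'jobs'"
).fetchone()[0]
print(table_count, plain_create_failed)  # 1 True
```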

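Arguments

If you define the connection via an AIRFLOW_CONN_{CONN_ID} URL set in an environment variable, make sure the components of the URL are URL-encoded. See the examples below for details.

Note that for SSL connections you need a mechanism that makes the certificate/key files available at predefined locations on all workers on which the operator can run. This can be provided, for example, by mounting an NFS-like volume at the same path on all workers.

Example connection definitions for all non-SSL connectivity types. Note that all components of the connection URI should be URL-encoded: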

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py

# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_use_tcp": "True",
    },
}

# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_version": "v1.33.9",
        "sql_proxy_use_tcp": "False",
    },
}

# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "False",
    },
}

A similar connection definition for SSL-enabled connectivity:

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "True",
    },
}

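Connections can also be configured via environment variables. Note that, if you use the standard AIRFLOW notation for defining connections via environment variables, the connection ID used in the operator matches the upper-case {CONN_ID} suffix of AIRFLOW_CONN_{CONN_ID}: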

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
)

# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
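The environment-variable examples above interpolate placeholder values directly. With real values, characters such as `@`, `:`, `/`, or spaces in a password or certificate path would corrupt the URI unless each component is URL-encoded. A minimal sketch using `urllib.parse.quote_plus`; the helper `build_conn_uri` and all sample values are hypothetical:

```python
from urllib.parse import quote_plus


def build_conn_uri(user: str, password: str, host: str, port: int, database: str, **extras: str) -> str:
    """Assemble a gcpcloudsql:// URI with every variable component URL-encoded."""
    query = "&".join(f"{key}={quote_plus(str(value))}" for key, value in extras.items())
    return (
        f"gcpcloudsql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{quote_plus(database)}?{query}"
    )


uri = build_conn_uri(
    "user",
    "p@ss/w:rd",  # contains characters that are reserved in URIs
    "10.0.0.1",
    3306,
    "db",
    database_type="mysql",
    sslcert="/certs/client.pem",
)
print(uri)
```

The encoded password becomes `p%40ss%2Fw%3Ard` and the certificate path becomes `%2Fcerts%2Fclient.pem`, so neither can be mistaken for URI delimiters.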

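Using the operator

The example operator below uses a previously prepared connection. It can be a connection ID from the Airflow database or a connection configured via an environment variable. Note that, if you use the standard AIRFLOW notation for defining connections via environment variables, the connection ID used in the operator matches the upper-case {CONN_ID} suffix of AIRFLOW_CONN_{CONN_ID}: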

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=connection_id,
    task_id=task_id,
    sql=SQL,
)
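The naming rule for environment-variable connections can be sketched as follows. The helper `conn_env_var` is a hypothetical name, and the URI assigned here is a shortened placeholder rather than a complete connection definition.

```python
import os


def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable that defines the connection ``conn_id``."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# The operator would be given "proxy_postgres_tcp" as its connection ID,
# while the variable name carries the same ID in upper case.
connection_id = "proxy_postgres_tcp"
os.environ[conn_env_var(connection_id)] = (
    "gcpcloudsql://user:password@10.0.0.1:5432/db?database_type=postgres"
)
print(conn_env_var(connection_id))
```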

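SSL settings can also be specified at the operator level. In that case, the SSL settings configured in the connection are overridden. One way to do so is to specify the path to each certificate file, as shown below. Note that, for security reasons, these files are copied to a temporary location with the minimal required permissions.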

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_client_cert=ssl_cert_path,
    ssl_server_cert=ssl_server_cert_path,
    ssl_client_key=ssl_key_path,
)

You can also save your SSL certificates in Google Cloud Secret Manager and provide a secret ID. The secret has the following format:

{"sslcert": "", "sslkey": "", "sslrootcert": ""}

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

query_task_secret = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_secret_id=secret_id,
)
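A secret payload in the format above can be produced with the standard `json` module. This sketch assumes the three values hold the certificate and key material as text, which the format shown does not state explicitly; the helper name and the placeholder strings are hypothetical, and storing the payload in Secret Manager is left out.

```python
import json


def build_ssl_secret(client_cert: str, client_key: str, server_ca: str) -> str:
    """Serialise SSL material into the three-key JSON layout expected for the secret."""
    return json.dumps(
        {"sslcert": client_cert, "sslkey": client_key, "sslrootcert": server_ca}
    )


# Placeholder strings stand in for real certificate and key text.
payload = build_ssl_secret("<client cert>", "<client key>", "<server CA cert>")
print(payload)
```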

Templating

template_fields: Sequence[str] = (
    "sql",
    "gcp_cloudsql_conn_id",
    "gcp_conn_id",
    "ssl_server_cert",
    "ssl_client_cert",
    "ssl_client_key",
    "ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}

More information

See the Google Cloud SQL documentation on the proxy for MySQL and PostgreSQL.

Reference

For further information, see:
