Google Cloud SQL 操作器¶
前提任务¶
要使用这些操作器,您必须执行以下几项操作
使用 Cloud 控制台 选择或创建一个 Cloud Platform 项目。
如 Google Cloud 文档 中所述,为您的项目启用结算功能。
如 Cloud 控制台文档 中所述,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'详细信息请参阅 安装。
CloudSQLCreateInstanceDatabaseOperator¶
在 Cloud SQL 实例中创建新的数据库。
有关参数定义,请参阅 CloudSQLCreateInstanceDatabaseOperator
。
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)
示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何在实例中创建新数据库。
CloudSQLDeleteInstanceDatabaseOperator¶
从 Cloud SQL 实例中删除数据库。
有关参数定义,请参阅 CloudSQLDeleteInstanceDatabaseOperator
。
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_delete_task",
trigger_rule=TriggerRule.ALL_DONE,
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何删除数据库。
CloudSQLPatchInstanceDatabaseOperator¶
使用 patch 语义更新包含 Cloud SQL 实例中数据库信息的资源。请参阅:https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
有关参数定义,请参阅 CloudSQLPatchInstanceDatabaseOperator
。
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_patch_task",
)
示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何更新数据库。
CloudSQLDeleteInstanceOperator¶
在 Google Cloud 中删除 Cloud SQL 实例。
它也用于删除只读和故障转移副本。
有关参数定义,请参阅 CloudSQLDeleteInstanceOperator
。
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何删除 SQL 实例。
CloudSQLExportInstanceOperator¶
将 Cloud SQL 实例中的数据导出到 Cloud Storage 存储桶中,格式为 SQL 转储或 CSV 文件。
注意
此操作器是幂等的。如果使用相同的导出文件 URI 多次执行,GCS 中的导出文件将被简单覆盖。
有关参数定义,请参阅 CloudSQLExportInstanceOperator
。
参数¶
定义导出操作的示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
export_body = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
export_body_deferrable = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI_DEFERRABLE,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_export_task = CloudSQLExportInstanceOperator(
body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)
此外,对于所有这些操作,您可以在可延迟模式下使用操作器
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_export_def_task = CloudSQLExportInstanceOperator(
body=export_body_deferrable,
instance=INSTANCE_NAME,
task_id="sql_export_def_task",
deferrable=True,
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何导出数据。
故障排除¶
如果您在 Google Cloud 中收到“Unauthorized”(未授权)错误,请确保 Cloud SQL 实例的服务帐号已获得向所选 GCS 存储桶写入数据的授权。
与 GCS 通信的不是在 Airflow 中配置的服务帐号,而是特定 Cloud SQL 实例的服务帐号。
要授予服务帐号对 GCS 存储桶的相应 WRITE 权限,您可以使用 GCSBucketCreateAclEntryOperator
,如示例所示
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="WRITER",
bucket=file_url_split[1], # netloc (bucket)
task_id="sql_gcp_add_bucket_permission_task",
)
CloudSQLImportInstanceOperator¶
将 Cloud Storage 中的 SQL 转储或 CSV 文件导入 Cloud SQL 实例。
CSV 导入:¶
对于 CSV 导入,此操作器不是幂等的。如果多次导入同一个文件,导入的数据将在数据库中重复。此外,如果存在任何唯一约束,重复导入可能会导致错误。
SQL 导入:¶
如果 SQL 文件也是由 Cloud SQL 导出的,则此操作器对于 SQL 导入是幂等的。导出的 SQL 包含所有要导入的表的 'DROP TABLE IF EXISTS' 语句。
如果导入文件是以不同方式生成的,则不保证幂等性。这必须在 SQL 文件级别上确保。
有关参数定义,请参阅 CloudSQLImportInstanceOperator
。
参数¶
定义导入操作的示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_import_task = CloudSQLImportInstanceOperator(
body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何导入数据。
故障排除¶
如果您在 Google Cloud 中收到“Unauthorized”(未授权)错误,请确保 Cloud SQL 实例的服务帐号已获得从所选 GCS 对象读取数据的授权。
与 GCS 通信的不是在 Airflow 中配置的服务帐号,而是特定 Cloud SQL 实例的服务帐号。
要授予服务帐号对 GCS 对象的相应 READ 权限,您可以使用 GCSBucketCreateAclEntryOperator
,如示例所示
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="READER",
bucket=file_url_split[1], # netloc (bucket)
object_name=file_url_split[2][1:], # path (strip first '/')
task_id="sql_gcp_add_object_permission_task",
)
CloudSQLCreateInstanceOperator¶
在 Google Cloud 中创建新的 Cloud SQL 实例。
它也用于创建只读副本。
有关参数定义,请参阅 CloudSQLCreateInstanceOperator
。
如果存在同名实例,将不执行任何操作,并且操作器将成功。
参数¶
定义带故障转移副本的实例的示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {"zone": "europe-west4-a"},
"maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
"pricingPlan": "PER_USE",
"storageAutoResize": True,
"storageAutoResizeLimit": 0,
"userLabels": {"my-key": "my-value"},
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_create_task = CloudSQLCreateInstanceOperator(
body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何创建实例。
CloudSQLInstancePatchOperator¶
更新 Google Cloud 中 Cloud SQL 实例的设置(部分更新)。
有关参数定义,请参阅 CloudSQLInstancePatchOperator
。
这是一个部分更新,因此只会设置/更新请求体中指定的设置值。现有实例的其余配置将保持不变。
参数¶
定义实例的示例请求体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
"userLabels": {"my-key-patch": "my-value-patch"},
},
}
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_patch_task = CloudSQLInstancePatchOperator(
body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何打补丁更新实例。
CloudSQLCloneInstanceOperator¶
克隆 Cloud SQL 实例。
有关参数定义,请参阅 CloudSQLCloneInstanceOperator
。
参数¶
有关 clone_context
对象属性,请参阅 CloneContext
使用操作器¶
您可以创建带或不带项目 ID 的操作器。如果缺少项目 ID,则会从使用的 Google Cloud 连接中检索。此处显示了两种变体
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_clone = CloudSQLCloneInstanceOperator(
instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"destination_instance_name",
"gcp_conn_id",
"api_version",
)
更多信息¶
请参阅 Google Cloud SQL API 文档,了解 如何克隆实例。
CloudSQLExecuteQueryOperator¶
在 Google Cloud SQL 实例中执行 DDL 或 DML SQL 查询。不支持 DQL(从 Google Cloud SQL 中检索数据)。您可以运行 SELECT 查询,但这些查询的结果将被丢弃。
您可以指定各种连接方法来连接正在运行的实例,从公共 IP 明文连接,到使用 SSL 的公共 IP,或通过 Cloud SQL Proxy 进行 TCP 和套接字连接。代理会根据操作器的需要动态下载、启动/停止。
有一种 gcpcloudsql://*
连接类型,您应该使用它来定义操作器要使用的连接类型。此连接是一种“元”连接。它本身不用于建立实际连接,而是确定 CloudSQLDatabaseHook
是否应启动 Cloud SQL Proxy,以及应动态创建何种数据库连接(Postgres 或 MySQL)以通过公共 IP 地址或代理连接到 Cloud SQL。CloudSqlDatabaseHook
使用 CloudSqlProxyRunner
来管理 Cloud SQL Proxy 的生命周期(每个任务都有自己的 Cloud SQL Proxy)
构建连接时,您应使用 CloudSQLDatabaseHook
中描述的连接参数。下方显示了所有可能的连接类型的连接示例。此类连接可在不同任务(CloudSqlQueryOperator
的实例)之间重用。如果需要,每个任务都会启动自己的代理,并使用自己的 TCP 或 UNIX 套接字。
有关参数定义,请参阅 CloudSQLExecuteQueryOperator
。
由于查询操作器可以运行任意查询,因此不能保证其幂等性。SQL 查询设计者应将查询设计为幂等。例如,Postgres 和 MySQL 都支持 CREATE TABLE IF NOT EXISTS 语句,可用于以幂等方式创建表。
参数¶
如果您通过环境变量中定义的 AIRFLOW_CONN_{CONN_ID}
URL 定义连接,请确保 URL 中的 URL 组件经过 URL 编码。有关详细信息,请参阅以下示例。
请注意,在使用 SSL 连接的情况下,您需要有一种机制来使证书/密钥文件在预定义的位置可用于操作器可以运行的所有工作进程。例如,可以通过在所有工作进程的同一路径下挂载类似 NFS 的卷来提供此功能。
所有非 SSL 连接的示例连接定义。请注意,连接 URI 的所有组件都应进行 URL 编码
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_use_tcp": "True",
},
}
# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_version": "v1.33.9",
"sql_proxy_use_tcp": "False",
},
}
# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "False",
},
}
所有启用 SSL 的连接的类似连接定义
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "True",
},
}
还可以通过环境变量配置连接(请注意,如果您使用标准的 AIRFLOW 表示法通过环境变量定义连接,则操作器中的连接 ID 与 AIRFLOW_CONN_{CONN_ID}
的后缀大写字母匹配)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
)
# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=True".format(**mysql_kwargs)
)
# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
使用操作器¶
下面的示例操作器使用了先前准备好的连接。它可能是来自 Airflow 数据库的 connection_id,或者通过环境变量配置的连接(请注意,如果您使用标准的 AIRFLOW 表示法通过环境变量定义连接,则操作器中的连接 ID 与 AIRFLOW_CONN_{CONN_ID}
的后缀大写字母匹配)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=connection_id,
task_id=task_id,
sql=SQL,
)
SSL 设置也可以在操作器级别指定。在这种情况下,连接中配置的 SSL 设置将被覆盖。其中一种方法是指定每个证书文件的路径,如下所示。请注意,出于安全原因,这些文件将被复制到具有最低所需权限的临时位置。
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_client_cert=ssl_cert_path,
ssl_server_cert=ssl_server_cert_path,
ssl_client_key=ssl_key_path,
)
您还可以将您的 SSL 证书保存到 Google Cloud Secret Manager 中,并提供一个 Secret ID。Secret 格式如下:.. code-block:: python
{“sslcert”: “”, “sslkey”: “”, “sslrootcert”: “”}
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
query_task_secret = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_secret_id=secret_id,
)
模板化¶
template_fields: Sequence[str] = (
"sql",
"gcp_cloudsql_conn_id",
"gcp_conn_id",
"ssl_server_cert",
"ssl_client_cert",
"ssl_client_key",
"ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
更多信息¶
请参阅 Google Cloud SQL 文档,了解 MySQL 和 PostgreSQL 相关的代理。
参考¶
更多信息请参阅