Google Cloud SQL 操作符

先决条件任务

要使用这些操作符,您必须执行以下几项操作

CloudSQLCreateInstanceDatabaseOperator

在 Cloud SQL 实例中创建一个新的数据库。

有关参数定义,请查看 CloudSQLCreateInstanceDatabaseOperator

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
    body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)

示例请求正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关在实例中创建新数据库的信息,请参阅 Google Cloud SQL API 文档:创建新数据库

CloudSQLDeleteInstanceDatabaseOperator

从 Cloud SQL 实例中删除一个数据库。

有关参数定义,请查看 CloudSQLDeleteInstanceDatabaseOperator

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关删除数据库的信息,请参阅 Google Cloud SQL API 文档:删除数据库

CloudSQLPatchInstanceDatabaseOperator

使用补丁语义更新 Cloud SQL 实例中有关数据库的资源信息。请参阅:https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

有关参数定义,请查看 CloudSQLPatchInstanceDatabaseOperator

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_patch_task",
)

示例请求正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关更新数据库的信息,请参阅 Google Cloud SQL API 文档:更新数据库

CloudSQLDeleteInstanceOperator

删除 Google Cloud 中的 Cloud SQL 实例。

它也用于删除读取副本和故障转移副本。

有关参数定义,请查看 CloudSQLDeleteInstanceOperator

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
    instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关删除 SQL 实例的信息,请参阅 Google Cloud SQL API 文档:删除 SQL 实例

CloudSQLExportInstanceOperator

将 Cloud SQL 实例中的数据导出到 Cloud Storage 存储桶,作为 SQL 转储或 CSV 文件。

注意

此操作符是幂等的。如果使用相同的导出文件 URI 执行多次,则 GCS 中的导出文件将被简单地覆盖。

有关参数定义,请查看 CloudSQLExportInstanceOperator

参数

定义导出操作的示例正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
export_body_deferrable = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI_DEFERRABLE,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_export_task = CloudSQLExportInstanceOperator(
    body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)

对于所有这些操作,您也可以在可延迟模式下使用操作符

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_export_def_task = CloudSQLExportInstanceOperator(
    body=export_body_deferrable,
    instance=INSTANCE_NAME,
    task_id="sql_export_def_task",
    deferrable=True,
)

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关导出数据的信息,请参阅 Google Cloud SQL API 文档:导出数据

故障排除

如果您在 Google Cloud 中收到“未授权”错误,请确保 Cloud SQL 实例的服务帐户有权写入所选的 GCS 存储桶。

与 GCS 通信的不是 Airflow 中配置的服务帐户,而是特定 Cloud SQL 实例的服务帐户。

要授予服务帐户对 GCS 存储桶的适当 WRITE 权限,可以使用 GCSBucketCreateAclEntryOperator,如示例所示

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="WRITER",
    bucket=file_url_split[1],  # netloc (bucket)
    task_id="sql_gcp_add_bucket_permission_task",
)

CloudSQLImportInstanceOperator

将 Cloud Storage 中的 SQL 转储或 CSV 文件中的数据导入到 Cloud SQL 实例中。

CSV 导入:

对于 CSV 导入,此操作符不是幂等的。如果多次导入同一个文件,导入的数据将在数据库中重复。此外,如果存在任何唯一约束,重复导入可能会导致错误。

SQL 导入:

如果 SQL 导入也是由 Cloud SQL 导出的,则此操作符对于 SQL 导入是幂等的。导出的 SQL 包含所有要导入的表的“DROP TABLE IF EXISTS”语句。

如果导入文件是以不同的方式生成的,则不能保证幂等性。必须在 SQL 文件级别上确保幂等性。

有关参数定义,请查看 CloudSQLImportInstanceOperator

参数

定义导入操作的示例正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_import_task = CloudSQLImportInstanceOperator(
    body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)

模板化

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

有关导入数据的信息,请参阅 Google Cloud SQL API 文档:导入数据

故障排除

如果您在 Google Cloud 中收到“未授权”错误,请确保 Cloud SQL 实例的服务帐户有权从所选的 GCS 对象读取数据。

与 GCS 通信的不是 Airflow 中配置的服务帐户,而是特定 Cloud SQL 实例的服务帐户。

要授予服务帐户对 GCS 对象的适当 READ 权限,可以使用 GCSBucketCreateAclEntryOperator,如示例所示

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="READER",
    bucket=file_url_split[1],  # netloc (bucket)
    object_name=file_url_split[2][1:],  # path (strip first '/')
    task_id="sql_gcp_add_object_permission_task",
)

CloudSQLCreateInstanceOperator

在 Google Cloud 中创建一个新的 Cloud SQL 实例。

它也用于创建读取副本。

有关参数定义,请查看 CloudSQLCreateInstanceOperator

如果存在同名的实例,则不会执行任何操作,并且操作符将成功。

参数

使用故障转移副本定义实例的示例正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {"zone": "europe-west4-a"},
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
        "pricingPlan": "PER_USE",
        "storageAutoResize": True,
        "storageAutoResizeLimit": 0,
        "userLabels": {"my-key": "my-value"},
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_instance_create_task = CloudSQLCreateInstanceOperator(
    body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)

模板

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

请参阅 Google Cloud SQL API 文档以创建实例

CloudSQLInstancePatchOperator

更新 Google Cloud 中 Cloud SQL 实例的设置(部分更新)。

有关参数定义,请查看CloudSQLInstancePatchOperator

这是一个部分更新,因此只会设置/更新正文中指定的设置值。其余现有实例的配置将保持不变。

参数

定义实例的示例正文

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
        "userLabels": {"my-key-patch": "my-value-patch"},
    },
}

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_instance_patch_task = CloudSQLInstancePatchOperator(
    body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)

模板

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

请参阅 Google Cloud SQL API 文档以修补实例

CloudSQLCloneInstanceOperator

克隆 Cloud SQL 实例。

有关参数定义,请查看CloudSQLCloneInstanceOperator

参数

有关 clone_context 对象属性,请参阅CloneContext

使用操作符

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下

tests/system/google/cloud/cloud_sql/example_cloud_sql.py[源代码]

sql_instance_clone = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)

模板

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "destination_instance_name",
    "gcp_conn_id",
    "api_version",
)

更多信息

请参阅 Google Cloud SQL API 文档以克隆实例

CloudSQLExecuteQueryOperator

在 Google Cloud SQL 实例中执行 DDL 或 DML SQL 查询。不支持 DQL(从 Google Cloud SQL 检索数据)。您可以运行 SELECT 查询,但这些查询的结果将被丢弃。

您可以指定各种连接方法来连接到正在运行的实例,从公共 IP 普通连接,通过带 SSL 的公共 IP 或通过 Cloud SQL 代理的 TCP 和套接字连接。代理会根据操作员的需要动态下载和启动/停止。

有一个 gcpcloudsql://* 连接类型,您应该使用它来定义操作员要使用的连接类型。该连接是一种“元”连接类型。它本身不用于进行实际连接,但它确定 CloudSQLDatabaseHook 是否应该启动 Cloud SQL 代理,以及应该动态创建哪种数据库连接(Postgres 或 MySQL)以通过公共 IP 地址或通过代理连接到 Cloud SQL。 CloudSqlDatabaseHook 使用 CloudSqlProxyRunner 来管理 Cloud SQL 代理生命周期(每个任务都有自己的 Cloud SQL 代理)

构建连接时,您应该使用 CloudSQLDatabaseHook 中描述的连接参数。您可以在下面看到所有可能的连接类型的连接示例。这种连接可以在不同的任务(CloudSqlQueryOperator 的实例)之间重复使用。如果需要,每个任务都将启动自己的代理,并带有自己的 TCP 或 UNIX 套接字。

有关参数定义,请查看CloudSQLExecuteQueryOperator

由于查询操作符可以运行任意查询,因此无法保证它是幂等的。SQL 查询设计者应该设计查询以使其成为幂等的。例如,Postgres 和 MySQL 都支持 CREATE TABLE IF NOT EXISTS 语句,该语句可用于以幂等方式创建表。

参数

如果您通过在环境变量中定义的 AIRFLOW_CONN_{CONN_ID} URL 来定义连接,请确保 URL 中的 URL 组件已进行 URL 编码。有关详细信息,请参见下面的示例。

请注意,在 SSL 连接的情况下,您需要有一种机制来使证书/密钥文件在所有可以运行操作符的工作程序上都可以在预定义的位置使用。例如,可以通过在所有工作程序上的相同路径中安装类似 NFS 的卷来提供此机制。

所有非 SSL 连接的示例连接定义。请注意,连接 URI 的所有组件都应进行 URL 编码

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py[源代码]

# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_use_tcp": "True",
    },
}

# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_version": "v1.33.9",
        "sql_proxy_use_tcp": "False",
    },
}

# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "False",
    },
}

所有启用 SSL 连接的类似连接定义

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[源代码]

# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "True",
    },
}

也可以通过环境变量配置连接(请注意,如果您使用标准的 AIRFLOW 表示法通过环境变量定义连接,则操作员的连接 ID 与 AIRFLOW_CONN_{CONN_ID} 后缀大写匹配)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py[源代码]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
)

# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[源代码]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

使用操作符

下面的示例操作符正在使用先前准备好的连接。它可能是 Airflow 数据库中的 connection_id 或通过环境变量配置的连接(请注意,如果您使用标准的 AIRFLOW 表示法通过环境变量定义连接,则操作员的连接 ID 与 AIRFLOW_CONN_{CONN_ID} 后缀大写匹配)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py[源代码]

                query_task = CloudSQLExecuteQueryOperator(
                    gcp_cloudsql_conn_id=connection_id,
                    task_id=task_id,
                    sql=SQL,
                )

SSL 设置也可以在操作符级别指定。在这种情况下,连接中配置的 SSL 设置将被覆盖。一种方法是指定每个证书文件的路径,如下所示。请注意,出于安全原因,这些文件将被复制到具有最低要求权限的临时位置。

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[源代码]

        query_task = CloudSQLExecuteQueryOperator(
            gcp_cloudsql_conn_id=conn_id,
            task_id=task_id,
            sql=SQL,
            ssl_client_cert=ssl_cert_path,
            ssl_server_cert=ssl_server_cert_path,
            ssl_client_key=ssl_key_path,
        )

您还可以将 SSL 证书保存到 Google Cloud Secret Manager 中,并提供一个密钥 ID。密钥格式为:.. code-block:: python

{“sslcert”: “”, “sslkey”: “”, “sslrootcert”: “”}

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[源代码]

        query_task_secret = CloudSQLExecuteQueryOperator(
            gcp_cloudsql_conn_id=conn_id,
            task_id=task_id,
            sql=SQL,
            ssl_secret_id=secret_id,
        )

模板

template_fields: Sequence[str] = (
    "sql",
    "gcp_cloudsql_conn_id",
    "gcp_conn_id",
    "ssl_server_cert",
    "ssl_client_cert",
    "ssl_client_key",
    "ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}

更多信息

请参阅 Google Cloud SQL 文档,了解 MySQLPostgreSQL 相关代理。

此条目是否有帮助?