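Google Cloud Transfer Service Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
For details, see Installation.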
CloudDataTransferServiceCreateJobOperator¶
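Creates a transfer job.
This operator accepts dates in two formats:
consistent with the Google API, for example { "year": 2019, "month": 2, "day": 11 }
as a datetime object
It also accepts times in two formats:
consistent with the Google API, for example { "hours": 12, "minutes": 30, "seconds": 0 }
as a time object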
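The two date formats and the two time formats describe the same values. The sketch below shows the correspondence using only the standard library; the helper names `date_to_api_dict` and `time_to_api_dict` are hypothetical and exist only for illustration, they are not part of the provider package.

```python
from datetime import date, datetime, time


def date_to_api_dict(value: date) -> dict:
    """Hypothetical helper: express a date in the Google API dict form."""
    return {"year": value.year, "month": value.month, "day": value.day}


def time_to_api_dict(value: time) -> dict:
    """Hypothetical helper: express a time in the Google API dict form."""
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


# Two spellings of the same start date.
start_date_as_dict = {"year": 2019, "month": 2, "day": 11}
start_date_as_object = datetime(2019, 2, 11).date()

# Two spellings of the same time of day.
start_time_as_dict = {"hours": 12, "minutes": 30, "seconds": 0}
start_time_as_object = time(12, 30, 0)

print(date_to_api_dict(start_date_as_object) == start_date_as_dict)
print(time_to_api_dict(start_time_as_object) == start_time_as_dict)
```

Either form can be placed in the schedule section of the job body; the object form is usually more convenient when the value is computed at DAG parse time.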
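If you want to create a transfer job that copies data from AWS S3, you must configure a connection. For information about AWS configuration, see Amazon Web Services Connection. Use the aws_conn_id parameter to select the AWS connection.
For parameter definitions, see CloudDataTransferServiceCreateJobOperator.
Using the operator¶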
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
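For readers who want to see the shape of the request without the provider's key constants, the following is a minimal sketch of a comparable body written as a plain dictionary. It assumes the constants resolve to the field names of the REST transferJobs resource (for example, that ALREADY_EXISTING_IN_SINK corresponds to overwriteObjectsAlreadyExistingInSink); verify this against the hook module before relying on it. The project and bucket names are placeholders.

```python
import json
from datetime import datetime, timedelta, timezone

# Placeholder values, assumed for illustration only.
project_id = "example-project"
source_bucket = "example-source-bucket"
destination_bucket = "example-destination-bucket"

# Start roughly two minutes from now, expressed in UTC.
start = (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time()

transfer_body = {
    "description": "description",
    "status": "ENABLED",
    "projectId": project_id,
    "schedule": {
        "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
        "startTimeOfDay": {
            "hours": start.hour,
            "minutes": start.minute,
            "seconds": start.second,
        },
    },
    "transferSpec": {
        "gcsDataSource": {"bucketName": source_bucket},
        "gcsDataSink": {"bucketName": destination_bucket},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}

# The body is plain JSON-serializable data.
print(json.dumps(transfer_body, indent=2))
```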
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
Templating¶
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.create.
CloudDataTransferServiceDeleteJobOperator¶
Deletes a transfer job.
For parameter definitions, see CloudDataTransferServiceDeleteJobOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Status.
CloudDataTransferServiceRunJobOperator¶
Runs a transfer job.
For parameter definitions, see CloudDataTransferServiceRunJobOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Run.
CloudDataTransferServiceUpdateJobOperator¶
Updates a transfer job.
For parameter definitions, see CloudDataTransferServiceUpdateJobOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
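The update body pairs the new job values with a field mask that names which fields should change. As a rough local illustration of that idea (not the service's implementation, whose exact semantics may differ), the sketch below copies only the masked top-level fields from a patch onto an existing job. The function `apply_field_mask` and the sample job dictionaries are hypothetical.

```python
import copy


def apply_field_mask(current_job: dict, patch: dict, field_mask: str) -> dict:
    """Hypothetical illustration: copy only the masked top-level fields."""
    updated = copy.deepcopy(current_job)
    for field in field_mask.split(","):
        field = field.strip()
        if field in patch:
            updated[field] = copy.deepcopy(patch[field])
    return updated


# Sample data, assumed for illustration only.
current_job = {"description": "description", "status": "ENABLED"}
patch = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is named in the mask, so "status" is left untouched.
updated_job = apply_field_mask(current_job, patch, "description")
print(updated_job)
```

In this sketch the status value in the patch is ignored because it is not listed in the mask, which mirrors why the example above sets the mask to "description" when only the description changes.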
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.patch.
CloudDataTransferServiceCancelOperationOperator¶
Cancels a transfer operation.
For parameter definitions, see CloudDataTransferServiceCancelOperationOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.cancel.
CloudDataTransferServiceGetOperationOperator¶
Gets a transfer operation. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceGetOperationOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.get.
CloudDataTransferServiceListOperationsOperator¶
Lists transfer operations. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceListOperationsOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)
Templating¶
template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.list.
CloudDataTransferServicePauseOperationOperator¶
Pauses a transfer operation.
For parameter definitions, see CloudDataTransferServicePauseOperationOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.pause.
CloudDataTransferServiceResumeOperationOperator¶
Resumes a transfer operation.
For parameter definitions, see CloudDataTransferServiceResumeOperationOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.resume.
CloudDataTransferServiceJobStatusSensor¶
Waits for at least one operation belonging to the job to reach the expected status.
For parameter definitions, see CloudDataTransferServiceJobStatusSensor.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
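The "at least one operation reaches the expected status" condition can be pictured as a simple polling loop. The sketch below is a stand-alone simulation, not the sensor's actual code: the functions `any_operation_matches` and `wait_for_status` are hypothetical, and the operation dictionaries assume that each operation carries its status under a "metadata" key.

```python
import time
from typing import Callable, Iterable, List, Set


def any_operation_matches(operations: Iterable[dict], expected_statuses: Set[str]) -> bool:
    """Return True if at least one operation has one of the expected statuses."""
    return any(op.get("metadata", {}).get("status") in expected_statuses for op in operations)


def wait_for_status(
    fetch_operations: Callable[[], List[dict]],
    expected_statuses: Set[str],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> bool:
    """Hypothetical polling loop: check, sleep, and retry up to max_pokes times."""
    for _ in range(max_pokes):
        if any_operation_matches(fetch_operations(), expected_statuses):
            return True
        time.sleep(poke_interval)
    return False


# Simulated responses: the operation is in progress first, then succeeds.
responses = iter(
    [
        [{"name": "transferOperations/example-1", "metadata": {"status": "IN_PROGRESS"}}],
        [{"name": "transferOperations/example-1", "metadata": {"status": "SUCCESS"}}],
    ]
)

result = wait_for_status(lambda: next(responses), {"SUCCESS"})
print(result)
```

In the real sensor, the interval between checks is controlled by poke_interval, as in the example above.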
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator¶
Copies data from one GCS bucket to another.
For parameter definitions, see CloudDataTransferServiceGCSToGCSOperator.
Using the operator¶
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcs_to_gcs.py
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)
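To relate the bucket and path arguments to the job bodies shown earlier on this page, the sketch below builds a transfer spec from the same four values. This is an assumption about one plausible mapping onto the REST API's bucketName and path fields, not a description of the operator's internals; `build_gcs_to_gcs_spec` is a hypothetical helper and the bucket and path values are placeholders.

```python
from typing import Optional


def build_gcs_to_gcs_spec(
    source_bucket: str,
    destination_bucket: str,
    source_path: Optional[str] = None,
    destination_path: Optional[str] = None,
) -> dict:
    """Hypothetical helper: assemble a GCS-to-GCS transfer spec as a plain dict."""
    source = {"bucketName": source_bucket}
    sink = {"bucketName": destination_bucket}
    # Paths are optional; add them only when provided.
    if source_path:
        source["path"] = source_path
    if destination_path:
        sink["path"] = destination_path
    return {"gcsDataSource": source, "gcsDataSink": sink}


# Placeholder values, assumed for illustration only.
spec = build_gcs_to_gcs_spec(
    source_bucket="example-source-bucket",
    destination_bucket="example-destination-bucket",
    source_path="data/",
    destination_path="data/",
)
print(spec)
```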
Templating¶
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)
Reference¶
For further information, see the Google Cloud Transfer Service documentation.