Google API 到 Amazon S3¶
使用 GoogleApiToS3Operator
传输来向任何支持发现的 Google API 发出请求,并将其响应保存在 Amazon S3 文件中。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考 Airflow® 的安装
设置连接.
操作符¶
Google Sheets 到 Amazon S3 传输操作符¶
此示例从 Google Sheets 加载数据并将其保存到 Amazon S3 文件中。
tests/system/amazon/aws/example_google_api_sheets_to_s3.py
task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
task_id="google_sheet_data_to_s3",
google_api_service_name="sheets",
google_api_service_version="v4",
google_api_endpoint_path="sheets.spreadsheets.values.get",
google_api_endpoint_params={"spreadsheetId": GOOGLE_SHEET_ID, "range": GOOGLE_SHEET_RANGE},
s3_destination_key=f"s3://{s3_bucket}/{s3_key}",
)
您可以在此处找到有关所使用的 Google API 端点的更多信息。
Google Youtube 到 Amazon S3¶
这是一个更高级的示例 DAG,用于使用 GoogleApiToS3Operator
,它使用 xcom 在任务之间传递数据,以检索有关 YouTube 视频的特定信息。
它在给定时间范围 (YOUTUBE_VIDEO_PUBLISHED_AFTER
, YOUTUBE_VIDEO_PUBLISHED_BEFORE
) 内,在一个 YouTube 频道 (YOUTUBE_CHANNEL_ID
) 上搜索最多 50 个视频(由于分页),将响应保存在 Amazon S3 中,并将数据推送到 xcom。
tests/system/amazon/aws/example_google_api_youtube_to_s3.py
video_ids_to_s3 = GoogleApiToS3Operator(
task_id="video_ids_to_s3",
google_api_service_name="youtube",
google_api_service_version="v3",
google_api_endpoint_path="youtube.search.list",
gcp_conn_id=conn_id_name,
google_api_endpoint_params={
"part": "snippet",
"channelId": YOUTUBE_CHANNEL_ID,
"maxResults": 50,
"publishedAfter": YOUTUBE_VIDEO_PUBLISHED_AFTER,
"publishedBefore": YOUTUBE_VIDEO_PUBLISHED_BEFORE,
"type": "video",
"fields": "items/id/videoId",
},
google_api_response_via_xcom="video_ids_response",
s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_search",
s3_overwrite=True,
)
它将 YouTube ID 传递给下一个请求,然后获取请求的视频的信息 (YOUTUBE_VIDEO_FIELDS
),并将它们保存在 Amazon S3 (S3_BUCKET_NAME
) 中。
tests/system/amazon/aws/example_google_api_youtube_to_s3.py
video_data_to_s3 = GoogleApiToS3Operator(
task_id="video_data_to_s3",
google_api_service_name="youtube",
google_api_service_version="v3",
gcp_conn_id=conn_id_name,
google_api_endpoint_path="youtube.videos.list",
google_api_endpoint_params={
"part": YOUTUBE_VIDEO_PARTS,
"maxResults": 50,
"fields": YOUTUBE_VIDEO_FIELDS,
},
google_api_endpoint_params_via_xcom="video_ids",
s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_videos",
s3_overwrite=True,
)