Google API 到 Amazon S3¶
使用 GoogleApiToS3Operator
传输操作,向支持发现的任何 Google API 发送请求,并将其响应保存到 Amazon S3 文件中。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
操作符¶
Google Sheets 到 Amazon S3 传输操作符¶
此示例从 Google Sheets 加载数据并将其保存到 Amazon S3 文件中。
tests/system/amazon/aws/example_google_api_sheets_to_s3.py
task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
task_id="google_sheet_data_to_s3",
google_api_service_name="sheets",
google_api_service_version="v4",
google_api_endpoint_path="sheets.spreadsheets.values.get",
google_api_endpoint_params={"spreadsheetId": GOOGLE_SHEET_ID, "range": GOOGLE_SHEET_RANGE},
s3_destination_key=f"s3://{s3_bucket}/{s3_key}",
)
您可以在 此处 找到有关所用 Google API 端点的更多信息。
Google Youtube 到 Amazon S3¶
这是一个更高级的 DAG 示例,用于使用 GoogleApiToS3Operator
,该操作符使用 xcom 在任务之间传递数据,以检索有关 YouTube 视频的特定信息。
它会在指定的时间范围 (YOUTUBE_VIDEO_PUBLISHED_AFTER
, YOUTUBE_VIDEO_PUBLISHED_BEFORE
) 内,在 YouTube 频道 (YOUTUBE_CHANNEL_ID
) 上搜索最多 50 个视频(由于分页),将响应保存到 Amazon S3 中,并将数据推送到 xcom。
tests/system/amazon/aws/example_google_api_youtube_to_s3.py
video_ids_to_s3 = GoogleApiToS3Operator(
task_id="video_ids_to_s3",
google_api_service_name="youtube",
google_api_service_version="v3",
google_api_endpoint_path="youtube.search.list",
gcp_conn_id=conn_id_name,
google_api_endpoint_params={
"part": "snippet",
"channelId": YOUTUBE_CHANNEL_ID,
"maxResults": 50,
"publishedAfter": YOUTUBE_VIDEO_PUBLISHED_AFTER,
"publishedBefore": YOUTUBE_VIDEO_PUBLISHED_BEFORE,
"type": "video",
"fields": "items/id/videoId",
},
google_api_response_via_xcom="video_ids_response",
s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_search",
s3_overwrite=True,
)
它将 YouTube ID 传递给下一个请求,该请求随后获取所请求视频的信息 (YOUTUBE_VIDEO_FIELDS
),并将其保存到 Amazon S3 (S3_BUCKET_NAME
) 中。
tests/system/amazon/aws/example_google_api_youtube_to_s3.py
video_data_to_s3 = GoogleApiToS3Operator(
task_id="video_data_to_s3",
google_api_service_name="youtube",
google_api_service_version="v3",
gcp_conn_id=conn_id_name,
google_api_endpoint_path="youtube.videos.list",
google_api_endpoint_params={
"part": YOUTUBE_VIDEO_PARTS,
"maxResults": 50,
"fields": YOUTUBE_VIDEO_FIELDS,
},
google_api_endpoint_params_via_xcom="video_ids",
s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_videos",
s3_overwrite=True,
)