AWS Glue¶
AWS Glue 是一种无服务器数据集成服务,可以轻松地发现、准备和组合数据,用于分析、机器学习和应用程序开发。AWS Glue 提供了数据集成所需的所有功能,以便您可以在几分钟而不是几个月内开始分析您的数据并加以利用。
前置任务¶
要使用这些 operators (操作符),您必须完成以下几件事:
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services Connection (连接) ID。如果此参数设置为
None
,则使用默认的 boto3 行为,无需查找连接。否则,使用 Connection (连接) 中存储的凭证。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS Connection (连接) Extra Parameter (额外参数) 中的 region_name。否则,使用指定的值而非连接中的值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS Connection (连接) Extra Parameter (额外参数) 中的 verify。否则,使用指定的值而非连接中的值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
示例,有关参数的更多详细信息请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS Connection (连接) Extra Parameter (额外参数) 中的 config_kwargs。否则,使用指定的值而非连接中的值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置
Operators (操作符)¶
创建 AWS Glue crawler (爬网程序)¶
AWS Glue Crawlers (爬网程序) 允许您轻松地从各种数据源提取数据。要创建新的 AWS Glue Crawler (爬网程序) 或运行现有的 crawler,您可以使用 GlueCrawlerOperator
。
tests/system/amazon/aws/example_glue.py
crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
)
注意
配置中包含的 AWS IAM 角色需要访问源数据位置(例如,如果数据存储在 Amazon S3 中,则需要 s3:PutObject 访问权限)以及 AWSGlueServiceRole 策略。有关更多详细信息的链接,请参见下面的 Reference (参考) 部分。
提交 AWS Glue job (作业)¶
要提交新的 AWS Glue job (作业),您可以使用 GlueJobOperator
。
tests/system/amazon/aws/example_glue.py
submit_glue_job = GlueJobOperator(
task_id="submit_glue_job",
job_name=glue_job_name,
script_location=f"s3://{bucket_name}/etl_script.py",
s3_bucket=bucket_name,
iam_role_name=role_name,
create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
)
注意
用于 crawler (爬网程序) 的同一个 AWS IAM 角色也可以在此处使用,但它需要策略以提供对结果数据输出位置的访问权限。
创建 AWS Glue Data Quality (数据质量)¶
AWS Glue Data Quality (数据质量) 允许您衡量和监控数据的质量,以便做出明智的业务决策。要创建新的 AWS Glue Data Quality (数据质量) 规则集或更新现有规则集,您可以使用 GlueDataQualityOperator
。
tests/system/amazon/aws/example_glue_data_quality.py
create_rule_set = GlueDataQualityOperator(
task_id="create_rule_set",
name=rule_set_name,
ruleset=RULE_SET,
data_quality_ruleset_kwargs={
"TargetTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
)
启动 AWS Glue Data Quality (数据质量) Evaluation Run (评估运行)¶
要启动 AWS Glue Data Quality (数据质量) 规则集评估运行,您可以使用 GlueDataQualityRuleSetEvaluationRunOperator
。
tests/system/amazon/aws/example_glue_data_quality.py
start_evaluation_run = GlueDataQualityRuleSetEvaluationRunOperator(
task_id="start_evaluation_run",
datasource={
"GlueTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
role=test_context[ROLE_ARN_KEY],
rule_set_names=[rule_set_name],
)
启动 AWS Glue Data Quality (数据质量) Recommendation Run (推荐运行)¶
要启动 AWS Glue Data Quality (数据质量) 规则推荐运行,您可以使用 GlueDataQualityRuleRecommendationRunOperator
。
tests/system/amazon/aws/example_glue_data_quality_with_recommendation.py
recommendation_run = GlueDataQualityRuleRecommendationRunOperator(
task_id="recommendation_run",
datasource={
"GlueTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
role=test_context[ROLE_ARN_KEY],
recommendation_run_kwargs={"CreatedRulesetName": rule_set_name},
)
Sensors (传感器)¶
等待 AWS Glue crawler (爬网程序) 状态¶
要等待 AWS Glue crawler (爬网程序) 执行达到终止状态,您可以使用 GlueCrawlerSensor
。
tests/system/amazon/aws/example_glue.py
wait_for_crawl = GlueCrawlerSensor(
task_id="wait_for_crawl",
crawler_name=glue_crawler_name,
)
等待 AWS Glue job (作业) 状态¶
要等待 AWS Glue Job (作业) 达到终止状态,您可以使用 GlueJobSensor
tests/system/amazon/aws/example_glue.py
wait_for_job = GlueJobSensor(
task_id="wait_for_job",
job_name=glue_job_name,
# Job ID extracted from previous Glue Job Operator task
run_id=submit_glue_job.output,
verbose=True, # prints glue job logs in airflow logs
)
等待 AWS Glue Data Quality (数据质量) Evaluation Run (评估运行)¶
要等待 AWS Glue Data Quality RuleSet Evaluation Run (数据质量规则集评估运行) 达到终止状态,您可以使用 GlueDataQualityRuleSetEvaluationRunSensor
tests/system/amazon/aws/example_glue_data_quality.py
await_evaluation_run_sensor = GlueDataQualityRuleSetEvaluationRunSensor(
task_id="await_evaluation_run_sensor",
evaluation_run_id=start_evaluation_run.output,
)
等待 AWS Glue Data Quality (数据质量) Recommendation Run (推荐运行)¶
要等待 AWS Glue Data Quality (数据质量) 推荐运行达到终止状态,您可以使用 GlueDataQualityRuleRecommendationRunSensor
tests/system/amazon/aws/example_glue_data_quality_with_recommendation.py
await_recommendation_run_sensor = GlueDataQualityRuleRecommendationRunSensor(
task_id="await_recommendation_run_sensor",
recommendation_run_id=recommendation_run.output,
)
等待 AWS Glue Catalog (数据目录) Partition (分区)¶
要等待分区出现在 AWS Glue Catalog (数据目录) 中直到达到终止状态,您可以使用 GlueCatalogPartitionSensor
tests/system/amazon/aws/example_glue.py
wait_for_catalog_partition = GlueCatalogPartitionSensor(
task_id="wait_for_catalog_partition",
table_name="input",
database_name=glue_db_name,
expression="category='mixed'",
)