Google Cloud Workflows 运算符¶
您可以使用 Workflows 创建无服务器工作流,将一系列无服务器任务按照您定义的顺序链接在一起。结合 Google Cloud 的 API、Cloud Functions 和 Cloud Run 等无服务器产品的强大功能,以及对外部 API 的调用,创建灵活的无服务器应用程序。
有关该服务的更多信息,请访问 Workflows 产品文档 <产品文档。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
如 Google Cloud 文档中所述,为您的项目启用结算。
如 Cloud Console 文档中所述,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
创建工作流¶
要创建工作流,请使用 WorkflowsCreateWorkflowOperator
。
create_workflow = WorkflowsCreateWorkflowOperator(
task_id="create_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow=WORKFLOW,
workflow_id=WORKFLOW_ID,
)
工作流的定义应与此示例类似
WORKFLOW_CONTENT = """
- getLanguage:
assign:
- inputLanguage: "English"
- readWikipedia:
call: http.get
args:
url: https://www.wikipedia.org/
query:
action: opensearch
search: ${inputLanguage}
result: wikiResult
- returnResult:
return: ${wikiResult}
"""
WORKFLOW = {
"description": "Test workflow",
"labels": {"airflow-version": "dev"},
"source_contents": WORKFLOW_CONTENT,
}
有关创作工作流的更多信息,请查看官方产品文档 <产品文档。
更新工作流¶
要更新工作流,请使用 WorkflowsUpdateWorkflowOperator
。
update_workflow = WorkflowsUpdateWorkflowOperator(
task_id="update_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
update_mask=FieldMask(paths=["name", "description"]),
)
获取工作流¶
要获取工作流,请使用 WorkflowsGetWorkflowOperator
。
get_workflow = WorkflowsGetWorkflowOperator(
task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
列出工作流¶
要列出工作流,请使用 WorkflowsListWorkflowsOperator
。
list_workflows = WorkflowsListWorkflowsOperator(
task_id="list_workflows",
location=LOCATION,
project_id=PROJECT_ID,
)
删除工作流¶
要删除工作流,请使用 WorkflowsDeleteWorkflowOperator
。
delete_workflow = WorkflowsDeleteWorkflowOperator(
task_id="delete_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建执行¶
要创建执行,请使用 WorkflowsCreateExecutionOperator
。由于 API 限制,此运算符不是幂等的。
create_execution = WorkflowsCreateExecutionOperator(
task_id="create_execution",
location=LOCATION,
project_id=PROJECT_ID,
execution=EXECUTION,
workflow_id=WORKFLOW_ID,
)
创建运算符不等待执行完成。要等待执行结果,请使用 WorkflowExecutionSensor
。
wait_for_execution = WorkflowExecutionSensor(
task_id="wait_for_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
获取执行¶
要获取执行,请使用 WorkflowsGetExecutionOperator
。
get_execution = WorkflowsGetExecutionOperator(
task_id="get_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
列出执行¶
要列出执行,请使用 WorkflowsListExecutionsOperator
。默认情况下,此运算符仅返回过去 60 分钟内的执行。
list_executions = WorkflowsListExecutionsOperator(
task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
取消执行¶
要取消执行,请使用 WorkflowsCancelExecutionOperator
。
cancel_execution = WorkflowsCancelExecutionOperator(
task_id="cancel_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=SLEEP_WORKFLOW_ID,
execution_id=cancel_execution_id,
)