Google Cloud Workflows 操作器¶
您可以使用 Workflows 来创建无服务器工作流,这些工作流按照您定义的顺序将一系列无服务器任务链接在一起。结合 Google Cloud 的强大 API、Cloud Functions 和 Cloud Run 等无服务器产品,以及对外部 API 的调用,来创建灵活的无服务器应用程序。
有关此服务的更多信息,请访问 Workflows 官方文档 <产品文档。
前提任务¶
要使用这些操作器,您必须完成以下几项:
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算,详情请参阅 Google Cloud 文档。
启用 API,详情请参阅 Cloud Console 文档。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'详细信息请参阅 安装。
创建工作流¶
要创建工作流,请使用 WorkflowsCreateWorkflowOperator
。
tests/system/google/cloud/workflows/example_workflows.py
create_workflow = WorkflowsCreateWorkflowOperator(
task_id="create_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow=WORKFLOW,
workflow_id=WORKFLOW_ID,
)
工作流应以类似于此示例的方式定义
tests/system/google/cloud/workflows/example_workflows.py
WORKFLOW_CONTENT = """
- getLanguage:
assign:
- inputLanguage: "English"
- readWikipedia:
call: http.get
args:
url: https://www.wikipedia.org/
query:
action: opensearch
search: ${inputLanguage}
result: wikiResult
- returnResult:
return: ${wikiResult}
"""
WORKFLOW = {
"description": "Test workflow",
"labels": {"airflow-version": "dev"},
"source_contents": WORKFLOW_CONTENT,
}
有关编写工作流的更多信息,请查看官方产品文档 <产品文档。
更新工作流¶
要更新工作流,请使用 WorkflowsUpdateWorkflowOperator
。
tests/system/google/cloud/workflows/example_workflows.py
update_workflow = WorkflowsUpdateWorkflowOperator(
task_id="update_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
update_mask=FieldMask(paths=["name", "description"]),
)
获取工作流¶
要获取工作流,请使用 WorkflowsGetWorkflowOperator
。
tests/system/google/cloud/workflows/example_workflows.py
get_workflow = WorkflowsGetWorkflowOperator(
task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
列出工作流¶
要列出工作流,请使用 WorkflowsListWorkflowsOperator
。
tests/system/google/cloud/workflows/example_workflows.py
list_workflows = WorkflowsListWorkflowsOperator(
task_id="list_workflows",
location=LOCATION,
project_id=PROJECT_ID,
)
删除工作流¶
要删除工作流,请使用 WorkflowsDeleteWorkflowOperator
。
tests/system/google/cloud/workflows/example_workflows.py
delete_workflow = WorkflowsDeleteWorkflowOperator(
task_id="delete_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建执行¶
要创建执行,请使用 WorkflowsCreateExecutionOperator
。由于 API 限制,此操作器不是幂等的。
tests/system/google/cloud/workflows/example_workflows.py
create_execution = WorkflowsCreateExecutionOperator(
task_id="create_execution",
location=LOCATION,
project_id=PROJECT_ID,
execution=EXECUTION,
workflow_id=WORKFLOW_ID,
)
创建操作器不会等待执行完成。要等待执行结果,请使用 WorkflowExecutionSensor
。
tests/system/google/cloud/workflows/example_workflows.py
wait_for_execution = WorkflowExecutionSensor(
task_id="wait_for_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
获取执行¶
要获取执行,请使用 WorkflowsGetExecutionOperator
。
tests/system/google/cloud/workflows/example_workflows.py
get_execution = WorkflowsGetExecutionOperator(
task_id="get_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
列出执行¶
要列出执行,请使用 WorkflowsListExecutionsOperator
。默认情况下,此操作器仅返回过去 60 分钟内的执行。
tests/system/google/cloud/workflows/example_workflows.py
list_executions = WorkflowsListExecutionsOperator(
task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
取消执行¶
要取消执行,请使用 WorkflowsCancelExecutionOperator
。
tests/system/google/cloud/workflows/example_workflows.py
cancel_execution = WorkflowsCancelExecutionOperator(
task_id="cancel_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=SLEEP_WORKFLOW_ID,
execution_id=cancel_execution_id,
)