Google Cloud Workflows 运算符

您可以使用 Workflows 创建无服务器工作流,将一系列无服务器任务按照您定义的顺序链接在一起。结合 Google Cloud 的 API、Cloud Functions 和 Cloud Run 等无服务器产品的强大功能,以及对外部 API 的调用,创建灵活的无服务器应用程序。

有关该服务的更多信息,请访问 Workflows 产品文档 <产品文档

先决条件任务

要使用这些运算符,您必须执行以下几项操作

创建工作流

要创建工作流,请使用 WorkflowsCreateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

工作流的定义应与此示例类似

tests/system/google/cloud/workflows/example_workflows.py[源代码]

WORKFLOW_CONTENT = """
- getLanguage:
    assign:
        - inputLanguage: "English"
- readWikipedia:
    call: http.get
    args:
        url: https://www.wikipedia.org/
        query:
            action: opensearch
            search: ${inputLanguage}
    result: wikiResult
- returnResult:
    return: ${wikiResult}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}

有关创作工作流的更多信息,请查看官方产品文档 <产品文档

更新工作流

要更新工作流,请使用 WorkflowsUpdateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

update_workflow = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)

获取工作流

要获取工作流,请使用 WorkflowsGetWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

列出工作流

要列出工作流,请使用 WorkflowsListWorkflowsOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

删除工作流

要删除工作流,请使用 WorkflowsDeleteWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

创建执行

要创建执行,请使用 WorkflowsCreateExecutionOperator。由于 API 限制,此运算符不是幂等的。

tests/system/google/cloud/workflows/example_workflows.py[源代码]

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

创建运算符不等待执行完成。要等待执行结果,请使用 WorkflowExecutionSensor

tests/system/google/cloud/workflows/example_workflows.py[源代码]

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

获取执行

要获取执行,请使用 WorkflowsGetExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

列出执行

要列出执行,请使用 WorkflowsListExecutionsOperator。默认情况下,此运算符仅返回过去 60 分钟内的执行。

tests/system/google/cloud/workflows/example_workflows.py[源代码]

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

取消执行

要取消执行,请使用 WorkflowsCancelExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py[源代码]

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=cancel_execution_id,
)

此条目是否有帮助?