Google Cloud Tasks

Cloud Tasks 是一项全托管服务,可用于管理大量分布式任务的执行、分派和交付。使用 Cloud Tasks,您可以在用户或服务到服务请求之外异步执行工作。

有关该服务的更多信息,请访问Cloud Tasks 产品文档

前置任务

要使用这些运算符,您需要执行一些操作

队列操作

创建队列

要创建新队列,请使用CloudTasksQueueCreateOperator

tests/system/google/cloud/tasks/example_queue.py

create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_queue",
)

删除队列

要删除队列,请使用CloudTasksQueueDeleteOperator

tests/system/google/cloud/tasks/example_queue.py

delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)

恢复队列

要恢复队列,请使用CloudTasksQueueResumeOperator

tests/system/google/cloud/tasks/example_queue.py

resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)

暂停队列

要暂停队列,请使用CloudTasksQueuePauseOperator

tests/system/google/cloud/tasks/example_queue.py

pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)

清除队列

要清除队列,请使用CloudTasksQueuePurgeOperator

tests/system/google/cloud/tasks/example_queue.py

purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)

获取队列

要获取队列,请使用CloudTasksQueueGetOperator

tests/system/google/cloud/tasks/example_queue.py

get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)

get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)

更新队列

要更新队列,请使用CloudTasksQueueUpdateOperator

tests/system/google/cloud/tasks/example_queue.py

update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)

列出队列

要列出所有队列,请使用CloudTasksQueuesListOperator

tests/system/google/cloud/tasks/example_queue.py

list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")

任务操作

创建任务

要在特定队列中创建新任务,请使用CloudTasksTaskCreateOperator

tests/system/google/cloud/tasks/example_tasks.py

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

获取任务

要获取特定队列中的任务,请使用CloudTasksTaskGetOperator

tests/system/google/cloud/tasks/example_tasks.py

tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)

运行任务

要在特定队列中运行任务,请使用CloudTasksTaskRunOperator

tests/system/google/cloud/tasks/example_tasks.py

run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)

列出任务

要列出特定队列中的所有任务,请使用CloudTasksTasksListOperator

tests/system/google/cloud/tasks/example_tasks.py

list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)

删除任务

要从特定队列中删除任务,请使用CloudTasksTaskDeleteOperator

tests/system/google/cloud/tasks/example_tasks.py

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

参考

欲了解更多信息,请参阅

此条目是否有帮助?