Google Cloud Tasks¶
Cloud Tasks 是一项全托管服务,可用于管理大量分布式任务的执行、分派和交付。使用 Cloud Tasks,您可以在用户或服务到服务请求之外异步执行工作。
有关该服务的更多信息,请访问Cloud Tasks 产品文档
前置任务¶
要使用这些运算符,您需要执行一些操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
按照Google Cloud 文档中的说明,为您的项目启用结算功能。
按照Cloud 控制台文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息可用。
队列操作¶
创建队列¶
要创建新队列,请使用CloudTasksQueueCreateOperator
tests/system/google/cloud/tasks/example_queue.py
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)
删除队列¶
要删除队列,请使用CloudTasksQueueDeleteOperator
tests/system/google/cloud/tasks/example_queue.py
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="delete_queue",
)
恢复队列¶
要恢复队列,请使用CloudTasksQueueResumeOperator
tests/system/google/cloud/tasks/example_queue.py
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="resume_queue",
)
暂停队列¶
要暂停队列,请使用CloudTasksQueuePauseOperator
tests/system/google/cloud/tasks/example_queue.py
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="pause_queue",
)
清除队列¶
要清除队列,请使用CloudTasksQueuePurgeOperator
tests/system/google/cloud/tasks/example_queue.py
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="purge_queue",
)
获取队列¶
要获取队列,请使用CloudTasksQueueGetOperator
tests/system/google/cloud/tasks/example_queue.py
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="get_queue",
)
get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
更新队列¶
要更新队列,请使用CloudTasksQueueUpdateOperator
tests/system/google/cloud/tasks/example_queue.py
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
task_id="update_queue",
)
列出队列¶
要列出所有队列,请使用CloudTasksQueuesListOperator
tests/system/google/cloud/tasks/example_queue.py
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
任务操作¶
创建任务¶
要在特定队列中创建新任务,请使用CloudTasksTaskCreateOperator
tests/system/google/cloud/tasks/example_tasks.py
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
获取任务¶
要获取特定队列中的任务,请使用CloudTasksTaskGetOperator
tests/system/google/cloud/tasks/example_tasks.py
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="tasks_get",
)
运行任务¶
要在特定队列中运行任务,请使用CloudTasksTaskRunOperator
tests/system/google/cloud/tasks/example_tasks.py
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
task_id="run_task",
)
列出任务¶
要列出特定队列中的所有任务,请使用CloudTasksTasksListOperator
tests/system/google/cloud/tasks/example_tasks.py
list_tasks = CloudTasksTasksListOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="list_tasks",
)
删除任务¶
要从特定队列中删除任务,请使用CloudTasksTaskDeleteOperator
tests/system/google/cloud/tasks/example_tasks.py
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
参考¶
欲了解更多信息,请参阅