Google Cloud Tasks¶
Cloud Tasks 是一项完全托管的服务,允许您管理大量分布式任务的执行、分派和交付。使用 Cloud Tasks,您可以在用户或服务到服务请求之外异步执行工作。
有关该服务的更多信息,请访问 Cloud Tasks 产品文档
先决条件任务¶
要使用这些操作符,您必须执行以下几个操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的描述,为您的项目启用结算功能。
按照 Cloud Console 文档中的描述,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'详细信息请参阅 安装。
队列操作¶
创建队列¶
要创建新队列,请使用 CloudTasksQueueCreateOperator
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)
删除队列¶
要删除队列,请使用 CloudTasksQueueDeleteOperator
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="delete_queue",
)
恢复队列¶
要恢复队列,请使用 CloudTasksQueueResumeOperator
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="resume_queue",
)
暂停队列¶
要暂停队列,请使用 CloudTasksQueuePauseOperator
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="pause_queue",
)
清除队列¶
要清除队列,请使用 CloudTasksQueuePurgeOperator
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="purge_queue",
)
获取队列¶
要获取队列,请使用 CloudTasksQueueGetOperator
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="get_queue",
)
get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
更新队列¶
要更新队列,请使用 CloudTasksQueueUpdateOperator
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
task_id="update_queue",
)
列出队列¶
要列出所有队列,请使用 CloudTasksQueuesListOperator
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
任务操作¶
创建任务¶
要在特定队列中创建新任务,请使用 CloudTasksTaskCreateOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
获取任务¶
要获取特定队列中的任务,请使用 CloudTasksTaskGetOperator
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="tasks_get",
)
运行任务¶
要运行特定队列中的任务,请使用 CloudTasksTaskRunOperator
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
task_id="run_task",
)
列出任务¶
要列出特定队列中的所有任务,请使用 CloudTasksTasksListOperator
list_tasks = CloudTasksTasksListOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="list_tasks",
)
删除任务¶
要从特定队列中删除任务,请使用 CloudTasksTaskDeleteOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)