Celery является незаменимым инструментом для веб-приложений, в которых требуется обрабатывать большое количество продолжительных задач. В данном материале мы рассмотрим, как Celery используется вместе с Flask.
Предположим, что есть веб-приложение, которое позволяет пользователям загружать изображения для обработки. Мы хотим предоставить им опции одновременной загрузки и обработки нескольких изображений с возможностью отслеживания статуса обработки каждого изображения.
import random
from flask import Flask, request, jsonify
from celery import Celery, group
import time
app = Flask(__name__)
# Конфигурация Celery
celery = Celery(
app.name,
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
)
# Задача Celery для обработки изображения
@celery.task
def process_image(image_id: str):
# В реальной ситуации здесь может быть обработка изображения
# В данном примере просто делаем задержку для демонстрации
time.sleep(random.randint(5, 15))
return f'Image {image_id} processed'
@app.route('/process_images', methods=['POST'])
def process_images():
images = request.json.get('images')
if images and isinstance(images, list):
# Создаём группу задач
task_group = group(
process_image.s(image_id)
for image_id in images
)
# Запускаем группу задач и сохраняем её
result = task_group.apply_async()
result.save()
# Возвращаем пользователю ID группы для отслеживания
return jsonify({'group_id': result.id}), 202
else:
return jsonify({'error': 'Missing or invalid images parameter'}), 400
@app.route('/status/<group_id>', methods=['GET'])
def get_group_status(group_id: str):
result = celery.GroupResult.restore(group_id)
if result:
# Если группа с таким ID существует,
# возвращаем долю выполненных задач
status = result.completed_count() / len(result)
return jsonify({'status': status}), 200
else:
# Иначе возвращаем ошибку
return jsonify({'error': 'Invalid group_id'}), 404
if __name__ == '__main__':
app.run(debug=True)
Запустите Redis (в качестве брокера Celery):
docker run -p 6379:6379 --name my-redis -d redis
Запустите воркеры Celery для обработки задач:
celery -A app.celery worker --loglevel=info
Запустите Flask-приложение:
python app.py
Теперь вы можете отправлять POST-запросы на /process_images
в вашем Flask-приложении, передавая список images в JSON-формате. Приложение создаст группу задач для обработки каждого изображения в списке и вернёт group_id
, который позволит отслеживать статус обработки группы задач. Вы также можете получить текущий статус группы задач, отправив GET-запрос на /status/<group_id>
.
Если какое-либо изображение не успело обработаться, вы увидите другое значение, например 0.66667.
Многие сайты используют именно такую модель взаимодействия — раз в секунду они делают запрос к серверу, как будто спрашивают: «Ну как там?» — получают ответ и отображают его пользователю.
Этот пример демонстрирует, как использование группировки задач и отслеживание их статуса с помощью Celery позволяют параллельно обрабатывать множество задач и контролировать их выполнение, улучшая производительность и управляемость веб-приложения.
Практика
Доработайте приложение, добавив следующие endpoints:
/cancel/<group_id>
Отменяет группу задач. В этом вам поможет метод revoke./control
Отображает информацию о задачах и очередях с помощью методов celery.app.control. Сделайте так, чтобы информация актуализировалась раз в 10 секунд, а не при каждом запросе.
📂 Task Queue | Последнее изменение: 17.03.2024 11:59