Внутреннее устройство
Celery состоит из нескольких компонентов, которые взаимодействуют между собой для обработки задач:
- Брокер сообщений — это механизм, который позволяет передавать задачи от приложения к рабочим процессам Celery. Вы можете выбрать различные брокеры, такие как RabbitMQ, Redis или Amazon SQS, в зависимости от ваших потребностей.
- Рабочие процессы — это отдельные процессы, которые слушают брокер сообщений и выполняют поступающие в очередь задачи. Вы можете запускать несколько рабочих процессов, чтобы распределить нагрузку и обеспечить параллельное выполнение задач.
- Задачи — это функции, которые вы определяете в вашем файле tasks.py. Они могут быть выполнены асинхронно рабочими процессами.
- Результаты. Celery позволяет получать результаты выполнения задач. Вы можете выбрать различные бэкенды для хранения результатов, такие как Redis, база данных или файловая система.
Celery использует механизмы сериализации данных для передачи задач и результатов между компонентами.
Установка и запуск
Для работы с Celery понадобится установить брокер сообщений. Брокер сообщений — это специальная служба, к которой подключаются отправители и получатели данных. В роли брокера могут выступать Redis, RabbitMQ и Amazon SQS. В качестве примера возьмём Redis, так как он является простым и наиболее используемым.
Redis — одно из самых популярных хранилищ данных в памяти, которое используется для различных задач, включая:
- кеширование,
- сессионное хранилище,
- очереди заданий и многое другое.
Давайте установим и запустим Redis с помощью Docker.
- Чтобы запустить контейнер Redis, выполните команду:
docker run -p 6379:6379 --name my-redis -d redis
Для установки Celery выполните команду через pip:
pip install celery[redis]==5.3.1
После установки Celery вы можете создать файл tasks.py
, в котором будет содержаться код для ваших задач.
Пример простой задачи в файле tasks.py
:
from celery import Celery
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@app.task
def add(x, y):
return x + y
- tasks — название нашего Celery-приложения.
- broker — URL брокера сообщений.
- backend — URL хранилища результатов; можно указать URL брокера.
Данный код вы можете найти в репозитории с практической работой.
Для запуска рабочих процессов Celery, которые будут обрабатывать задачи, выполните команду:
celery -A tasks worker
Теперь очередь задач готова к работе. Вы можете добавлять задачи и выполнять их:
>>> from tasks import add
>>> result = add.delay(4, 5)
>>> result
<AsyncResult: 6c6de6cb-4e95-4815-b02a-0d5d048b6e80>
>>> result.get()
9
Обзор возможностей
Celery даёт много возможностей для эффективной работы с очередями задач. Наиболее популярные из них, с которыми вы столкнётесь на практике:
- Планирование задач.
- Группы и цепочки задач.
- Отслеживание состояния задач.
Остановимся на них подробнее.
Планирование задач
Планирование задач в Celery позволяет запускать задачи в определённое время или через определённые интервалы.
Для этого используется Celery Beat — специальный инструмент для планирования и управления периодическими задачами. Celery Beat позволяет задать расписание выполнения задач.
Пример периодической задачи:
from random import random
from celery import Celery
from celery.schedules import crontab
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(60, check_cat.s())
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
check_cat.s()
)
@app.task
def check_cat():
if random() < 0.5:
print("Кот ничего не сломал.")
else:
print("Кот что-то сломал...")
В данном примере вы определяете периодическую задачу check_cat
, которая будет выполняться каждую минуту и проверять, сломал ли кот что-либо.
С помощью crontab запланировали ту же задачу check_cat
, которая будет выполняться каждый понедельник в 07:30.
Для запуска сервиса Celery Beat запустите команду:
celery -A tasks beat
Также можно запустить beat внутри рабочего процесса:
celery -A tasks worker -B
Но делать это не рекомендуется, исходя из принципа единой ответственности. При изменении worker затрагивается и Celery Beat. Если что-то из этого упадёт, то упадёт всё сразу.
Группы и цепочки задач
Они позволяют объединять несколько задач и управлять их последовательным или параллельным выполнением.
Группы задач
Группы задач — это механизм для параллельного запуска группы задач и получения результатов отдельно для каждой из них.
Ниже приведён пример, в котором мы покупаем молоко и хлеб. Эти задачи группируются для параллельного выполнения.
@app.task
def buy_milk(volume: int) -> int:
print(f'Покупаем {volume} литров молока')
return volume
@app.task
def buy_bread(count: int) -> int:
print(f'Покупаем {count} буханок хлеба')
return count
from celery import group
from tasks import buy_milk, buy_bread
task1 = buy_milk.s(7)
task2 = buy_bread.s(5)
task_group = group(task1, task2)
result = task_group.apply_async()
results = result.get()
print(results) # [7, 5]
Цепочки задач
Цепочки задач — это механизм для последовательного запуска задач, где результат одной задачи передаётся в качестве аргумента следующей.
В примере ниже — результат задачи fetch_user_name
передаётся на вход задаче greeting_user
.
@app.task
def fetch_user_name(id: int) -> str:
return f'Пётр {id}Первый'
@app.task
def greeting_user(name: str) -> str:
return f'Здравствуй, {name}!'
from celery import chain
from tasks import fetch_user_name, greeting_user
task1 = fetch_user_name.s(1)
task2 = greeting_user.s()
task_chain = chain(task1 | task2)
result = task_chain.apply_async()
final_result = result.get()
print(final_result) # Здравствуй, Пётр Первый!
Отслеживание состояния задач
Celery позволяет отслеживать статус выполнения задач. При запуске асинхронной задачи с помощью apply_async()
вы получаете объект AsyncResult
, который позволяет отследить состояние задачи и получить её результаты по завершении.
В примере ниже мы получим ошибку, если на вход дадим строку. Такую ошибку можно найти с помощью проверки result.successful()
и атрибута result
.
@app.task
def heavy_task(n: int) -> int:
result = 1
for i in range(2, n):
result *= i
time.sleep(0.01)
return result
from tasks import heavy_task
def get_factorial(arg):
result = heavy_task.apply_async(args=(arg,))
while not result.ready():
# Задача ещё выполняется
pass
if result.successful():
result_value = result.get()
else:
# Информация об ошибке
result_value = result.result
print(result_value)
get_factorial(50)
# 608281864034267560872252163321295376887552831379210240000000000
get_factorial('Сейчас будет ошибка')
# 'str' object cannot be interpreted as an integer
Таким образом, можно гибко манипулировать задачами, повышая производительность с помощью группировки и выстраивания задач в цепочки, а также повышая отказоустойчивость с помощью отслеживания состояния задач.
📂 Task Queue | Последнее изменение: 17.03.2024 10:36