Внутреннее устройство

Celery состоит из нескольких компонентов, которые взаимодействуют между собой для обработки задач:

  • Брокер сообщений — это механизм, который позволяет передавать задачи от приложения к рабочим процессам Celery. Вы можете выбрать различные брокеры, такие как RabbitMQ, Redis или Amazon SQS, в зависимости от ваших потребностей.
  • Рабочие процессы — это отдельные процессы, которые слушают брокер сообщений и выполняют поступающие в очередь задачи. Вы можете запускать несколько рабочих процессов, чтобы распределить нагрузку и обеспечить параллельное выполнение задач.
  • Задачи — это функции, которые вы определяете в вашем файле tasks.py. Они могут быть выполнены асинхронно рабочими процессами.
  • Результаты. Celery позволяет получать результаты выполнения задач. Вы можете выбрать различные бэкенды для хранения результатов, такие как Redis, база данных или файловая система.

Celery использует механизмы сериализации данных для передачи задач и результатов между компонентами.


Установка и запуск

Для работы с Celery понадобится установить брокер сообщений. Брокер сообщений — это специальная служба, к которой подключаются отправители и получатели данных. В роли брокера могут выступать Redis, RabbitMQ и Amazon SQS. В качестве примера возьмём Redis, так как он является простым и наиболее используемым.

Redis — одно из самых популярных хранилищ данных в памяти, которое используется для различных задач, включая: 

  • кеширование, 
  • сессионное хранилище, 
  • очереди заданий и многое другое.

Давайте установим и запустим Redis с помощью Docker.

  • Чтобы запустить контейнер Redis, выполните команду:
docker run -p 6379:6379 --name my-redis -d redis

Для установки Celery выполните команду через pip:

pip install celery[redis]==5.3.1

После установки Celery вы можете создать файл tasks.py, в котором будет содержаться код для ваших задач.

Пример простой задачи в файле tasks.py:

from celery import Celery  
app = Celery(  
   'tasks',  
   broker='redis://localhost:6379/0',  
   backend='redis://localhost:6379/0'  
)  
@app.task  
def add(x, y):  
   return x + y
  • tasks — название нашего Celery-приложения. 
  • broker — URL брокера сообщений. 
  • backend — URL хранилища результатов; можно указать URL брокера. 

Данный код вы можете найти в репозитории с практической работой.

Для запуска рабочих процессов Celery, которые будут обрабатывать задачи, выполните команду:

celery -A tasks worker

Теперь очередь задач готова к работе. Вы можете добавлять задачи и выполнять их:

>>> from tasks import add  
>>> result = add.delay(4, 5)  
>>> result  
<AsyncResult: 6c6de6cb-4e95-4815-b02a-0d5d048b6e80>  
>>> result.get()  
9

Обзор возможностей

Celery даёт много возможностей для эффективной работы с очередями задач. Наиболее популярные из них, с которыми вы столкнётесь на практике:

  • Планирование задач.
  • Группы и цепочки задач.
  • Отслеживание состояния задач.

Остановимся на них подробнее.

Планирование задач

Планирование задач в Celery позволяет запускать задачи в определённое время или через определённые интервалы. 

Для этого используется Celery Beat — специальный инструмент для планирования и управления периодическими задачами. Celery Beat позволяет задать расписание выполнения задач.

Пример периодической задачи:

from random import random  
  
from celery import Celery  
from celery.schedules import crontab  
  
app = Celery(  
   'tasks',  
   broker='redis://localhost:6379/0',  
   backend='redis://localhost:6379/0'  
)  
  
@app.on_after_configure.connect  
def setup_periodic_tasks(sender, **kwargs):  
   sender.add_periodic_task(60, check_cat.s())  
   sender.add_periodic_task(  
       crontab(hour=7, minute=30, day_of_week=1),  
       check_cat.s()  
   )  
  
@app.task  
def check_cat():  
   if random() < 0.5:  
       print("Кот ничего не сломал.")  
   else:  
       print("Кот что-то сломал...")

В данном примере вы определяете периодическую задачу check_cat, которая будет выполняться каждую минуту и проверять, сломал ли кот что-либо. 

С помощью crontab запланировали ту же задачу check_cat, которая будет выполняться каждый понедельник в 07:30.

Для запуска сервиса Celery Beat запустите команду:

celery -A tasks beat

Также можно запустить beat внутри рабочего процесса:

celery -A tasks worker -B

Но делать это не рекомендуется, исходя из принципа единой ответственности. При изменении worker затрагивается и Celery Beat. Если что-то из этого упадёт, то упадёт всё сразу.

Группы и цепочки задач

Они позволяют объединять несколько задач и управлять их последовательным или параллельным выполнением.

Группы задач

Группы задач — это механизм для параллельного запуска группы задач и получения результатов отдельно для каждой из них.

Ниже приведён пример, в котором мы покупаем молоко и хлеб. Эти задачи группируются для параллельного выполнения.

@app.task  
def buy_milk(volume: int) -> int:  
   print(f'Покупаем {volume} литров молока')  
   return volume  
  
@app.task  
def buy_bread(count: int) -> int:  
   print(f'Покупаем {count} буханок хлеба')  
   return count  
  
from celery import group  
from tasks import buy_milk, buy_bread  
  
task1 = buy_milk.s(7)  
task2 = buy_bread.s(5)  
  
task_group = group(task1, task2)  
result = task_group.apply_async()  
  
results = result.get()  
print(results)  # [7, 5]

Цепочки задач

Цепочки задач — это механизм для последовательного запуска задач, где результат одной задачи передаётся в качестве аргумента следующей.

В примере ниже — результат задачи fetch_user_name передаётся на вход задаче greeting_user.

@app.task  
def fetch_user_name(id: int) -> str:  
   return f'Пётр {id}Первый'  
  
@app.task  
def greeting_user(name: str) -> str:  
   return f'Здравствуй, {name}!'  
  
from celery import chain  
from tasks import fetch_user_name, greeting_user  
  
task1 = fetch_user_name.s(1)  
task2 = greeting_user.s()  
  
task_chain = chain(task1 | task2)  
result = task_chain.apply_async()  
  
final_result = result.get()  
print(final_result)  # Здравствуй, Пётр Первый!

Отслеживание состояния задач

Celery позволяет отслеживать статус выполнения задач. При запуске асинхронной задачи с помощью apply_async() вы получаете объект AsyncResult, который позволяет отследить состояние задачи и получить её результаты по завершении.

В примере ниже мы получим ошибку, если на вход дадим строку. Такую ошибку можно найти с помощью проверки result.successful() и атрибута result.

@app.task  
def heavy_task(n: int) -> int:  
   result = 1  
   for i in range(2, n):  
       result *= i  
       time.sleep(0.01)  
   return result  
  
from tasks import heavy_task  
  
def get_factorial(arg):  
   result = heavy_task.apply_async(args=(arg,))  
  
   while not result.ready():  
       # Задача ещё выполняется  
       pass  
  
   if result.successful():  
       result_value = result.get()  
   else:  
       # Информация об ошибке  
       result_value = result.result  
  
   print(result_value)  
  
get_factorial(50)    
# 608281864034267560872252163321295376887552831379210240000000000  
get_factorial('Сейчас будет ошибка')    
# 'str' object cannot be interpreted as an integer

Таким образом, можно гибко манипулировать задачами, повышая производительность с помощью группировки и выстраивания задач в цепочки, а также повышая отказоустойчивость с помощью отслеживания состояния задач.


📂 Task Queue | Последнее изменение: 17.03.2024 10:36