Внутреннее устройство
Celery состоит из нескольких компонентов, которые взаимодействуют между собой для обработки задач:
- Брокер сообщений — это механизм, который позволяет передавать задачи от приложения к рабочим процессам Celery. Вы можете выбрать различные брокеры, такие как RabbitMQ, Redis или Amazon SQS, в зависимости от ваших потребностей.
- Рабочие процессы — это отдельные процессы, которые слушают брокер сообщений и выполняют поступающие в очередь задачи. Вы можете запускать несколько рабочих процессов, чтобы распределить нагрузку и обеспечить параллельное выполнение задач.
- Задачи — это функции, которые вы определяете в вашем файле tasks.py. Они могут быть выполнены асинхронно рабочими процессами.
- Результаты. Celery позволяет получать результаты выполнения задач. Вы можете выбрать различные бэкенды для хранения результатов, такие как Redis, база данных или файловая система.
Celery использует механизмы сериализации данных для передачи задач и результатов между компонентами.
Установка и запуск
Для работы с Celery понадобится установить брокер сообщений. Брокер сообщений — это специальная служба, к которой подключаются отправители и получатели данных. В роли брокера могут выступать Redis, RabbitMQ и Amazon SQS. В качестве примера возьмём Redis, так как он является простым и наиболее используемым.
Redis — одно из самых популярных хранилищ данных в памяти, которое используется для различных задач, включая:
- кеширование,
- сессионное хранилище,
- очереди заданий и многое другое.
Давайте установим и запустим Redis с помощью Docker.
- Чтобы запустить контейнер Redis, выполните команду:
Для установки Celery выполните команду через pip:
pip install celery[redis]==5.3.1
После установки Celery вы можете создать файл tasks.py
, в котором будет содержаться код для ваших задач.
Пример простой задачи в файле tasks.py
:
- tasks — название нашего Celery-приложения.
- broker — URL брокера сообщений.
- backend — URL хранилища результатов; можно указать URL брокера.
Данный код вы можете найти в репозитории с практической работой.
Для запуска рабочих процессов Celery, которые будут обрабатывать задачи, выполните команду:
Теперь очередь задач готова к работе. Вы можете добавлять задачи и выполнять их:
Обзор возможностей
Celery даёт много возможностей для эффективной работы с очередями задач. Наиболее популярные из них, с которыми вы столкнётесь на практике:
- Планирование задач.
- Группы и цепочки задач.
- Отслеживание состояния задач.
Остановимся на них подробнее.
Планирование задач
Планирование задач в Celery позволяет запускать задачи в определённое время или через определённые интервалы.
Для этого используется Celery Beat — специальный инструмент для планирования и управления периодическими задачами. Celery Beat позволяет задать расписание выполнения задач.
Пример периодической задачи:
В данном примере вы определяете периодическую задачу check_cat
, которая будет выполняться каждую минуту и проверять, сломал ли кот что-либо.
С помощью crontab запланировали ту же задачу check_cat
, которая будет выполняться каждый понедельник в 07:30.
Для запуска сервиса Celery Beat запустите команду:
Также можно запустить beat внутри рабочего процесса:
Но делать это не рекомендуется, исходя из принципа единой ответственности. При изменении worker затрагивается и Celery Beat. Если что-то из этого упадёт, то упадёт всё сразу.
Группы и цепочки задач
Они позволяют объединять несколько задач и управлять их последовательным или параллельным выполнением.
Группы задач
Группы задач — это механизм для параллельного запуска группы задач и получения результатов отдельно для каждой из них.
Ниже приведён пример, в котором мы покупаем молоко и хлеб. Эти задачи группируются для параллельного выполнения.
Цепочки задач
Цепочки задач — это механизм для последовательного запуска задач, где результат одной задачи передаётся в качестве аргумента следующей.
В примере ниже — результат задачи fetch_user_name
передаётся на вход задаче greeting_user
.
Отслеживание состояния задач
Celery позволяет отслеживать статус выполнения задач. При запуске асинхронной задачи с помощью apply_async()
вы получаете объект AsyncResult
, который позволяет отследить состояние задачи и получить её результаты по завершении.
В примере ниже мы получим ошибку, если на вход дадим строку. Такую ошибку можно найти с помощью проверки result.successful()
и атрибута result
.
Таким образом, можно гибко манипулировать задачами, повышая производительность с помощью группировки и выстраивания задач в цепочки, а также повышая отказоустойчивость с помощью отслеживания состояния задач.
📂 Task Queue | Последнее изменение: 17.03.2024 10:36