Celery в помощь

Долго выполняющиеся задачи в вашем веб-приложении на Python? Celery – это распределенная очередь задач и очень мощный инструмент, используемый в щадящем режиме. Вот история о том, как мы использовали Celery для решения важной проблемы.
Мы создали Python-приложение с FastAPI. Один из методов занимал примерно 10-20 секунд на каждый запрос. На пике он должен был обрабатывать 250 тыс. запросов как можно быстрее.
В рабочем процессе метода предполагается чтение/запись из/в БД. Из-за ограниченного количества соединений с БД любые облачные сервисы, такие как AWS lambda, облачные функции GCP и функции Azure, не работают.
Рассмотренные решение:
- Многопоточность: Это может решить небольшое количество задач. GIL ограничивает его и не может использовать многопоточность, а также использует только одно ядро процессора. Приложение может аварийно завершиться из-за переполнения памяти из-за созданных и невыполненных потоков.
- Фоновая задача FastAPI: Она также ограничена системой. Даже после передачи нескольких задач, она обрабатывала только восемь за раз, а остальные терялись.
Мы не можем позволить себе терять обработку между развертываниями и перезапусками приложений.
Рассмотренное решение:
- Использование очереди сообщений: Это потребует написания отдельной кодовой базы для потребителя, чтобы обрабатывать поставленные в очередь задачи.
Это был критичный по времени проект, и окончательным решением было что-то, что могло бы обрабатывать запросы параллельно и использовать какой-то механизм очередей, чтобы избежать потери обработки данных. Это должно быть что-то с наименьшим количеством рефакторинга кода.
С Celery это было очень просто. Он поддерживает различные бэкенды очередей, такие как AWS SQS, Redis и RabbitMQ.
Для поддержки Celery нам потребовался минимальный рефакторинг кода. Нам нужен файл для инициализации Celery tasks.py . Импортировали долго выполняющиеся задачи под celery tasks.
# tasks.py
from celery import Celery
from services.tasks import long_running_task
app = Celery('hello', broker='redis://localhost:6379/0')
@app.task
def process_data():
long_running_task()
В маршрутизаторе мы должны импортировать задачу celery и вызвать метод delay(). Это поставит задачу в очередь сообщений. process_data.delay()
С другой стороны, нам нужно запустить приложение celery
celery -A tasks worker --loglevel=info concurrency 4

Все запросы от маршрутизатора ставятся в очередь и обрабатываются рабочим. Параллельность должна быть выбрана оптимально, в зависимости от конфигурации системы.
Даже если приложение аварийно завершается, оно подхватывает запросы из очереди при запуске. Оно обнаруживает других рабочих, подключенных к тому же бэкенду очереди, и распределяет задачи между всеми рабочими, что обеспечивает нам масштабируемость.
Так мы построили отказоустойчивую и распределенную систему обработки с минимальным рефакторингом кода.