Celery в помощь

Долго выполняющиеся задачи в вашем веб-приложении на Python? Celery – это распределенная очередь задач и очень мощный инструмент, используемый в щадящем режиме. Вот история о том, как мы использовали Celery для решения важной проблемы.

Мы создали Python-приложение с FastAPI. Один из методов занимал примерно 10-20 секунд на каждый запрос. На пике он должен был обрабатывать 250 тыс. запросов как можно быстрее.

В рабочем процессе метода предполагается чтение/запись из/в БД. Из-за ограниченного количества соединений с БД любые облачные сервисы, такие как AWS lambda, облачные функции GCP и функции Azure, не работают.

Рассмотренные решение:

  • Многопоточность: Это может решить небольшое количество задач. GIL ограничивает его и не может использовать многопоточность, а также использует только одно ядро процессора. Приложение может аварийно завершиться из-за переполнения памяти из-за созданных и невыполненных потоков.
  • Фоновая задача FastAPI: Она также ограничена системой. Даже после передачи нескольких задач, она обрабатывала только восемь за раз, а остальные терялись.

Мы не можем позволить себе терять обработку между развертываниями и перезапусками приложений.

Рассмотренное решение:

  • Использование очереди сообщений: Это потребует написания отдельной кодовой базы для потребителя, чтобы обрабатывать поставленные в очередь задачи.

Это был критичный по времени проект, и окончательным решением было что-то, что могло бы обрабатывать запросы параллельно и использовать какой-то механизм очередей, чтобы избежать потери обработки данных. Это должно быть что-то с наименьшим количеством рефакторинга кода.

С Celery это было очень просто. Он поддерживает различные бэкенды очередей, такие как AWS SQS, Redis и RabbitMQ.

Для поддержки Celery нам потребовался минимальный рефакторинг кода. Нам нужен файл для инициализации Celery tasks.py . Импортировали долго выполняющиеся задачи под celery tasks.

# tasks.py

from celery import Celery
from services.tasks import long_running_task

app = Celery('hello', broker='redis://localhost:6379/0')

@app.task
def process_data():
    long_running_task() 

В маршрутизаторе мы должны импортировать задачу celery и вызвать метод delay(). Это поставит задачу в очередь сообщений. process_data.delay()

С другой стороны, нам нужно запустить приложение celery

celery -A tasks worker --loglevel=info concurrency 4
Celery в помощь

Все запросы от маршрутизатора ставятся в очередь и обрабатываются рабочим. Параллельность должна быть выбрана оптимально, в зависимости от конфигурации системы.

Даже если приложение аварийно завершается, оно подхватывает запросы из очереди при запуске. Оно обнаруживает других рабочих, подключенных к тому же бэкенду очереди, и распределяет задачи между всеми рабочими, что обеспечивает нам масштабируемость.

Так мы построили отказоустойчивую и распределенную систему обработки с минимальным рефакторингом кода.

+1
0
+1
0
+1
0
+1
0
+1
0

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *