Учебник по запуску задач с помощью Celery и Flask.

Что такое Celery?

Celery – это надежная система распределенных очередей задач, широко используемая для управления и выполнения задач в фоновом режиме. Это универсальный инструмент, способный решать как простые асинхронные задачи, так и сложные многоэтапные рабочие процессы по расписанию.

Прежде чем двигаться дальше, давайте познакомимся с важными компонентами сельдерея.

Для эффективного выполнения задачи Celery необходимы два компонента: URL брокера и URL бэкенда.

URL-адрес брокера служит важным коммуникационным звеном между вашим приложением и рабочими станциями Celery. Выполняя роль очереди, он хранит задания из вашего приложения для асинхронной обработки. Например, когда приложение сталкивается с ресурсоемкими задачами, такими как отправка электронной почты или обработка данных, оно помещает эти задачи в брокер. Затем рабочие Celery извлекают и выполняют эти задачи, обеспечивая высокую скорость отклика и эффективность работы приложения.

При этом бэкэнд URL играет роль хранилища результатов выполнения задач. После того как рабочий Celery обработает задание, его результат сохраняется в бэкенде. Эта возможность позволяет приложению впоследствии получать результаты выполнения задачи, обеспечивая бесперебойную работу пользователей. В данном учебном пособии мы будем использовать Redis, однако существуют различные варианты как для брокера, так и для бэкенда.

Redis часто предпочитают использовать в качестве брокера и бэкенда Celery благодаря его быстрому хранению в памяти, возможности постановки задач в очередь (при реализации в виде очереди) и надежному хранению результатов.

Хотя Redis отличается простотой и эффективностью, другие варианты, такие как RabbitMQ, Amazon SQS или Apache Kafka, также могут быть выбраны с учетом таких факторов, как масштабируемость и интеграция с вашей средой. Выбранный вариант существенно влияет на способ управления задачами и хранения результатов, обеспечивая гибкость при настройке Celery под конкретные требования.

Руководство по запуску Celery с помощью Flask:

Шаг 1: Установка Redis:

Сначала нам необходимо установить Redis в нашу систему. Для этого воспользуйтесь данной ссылкой.

Шаг 2: Добавление всех зависимостей для запуска задачи celery

Создайте новую папку с любым именем по вашему выбору. В данном случае мы можем назвать ее CeleryTutorial.

Создадим виртуальную среду и установим в нее все необходимые зависимости.

//Create the virtual environment
python3 -m venv venv

//Activate the virtual environment
source venv/bin/activate

//Install celery 
pip install celery

//Install Flask
pip install Flask

Шаг 3: Настройка Celery во Flask.

Создайте python-файл config.py и добавьте в него следующий код.

from celery import Celery, Task
from flask import Flask

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

В функции celery_init_app мы выполняем инициализацию и настройку Celery в нашем приложении Flask. Мы определяем пользовательский класс FlaskTask, который расширяет класс Celery’s Task. Этот класс обеспечивает выполнение задач в контексте нашего приложения, позволяя им получать доступ к ресурсам приложения. Затем мы создаем экземпляр Celery с именем celery_app, указывая имя приложения и наш класс FlaskTask для выполнения задач. Мы загружаем конфигурацию Celery из настроек CELERY нашего приложения и устанавливаем celery_app в качестве экземпляра Celery по умолчанию. Храня celery_app в расширениях нашего приложения, мы возвращаем настроенный экземпляр celery_app.

В том же файле добавьте этот код.

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

В функции фабричного паттерна create_app мы начинаем с создания экземпляра приложения Flask. Мы настраиваем начальные параметры, включая конфигурации, связанные с Celery, такие как URL-адрес брокера и бэкэнд результатов, использующий Redis. Для большей гибкости конфигурации мы загружаем дополнительные параметры из переменных окружения с префиксом. Далее мы инициализируем Celery, вызывая нашу функцию celery_init_app и передавая ей экземпляр приложения Flask. Наконец, мы возвращаем полностью сконфигурированное приложение Flask.

Шаг 4: Определение задач.

При использовании паттерна фабрики приложений Flask с Celery декоратор @celery_app.task не является идеальным, поскольку не имеет доступа к объекту celery_app и может привязывать задачи к конкретным экземплярам. Вместо этого следует использовать декоратор @shared_task от Celery. Этот декоратор создает задачи, которые адаптируются к контексту “текущего приложения”, аналогично контексту приложений во Flask. Задавая параметр celery_app.set_default(), задачи используют соответствующий контекст приложения, что улучшает совместимость с паттерном фабрики и повышает гибкость тестирования и настройки.

Создайте файл tasks.py

from config import create_app #-Line 1
from celery import shared_task 
from time import sleep

flask_app = create_app()  #-Line 2
celery_app = flask_app.extensions["celery"] #-Line 3

@shared_task(ignore_result=False) #-Line 4
def long_running_task(iterations) -> int:#-Line 5
    result = 0
    for i in range(iterations):
        result += i
        sleep(2) 
    return result #-Line 6

Давайте разберем этот код:

  1. Мы импортируем функцию create_app из модуля config, который содержит конфигурации Flask из модуля config.(строка 1)
  2. В строке 2 мы создаем экземпляр приложения flask с помощью функции create_app() и сохраняем его в объекте flask_app.
  3. Аналогично, в строке 3 мы создаем экземпляр celery_app, используя объект flask_app.
  4. В строке 4 мы используем декоратор @shared_task для определения задачи, а также устанавливаем ignore_result=False, поскольку нам необходимо сохранить этот результат в redis.
  5. В строке 5 мы определяем долго выполняющуюся задачу, которая будет выполняться n раз в зависимости от ввода пользователя и при этом будет спать 2 секунды.
  6. Эта задача будет возвращать результат, основанный на вводимых данных.(в строке 6)

Шаг 5: Вызов задач с помощью Flask

Теперь создадим новый файл app.py, который будет содержать маршруты во Flask.

from tasks import flask_app, long_running_task #-Line 1
from celery.result import AsyncResult#-Line 2
from flask import request,jsonify 

@flask_app.post("/trigger_task")
def start_task() -> dict[str, object]:
    iterations = request.args.get('iterations')
    print(iterations)
    result = long_running_task.delay(int(iterations))#-Line 3
    return {"result_id": result.id}

@flask_app.get("/get_result")
def task_result() -> dict[str, object]:
    result_id = request.args.get('result_id')
    result = AsyncResult(result_id)#-Line 4
    if result.ready():#-Line 5
        # Task has completed
        if result.successful():#-Line 6
    
            return {
                "ready": result.ready(),
                "successful": result.successful(),
                "value": result.result,#-Line 7
            }
        else:
        # Task completed with an error
            return jsonify({'status': 'ERROR', 'error_message': str(result.result)})
    else:
        # Task is still pending
        return jsonify({'status': 'Running'})

if __name__ == "__main__":
    flask_app.run(debug=True)
  1. В строке 1 импортируются объект flask_app и функция long_running_task из модуля tasks. Объект flask_app, созданный в модуле tasks, хранит конфигурацию приложения Flask, позволяя нам определять маршруты в приложении.
  2. В строке 2 импортируется класс AsyncResult из модуля celery.result. Класс AsyncResult используется для получения результата задачи Celery по ее идентификатору.
  3. Строка 3 инициирует асинхронное выполнение задачи Celery long_running_task. Метод .delay() планирует выполнение задачи и возвращает экземпляр AsyncResult, который представляет собой состояние и результат выполнения задачи. В качестве аргумента задаче передается количество итераций.
  4. Здесь (строка 4) экземпляр AsyncResult создается по result_id, полученному из параметров запроса. Это позволяет отслеживать состояние и получать результат выполнения конкретной задачи по ее идентификатору.
  5. В строке 5 проверяется, готова ли задача, представленная экземпляром result, к сбору. Метод ready() возвращает True, если выполнение задачи завершено и результат может быть получен.
  6. В строке 6 проверяется, успешно ли завершилось выполнение задачи. Она возвращает True, если задача выполнена без каких-либо исключений или ошибок.
  7. В строке 7 результат выполнения задачи присваивается ключу value в словаре ответа с помощью result.result.

В целом данный код демонстрирует, как создать маршруты в приложении Flask для запуска и мониторинга долго выполняющихся задач с помощью Celery. Задача long_running_task запускается асинхронно, и ее ход и результат можно отследить с помощью класса AsyncResult. Ответы предоставляют информацию о статусе выполнения задачи и ее результате.

Шаг 7: Посмотрим, как это запустить.

  1. Сначала мы запустим наш сервер redis. Используйте команду, соответствующую вашей операционной системе.
redis-server

В MacOS для запуска сервера используется команда redis-server.

2. После запуска сервера redis запустим рабочий celery.

celery -A tasks worker --loglevel INFO

В качестве имени рабочего используйте tasks, так как в нем хранятся задачи, которые нам нужно запустить, а также конфигурация celery.

  1. Запустите приложение flask.

После запуска приложения flask мы можем проверить API с помощью Postman или любого другого инструмента по вашему выбору.

Удачного кодирования!

+1
0
+1
0
+1
0
+1
0
+1
1

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *