Учебник по запуску задач с помощью Celery и Flask.
Что такое Celery?
Celery – это надежная система распределенных очередей задач, широко используемая для управления и выполнения задач в фоновом режиме. Это универсальный инструмент, способный решать как простые асинхронные задачи, так и сложные многоэтапные рабочие процессы по расписанию.
Прежде чем двигаться дальше, давайте познакомимся с важными компонентами сельдерея.
Для эффективного выполнения задачи Celery необходимы два компонента: URL брокера и URL бэкенда.
URL-адрес брокера служит важным коммуникационным звеном между вашим приложением и рабочими станциями Celery. Выполняя роль очереди, он хранит задания из вашего приложения для асинхронной обработки. Например, когда приложение сталкивается с ресурсоемкими задачами, такими как отправка электронной почты или обработка данных, оно помещает эти задачи в брокер. Затем рабочие Celery извлекают и выполняют эти задачи, обеспечивая высокую скорость отклика и эффективность работы приложения.
При этом бэкэнд URL играет роль хранилища результатов выполнения задач. После того как рабочий Celery обработает задание, его результат сохраняется в бэкенде. Эта возможность позволяет приложению впоследствии получать результаты выполнения задачи, обеспечивая бесперебойную работу пользователей. В данном учебном пособии мы будем использовать Redis, однако существуют различные варианты как для брокера, так и для бэкенда.
Redis часто предпочитают использовать в качестве брокера и бэкенда Celery благодаря его быстрому хранению в памяти, возможности постановки задач в очередь (при реализации в виде очереди) и надежному хранению результатов.
Хотя Redis отличается простотой и эффективностью, другие варианты, такие как RabbitMQ, Amazon SQS или Apache Kafka, также могут быть выбраны с учетом таких факторов, как масштабируемость и интеграция с вашей средой. Выбранный вариант существенно влияет на способ управления задачами и хранения результатов, обеспечивая гибкость при настройке Celery под конкретные требования.
Руководство по запуску Celery с помощью Flask:
Шаг 1: Установка Redis:
Сначала нам необходимо установить Redis в нашу систему. Для этого воспользуйтесь данной ссылкой.
Шаг 2: Добавление всех зависимостей для запуска задачи celery
Создайте новую папку с любым именем по вашему выбору. В данном случае мы можем назвать ее CeleryTutorial.
Создадим виртуальную среду и установим в нее все необходимые зависимости.
//Create the virtual environment
python3 -m venv venv
//Activate the virtual environment
source venv/bin/activate
//Install celery
pip install celery
//Install Flask
pip install Flask
Шаг 3: Настройка Celery во Flask.
Создайте python-файл config.py и добавьте в него следующий код.
from celery import Celery, Task
from flask import Flask
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
В функции celery_init_app мы выполняем инициализацию и настройку Celery в нашем приложении Flask. Мы определяем пользовательский класс FlaskTask, который расширяет класс Celery’s Task. Этот класс обеспечивает выполнение задач в контексте нашего приложения, позволяя им получать доступ к ресурсам приложения. Затем мы создаем экземпляр Celery с именем celery_app, указывая имя приложения и наш класс FlaskTask для выполнения задач. Мы загружаем конфигурацию Celery из настроек CELERY нашего приложения и устанавливаем celery_app в качестве экземпляра Celery по умолчанию. Храня celery_app в расширениях нашего приложения, мы возвращаем настроенный экземпляр celery_app.
В том же файле добавьте этот код.
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
В функции фабричного паттерна create_app мы начинаем с создания экземпляра приложения Flask. Мы настраиваем начальные параметры, включая конфигурации, связанные с Celery, такие как URL-адрес брокера и бэкэнд результатов, использующий Redis. Для большей гибкости конфигурации мы загружаем дополнительные параметры из переменных окружения с префиксом. Далее мы инициализируем Celery, вызывая нашу функцию celery_init_app и передавая ей экземпляр приложения Flask. Наконец, мы возвращаем полностью сконфигурированное приложение Flask.
Шаг 4: Определение задач.
При использовании паттерна фабрики приложений Flask с Celery декоратор @celery_app.task не является идеальным, поскольку не имеет доступа к объекту celery_app и может привязывать задачи к конкретным экземплярам. Вместо этого следует использовать декоратор @shared_task от Celery. Этот декоратор создает задачи, которые адаптируются к контексту “текущего приложения”, аналогично контексту приложений во Flask. Задавая параметр celery_app.set_default(), задачи используют соответствующий контекст приложения, что улучшает совместимость с паттерном фабрики и повышает гибкость тестирования и настройки.
Создайте файл tasks.py
from config import create_app #-Line 1
from celery import shared_task
from time import sleep
flask_app = create_app() #-Line 2
celery_app = flask_app.extensions["celery"] #-Line 3
@shared_task(ignore_result=False) #-Line 4
def long_running_task(iterations) -> int:#-Line 5
result = 0
for i in range(iterations):
result += i
sleep(2)
return result #-Line 6
Давайте разберем этот код:
- Мы импортируем функцию create_app из модуля config, который содержит конфигурации Flask из модуля config.(строка 1)
- В строке 2 мы создаем экземпляр приложения flask с помощью функции create_app() и сохраняем его в объекте flask_app.
- Аналогично, в строке 3 мы создаем экземпляр celery_app, используя объект flask_app.
- В строке 4 мы используем декоратор @shared_task для определения задачи, а также устанавливаем ignore_result=False, поскольку нам необходимо сохранить этот результат в redis.
- В строке 5 мы определяем долго выполняющуюся задачу, которая будет выполняться n раз в зависимости от ввода пользователя и при этом будет спать 2 секунды.
- Эта задача будет возвращать результат, основанный на вводимых данных.(в строке 6)
Шаг 5: Вызов задач с помощью Flask
Теперь создадим новый файл app.py, который будет содержать маршруты во Flask.
from tasks import flask_app, long_running_task #-Line 1
from celery.result import AsyncResult#-Line 2
from flask import request,jsonify
@flask_app.post("/trigger_task")
def start_task() -> dict[str, object]:
iterations = request.args.get('iterations')
print(iterations)
result = long_running_task.delay(int(iterations))#-Line 3
return {"result_id": result.id}
@flask_app.get("/get_result")
def task_result() -> dict[str, object]:
result_id = request.args.get('result_id')
result = AsyncResult(result_id)#-Line 4
if result.ready():#-Line 5
# Task has completed
if result.successful():#-Line 6
return {
"ready": result.ready(),
"successful": result.successful(),
"value": result.result,#-Line 7
}
else:
# Task completed with an error
return jsonify({'status': 'ERROR', 'error_message': str(result.result)})
else:
# Task is still pending
return jsonify({'status': 'Running'})
if __name__ == "__main__":
flask_app.run(debug=True)
- В строке 1 импортируются объект flask_app и функция long_running_task из модуля tasks. Объект flask_app, созданный в модуле tasks, хранит конфигурацию приложения Flask, позволяя нам определять маршруты в приложении.
- В строке 2 импортируется класс AsyncResult из модуля celery.result. Класс AsyncResult используется для получения результата задачи Celery по ее идентификатору.
- Строка 3 инициирует асинхронное выполнение задачи Celery long_running_task. Метод .delay() планирует выполнение задачи и возвращает экземпляр AsyncResult, который представляет собой состояние и результат выполнения задачи. В качестве аргумента задаче передается количество итераций.
- Здесь (строка 4) экземпляр AsyncResult создается по result_id, полученному из параметров запроса. Это позволяет отслеживать состояние и получать результат выполнения конкретной задачи по ее идентификатору.
- В строке 5 проверяется, готова ли задача, представленная экземпляром result, к сбору. Метод ready() возвращает True, если выполнение задачи завершено и результат может быть получен.
- В строке 6 проверяется, успешно ли завершилось выполнение задачи. Она возвращает True, если задача выполнена без каких-либо исключений или ошибок.
- В строке 7 результат выполнения задачи присваивается ключу value в словаре ответа с помощью result.result.
В целом данный код демонстрирует, как создать маршруты в приложении Flask для запуска и мониторинга долго выполняющихся задач с помощью Celery. Задача long_running_task запускается асинхронно, и ее ход и результат можно отследить с помощью класса AsyncResult. Ответы предоставляют информацию о статусе выполнения задачи и ее результате.
Шаг 7: Посмотрим, как это запустить.
- Сначала мы запустим наш сервер redis. Используйте команду, соответствующую вашей операционной системе.
redis-server
В MacOS для запуска сервера используется команда redis-server.
2. После запуска сервера redis запустим рабочий celery.
celery -A tasks worker --loglevel INFO
В качестве имени рабочего используйте tasks, так как в нем хранятся задачи, которые нам нужно запустить, а также конфигурация celery.
- Запустите приложение flask.
После запуска приложения flask мы можем проверить API с помощью Postman или любого другого инструмента по вашему выбору.
Удачного кодирования!