Task-Сервер на «Flask» или как заставить несколько компьютеров решать одну задачу c Python
Основанием для разработки «Task-Сервера» — послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.
Для начала немного о «Task-Сервере» (Рис. 1). В данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.
В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask. Достаточно небольшого веб-приложения для моей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа. В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.
Описание проблемы: Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.
Решение проблемы:
Начну работу с создания сервера, для начала создам новое окружение под приложение следующей командой в терминале:
python –m venv venv
Так как решение было использовать Flask то подключаю следующие модули:
- «Flask» — для создания веб-приложения
- «openpyxl» – непосредственно для работы с excel файлом, где находятся команды.
- «multiprocessing» — из этого модуля мне потребуются только очереди, так как они идеально подходят для нашей задачи
Для начала импортирую библиотеку Flask и инициализирую приложение:
from flask import Flask
app = Flask(__name__)
Импортирую остальные библиотеки:
from openpyxl import load_workbook
from multiprocessing import Queue
Создаю и наполняю базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса, к которому подключаемся, user — логина, password — пароля, cameras — номера камеры. По умолчанию если не задавать дату и время — то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload —help» (рис. 3)
Далее добавлю функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).
data = load_workbook('test.xlsx', 'wb')
sheet = data.active
que = Queue()
result = []
#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
for cell in row:
result.append(cell.value)
que.put(result)
result = []
#выдача команд из очереди
def get_task():
return que.get()
#отправка команд клиентам. Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление описанное ниже и вызывается экземпляром приложения app(Flask)
@app.route('/')
@app.route('/index')
def index():
return ' '.join(map(str, get_task()))
Весь код сервера представлен ниже:
from flask import Flask
app = Flask(__name__)
from openpyxl import load_workbook
from multiprocessing import Queue
data = load_workbook('test.xlsx', 'wb')
sheet = data.active
que = Queue()
result = []
#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
for cell in row:
result.append(cell.value)
que.put(result)
result = []
#выдача команд из очереди
def get_task():
return que.get()
#отправка команд клиентам
@app.route('/')
@app.route('/index')
def index():
if not que.empty():
return ' '.join(map(str, get_task()))
else:
return
if __name__ == '__main__':
app.run(debug=False)
Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:
- «Requests» для обращения к серверу и получения задачи.
def get_task():
for _ in range(2):
resp = rq.get('localhost')
if resp != “the end”:
tasks.append(list((resp.content.decode('utf-8').split(' '))))
else:
return None
return (tasks)
2. «Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.
if __name__ == '__main__':
while True:
tasks = [] #создаем пустой список для задач
response = get_task()
NUM_CORE = 2 #объявляем количество ядер
read = mp.Queue()
[read.put(x) for x in response] #записываем очередь командами
print("read qsize", read.qsize())
for i in range(NUM_CORE):
mp.Process(target=load, args=(read,)).start() #запускаем скрипт
mp.join()
while True:
time.sleep(1)
3. Модуль «os» непосредственно, для запуска Hikload.
def load(read):
while True:
if not read.empty(): #проверяем заполненность очереди
x = read.get()
name_proc = mp.current_process().name
print(f"{name_proc} stared") #выводим работающие процессы
task = (f'hikload --server={x[2]} '
f'--user={x[0]} '
f'--password={x[1]} '
f'--cameras={x[3]} '
f'--downloads {x[2]+ "_" + name_proc}'
)
print(task)
'''Запуск скрипта с нашими командами'''
os.system(task)
else:
break
Код клиента целиком выглядит следующим образом:
import multiprocessing as mp, os, time, requests as rq
#получаем задачи с сервера
def get_task():
for _ in range(2):
resp = rq.get('http://127.0.0.1:5000/')
if resp != 'the end':
tasks.append(list((resp.content.decode('utf-8').split(' '))))
else:
return None
return (tasks)
#запуск выгрузки видеозаписей
def load(read):
while True:
if not read.empty(): #проверяем заполненность очереди
x = read.get()
name_proc = mp.current_process().name
print(f"{name_proc} stared") #выводим работающие процессы
task = (f'hikload --server={x[2]} '
f'--user={x[0]} '
f'--password={x[1]} '
f'--cameras={x[3] '
f'--downloads {x[2]+ "_" + name_proc}'
)
print(task)
'''Запуск скрипта с нашими командами'''
os.system(task)
else:
break
if __name__ == '__main__':
while True:
tasks = []
response = get_task()
NUM_CORE = 2 #объявляем количество ядер
read = mp.Queue()
get_task() #вызываем функцию загрузки задач
[read.put(x) for x in tasks] #записываем очередь командами
print("read qsize", read.qsize())
for i in range(NUM_CORE):
mp.Process(target=load, args=(read,)).start() #запускаем скрипт
mp.join()
while True:
time.sleep(1)
Для начала запускаю сервер (Рис. 4) на локальном хосте, следующей командой: flask run
Далее запускаю клиент, команда: python main.py:
Теперь видно, что очередь автоматически заполнилась 2-мя задачами (Рис. 5) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 6)
Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром —download (рис.7). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.
В результате реализовано автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.8)
Из явных преимуществ такого подхода можно выделить:
- Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом
- Все задачи распределяются автоматически
- Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач
https://t.me/python_job_interview – python собеседования