Task-Сервер на «Flask» или как заставить несколько компьютеров решать одну задачу c Python

Основанием для разработки «Task-Сервера» — послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.

Для начала немного о «Task-Сервере» (Рис. 1). В данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

Рис. 1 – схема Task-Сервера

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask. Достаточно небольшого веб-приложения для моей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа.  В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.

Описание проблемы: Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Рис. 2 Процесс выгрузки видео

Решение проблемы:

Начну работу с создания сервера, для начала создам новое окружение под приложение следующей командой в терминале:

python –m venv venv

Так как решение было использовать Flask то подключаю следующие модули:

  1. «Flask» — для создания веб-приложения
  2. «openpyxl» – непосредственно для работы с excel файлом, где находятся команды.
  3. «multiprocessing» — из этого модуля мне потребуются только очереди, так как они идеально подходят для нашей задачи

Для начала импортирую библиотеку Flask и инициализирую приложение:

from flask import Flask
app = Flask(__name__)

Импортирую остальные библиотеки:

from openpyxl import load_workbook
from multiprocessing import Queue

Создаю и наполняю базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса, к которому подключаемся, user — логина, password — пароля, cameras — номера камеры. По умолчанию если не задавать дату и время — то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload —help» (рис. 3)

Рис. 3 Все параметры для выгрузки с Hikload

Далее добавлю функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).

data = load_workbook('test.xlsx', 'wb') 
sheet = data.active
que = Queue()
result = []

#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
    for cell in row:
        result.append(cell.value)
    que.put(result)
    result = []

#выдача команд из очереди    
def get_task():
    return que.get()

#отправка команд клиентам. Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление описанное ниже и вызывается экземпляром приложения app(Flask)   
@app.route('/')
@app.route('/index')
def index():
	return ' '.join(map(str, get_task()))

Весь код сервера представлен ниже:

from flask import Flask

app = Flask(__name__)

from openpyxl import load_workbook
from multiprocessing import Queue

data = load_workbook('test.xlsx', 'wb') 
sheet = data.active
que = Queue()
result = []

#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
    for cell in row:
        result.append(cell.value)
    que.put(result)
    result = []

#выдача команд из очереди    
def get_task():
    return que.get()

#отправка команд клиентам    
@app.route('/')
@app.route('/index')
def index():
    if not que.empty():
        return ' '.join(map(str, get_task()))
    else:
        return

if __name__ == '__main__':
    app.run(debug=False)

Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:

  1. «Requests» для обращения к серверу и получения задачи.
def get_task():
    for _ in range(2):
        resp = rq.get('localhost')
        if resp != “the end”:
            tasks.append(list((resp.content.decode('utf-8').split(' '))))
        else:
            return None
    return (tasks)

2. «Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.

if __name__ == '__main__':
    while True:
	   tasks = [] #создаем пустой список для задач
        response = get_task()
        NUM_CORE = 2 #объявляем количество ядер
        read = mp.Queue()
        [read.put(x) for x in response] #записываем очередь командами
        print("read qsize", read.qsize()) 
        for i in range(NUM_CORE):   
            mp.Process(target=load, args=(read,)).start() #запускаем скрипт
		 mp.join()
        while True:
            time.sleep(1)

3. Модуль «os» непосредственно, для запуска Hikload.

def load(read):
    while True:
        if not read.empty(): #проверяем заполненность очереди
            x = read.get() 
            name_proc = mp.current_process().name
            print(f"{name_proc} stared") #выводим работающие процессы
            task = (f'hikload --server={x[2]} ' 
                      f'--user={x[0]} '
                      f'--password={x[1]} '
                      f'--cameras={x[3]} '
                      f'--downloads {x[2]+ "_" + name_proc}'
                    )
            print(task)
            '''Запуск скрипта с нашими командами'''
            os.system(task)
        else: 
            break

Код клиента целиком выглядит следующим образом:

import multiprocessing as mp, os, time, requests as rq

#получаем задачи с сервера
def get_task():
    for _ in range(2):
        resp = rq.get('http://127.0.0.1:5000/')
        if resp != 'the end':
            tasks.append(list((resp.content.decode('utf-8').split(' '))))
        else:
            return None
    return (tasks)

#запуск выгрузки видеозаписей      
def load(read):
    while True:
        if not read.empty(): #проверяем заполненность очереди
            x = read.get() 
            name_proc = mp.current_process().name
            print(f"{name_proc} stared") #выводим работающие процессы
            task = (f'hikload --server={x[2]} ' 
                      f'--user={x[0]} '
                      f'--password={x[1]} '
                      f'--cameras={x[3] '
                      f'--downloads {x[2]+ "_" + name_proc}'
                    )
            print(task)
            '''Запуск скрипта с нашими командами'''
            os.system(task)
        else: 
            break

if __name__ == '__main__':
    while True:
        tasks = []
        response = get_task()
        NUM_CORE = 2 #объявляем количество ядер
        read = mp.Queue()
        get_task() #вызываем функцию загрузки задач
        [read.put(x) for x in tasks] #записываем очередь командами
        print("read qsize", read.qsize()) 
        for i in range(NUM_CORE):   
            mp.Process(target=load, args=(read,)).start() #запускаем скрипт
            mp.join()
        while True:
            time.sleep(1)

Для начала запускаю сервер (Рис. 4) на локальном хосте, следующей командой: flask run

Рис. 4 Запуск сервера на локальном хосте

Далее запускаю клиент, команда: python main.py:

Рис. 5 Запуск клиента и начало загрузки видеозаписей

Теперь видно, что очередь автоматически заполнилась 2-мя задачами (Рис. 5) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 6)

Рис. 6 Ответ сервера на запрос

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром —download (рис.7). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

Рис. 7 Папки с выгруженными данными

В результате реализовано автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.8)

Рис. 8 Выполнение задач в командной строке

Из явных преимуществ такого подхода можно выделить:

  1. Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом
  2. Все задачи распределяются автоматически
  3. Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач

https://t.me/python_job_interview – python собеседования

источник

Ответить