Написание быстрых асинхронных HTTP-Запросов с Python

НАПИСАНИЕ БЫСТРОГО ASYNC HTTP-ЗАПРОСОВ В PYTHON
В свободное время я много занимаюсь парсингом веб-страниц c Python, я искал различные форматы и фрагменты кода, чтобы локально выполнять большое количество сетевых запросов с элементами управления.



Этап 1
Перый этап я пробовал старые-добрые запросы. Нужно сделать 10 запросов? Обернем запросы в цикл for и сделаем их итеративно.
import requests

results = {}
for i in range(10):
    resp = requests.get('https://jsonplaceholder.typicode.com/todos/1')
    results[i] = resp.json()
Совсем неплохо - 40 запросов за 2,8 с или 1 запрос / 70 мс.  Это нормально, когда вам нужно подобрать трехзначный пароль - вы можете выполнить 1000 запросов за 70 секунд. Не очень хорошо, но достаточно быстро и не требует дополнительных библиотек или каких-либо исследований.

Однако, как только вы дойдете до чего-то из четырех символов, этой скорости не хватит.

Этап 2
Следующим шагом я решил найти способы делать запросы с помощью потоков. Создадим собственный поток для каждого запроса.

Настроим очередь и пул для извлечения URL-адресов, и запустим код. Очередь и рабочие потоки определяются довольно просто:
from queue import Queue

from threading import Thread


class Worker(Thread):
  """ Thread executing tasks from a given tasks queue """

  def __init__(self, tasks):
    Thread.__init__(self)
    self.tasks = tasks
    self.daemon = True
    self.start()

  def run(self):
    while True:
      func, args, kargs = self.tasks.get()
      try:
        func(*args, **kargs)
      except Exception as e:
        # An exception happened in this thread
        print(e)
      finally:
        # Mark this task as done, whether an exception happened or not
        self.tasks.task_done()


class ThreadPool:
  """ Pool of threads consuming tasks from a queue """

  def __init__(self, num_threads):
    self.tasks = Queue(num_threads)
    for _ in range(num_threads):
      Worker(self.tasks)

  def add_task(self, func, *args, **kargs):
    """ Add a task to the queue """
    self.tasks.put((func, args, kargs))

  def map(self, func, args_list):
    """ Add a list of tasks to the queue """
    for args in args_list:
      self.add_task(func, args)

  def wait_completion(self):
    """ Wait for completion of all the tasks in the queue """
    self.tasks.join()
Фактический код запроса довольно прост - просто определим функцию, которая будет заполнять глобальную переменную с использованием некоторого уникального идентификатора, и пусть она выполнит запрос в своем собственном потоке.
urls = [f"https://jsonplaceholder.typicode.com/todos/{i}" for i in range(40)]
pool = ThreadPool(40)

r = requests.session()


def get(url):
    resp = r.get(url)
    results[i] = resp.json()


pool.map(get, urls)
pool.wait_completion()
Теперь уже быстрее - 40 запросов за 365 мск, или 9,125 мск на запрос. Те же 1000 запросов, которые раньше занимали 1 минуту 10 секунд, теперь выполняются чуть более чем за девять секунд или примерно в 7 раз быстрее. Неплохо для довольно простой реализации многопоточности. Но можем ли мы сделать это еще быстрее?

Этап 3
Несколько лет назад я познакомился с библиотекой aiohttp - асинхронным HTTP-клиентом / сервером для asyncio и Python. В aiohttp используется новые (на тот момент) асинхронные возможности python.

Как только мы начинаем рассматривать пул из тысяч запросов, мы также хотим иметь возможность ограничивать потоки - наши программы могут открывать только определенное количество TCP-соединений одновременно и запускать определенное количество битов в заданную секунду. Я определил меотд под названием gather_with_concurrency - это способ использования asyncios gather с семафором, чтобы ограничить количество задач, над которыми мы работаем в любую заданную секунду.

Это немного замедляет нас из-за расходов памяти на семафор, но если вы делаете код в алфавитно-цифровом 4-значном пространстве, вы используете на 36 ^ 4 или 1,7 млн ​​запросов, которые, вероятно, следует объединить и немного отрегулировать, но расходы памяти семафора того стоят.
async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))
Затем мы настраиваем коннектор и настраиваемый сессию.
conn = aiohttp.TCPConnector(limit=None, ttl_dns_cache=300)
session = aiohttp.ClientSession(connector=conn)
 Мы увеличиваем TTL кеша DNS.  Мы определяем нашу фактическую асинхронную функцию, которая будет выглядеть довольно знакомой, если вы уже привыкли к запросам. Мы также отключили проверку SSL для небольшого увеличения скорости.
async def get(url):
    async with session.get(url, ssl=False) as response:
        obj = await response.read()
        all_offers[url] = obj
Теперь мы действительно ускорились! 40 запросов за 100 мс, или 4 мс на запрос. Мы можем выполнять около 250 запросов в секунду - однако на этой скорости расходы на первоначальную настройку функции и jupyter notebook на самом деле составляют значительную часть общих затрат мощности.

Если мы увеличим кол-во запросов до 4000 запросов, мы увидим, что на самом деле мы приближаемся к времени выполнения 1,574 с, это примерно 56% времени, которое потребовалось нам для выполнения 10 запросов итеративно.

Мы можем выполнять один запрос каждые 0,393 мс или 393 микросекунды. Мы можем пройти через все буквенно-цифровое пространство для 4-символьной перестановки примерно за 660 миллионов микросекунд, или 11 минут, и все это с моего MacBook Pro.
Наша реализация Threading также выигрывает от увеличения пула - около 100 потоков (таких же, как семафор в версии asyncio) дает нам время до завершения 8 секунд для 4000 URL-адресов или чуть более 2 мс на URL. 
Однако те же 36 ^ 4 запросов с использованием ThreadPool займут 48 минут.
Мы можем почистить код и немного оптимизировать его:
Оптимальный размер семафора?
Если мы увеличим количество одновременных запросов до 4 КБ, мы увидим резкую потерю производительности
Это замедление почти в 3 раза из-за проблем с конкуренцией за ресурсы.


Интересно, что оптимальное значение семафора было около 60.

Тем не менее -  распараллеливания и нескольких хостов может дать вам числа, которые на порядок лучше, чем это, но цель этой статьи - увидеть, что мы можем сделать на локальной машине с ноутбуком jupyter.

Этап 4


HTTPX
HTTPX - это современная реализация веб-клиента на Python.

К сожалению, в моем тестировании он был строго медленнее, чем aiohttp. Я использовал их асинхронную библиотеку с тем же семафором, ограничивающим количество запущенных процессов, но он все равно был медленнее.
Я также пробовал использовать собственный сборщик с переключением параллелизма  - это тоже не помогло.
PyCurl


PyCurl отличается тем, что выглядит довольно сырой оболочкой для работы с запросми. Написание кода было больше похоже на открытие сокетов, чем на работу с красивой библиотекой http.
Результаты были впечатляющими, но библиотека aiohttp по-прежнему работала быстрее. Это был мой первый раз, когда я писал реализацию pycurl на основе этого шаблона - внедрение нативной потоковой передачи могло бы ускорить ее, но я до сих пор не видел ничего быстрее, чем работа кода за 393 микросекунды.

UVLoop Python

Еще одна попытка с UVLoop

я не думаю, что на данном этапе UVLoop сильно поможет, потому что большая часть времени связана с сетевым вызовом сайта.Скорость работы осталась примерно на том же уровне.

Ответить