Написание быстрых асинхронных HTTP-Запросов с Python
НАПИСАНИЕ БЫСТРОГО ASYNC HTTP-ЗАПРОСОВ В PYTHON
В свободное время я много занимаюсь парсингом веб-страниц c Python, я искал различные форматы и фрагменты кода, чтобы локально выполнять большое количество сетевых запросов с элементами управления. Этап 1 Перый этап я пробовал старые-добрые запросы. Нужно сделать 10 запросов? Обернем запросы в цикл for и сделаем их итеративно.
import requests
results = {}
for i in range(10):
resp = requests.get('https://jsonplaceholder.typicode.com/todos/1')
results[i] = resp.json()
Совсем неплохо - 40 запросов за 2,8 с или 1 запрос / 70 мс. Это нормально, когда вам нужно подобрать трехзначный пароль - вы можете выполнить 1000 запросов за 70 секунд. Не очень хорошо, но достаточно быстро и не требует дополнительных библиотек или каких-либо исследований. Однако, как только вы дойдете до чего-то из четырех символов, этой скорости не хватит. Этап 2 Следующим шагом я решил найти способы делать запросы с помощью потоков. Создадим собственный поток для каждого запроса. Настроим очередь и пул для извлечения URL-адресов, и запустим код. Очередь и рабочие потоки определяются довольно просто:
from queue import Queue
from threading import Thread
class Worker(Thread):
""" Thread executing tasks from a given tasks queue """
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
# An exception happened in this thread
print(e)
finally:
# Mark this task as done, whether an exception happened or not
self.tasks.task_done()
class ThreadPool:
""" Pool of threads consuming tasks from a queue """
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
""" Add a task to the queue """
self.tasks.put((func, args, kargs))
def map(self, func, args_list):
""" Add a list of tasks to the queue """
for args in args_list:
self.add_task(func, args)
def wait_completion(self):
""" Wait for completion of all the tasks in the queue """
self.tasks.join()
Фактический код запроса довольно прост - просто определим функцию, которая будет заполнять глобальную переменную с использованием некоторого уникального идентификатора, и пусть она выполнит запрос в своем собственном потоке.
urls = [f"https://jsonplaceholder.typicode.com/todos/{i}" for i in range(40)]
pool = ThreadPool(40)
r = requests.session()
def get(url):
resp = r.get(url)
results[i] = resp.json()
pool.map(get, urls)
pool.wait_completion()
Теперь уже быстрее - 40 запросов за 365 мск, или 9,125 мск на запрос. Те же 1000 запросов, которые раньше занимали 1 минуту 10 секунд, теперь выполняются чуть более чем за девять секунд или примерно в 7 раз быстрее. Неплохо для довольно простой реализации многопоточности. Но можем ли мы сделать это еще быстрее? Этап 3 Несколько лет назад я познакомился с библиотекой aiohttp - асинхронным HTTP-клиентом / сервером для asyncio и Python. В aiohttp используется новые (на тот момент) асинхронные возможности python. Как только мы начинаем рассматривать пул из тысяч запросов, мы также хотим иметь возможность ограничивать потоки - наши программы могут открывать только определенное количество TCP-соединений одновременно и запускать определенное количество битов в заданную секунду. Я определил меотд под названием gather_with_concurrency - это способ использования asyncios gather с семафором, чтобы ограничить количество задач, над которыми мы работаем в любую заданную секунду. Это немного замедляет нас из-за расходов памяти на семафор, но если вы делаете код в алфавитно-цифровом 4-значном пространстве, вы используете на 36 ^ 4 или 1,7 млн запросов, которые, вероятно, следует объединить и немного отрегулировать, но расходы памяти семафора того стоят.
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
Затем мы настраиваем коннектор и настраиваемый сессию.
conn = aiohttp.TCPConnector(limit=None, ttl_dns_cache=300)
session = aiohttp.ClientSession(connector=conn)
Мы увеличиваем TTL кеша DNS. Мы определяем нашу фактическую асинхронную функцию, которая будет выглядеть довольно знакомой, если вы уже привыкли к запросам. Мы также отключили проверку SSL для небольшого увеличения скорости.
async def get(url):
async with session.get(url, ssl=False) as response:
obj = await response.read()
all_offers[url] = obj
Теперь мы действительно ускорились! 40 запросов за 100 мс, или 4 мс на запрос. Мы можем выполнять около 250 запросов в секунду - однако на этой скорости расходы на первоначальную настройку функции и jupyter notebook на самом деле составляют значительную часть общих затрат мощности. Если мы увеличим кол-во запросов до 4000 запросов, мы увидим, что на самом деле мы приближаемся к времени выполнения 1,574 с, это примерно 56% времени, которое потребовалось нам для выполнения 10 запросов итеративно. Мы можем выполнять один запрос каждые 0,393 мс или 393 микросекунды. Мы можем пройти через все буквенно-цифровое пространство для 4-символьной перестановки примерно за 660 миллионов микросекунд, или 11 минут, и все это с моего MacBook Pro.
Наша реализация Threading также выигрывает от увеличения пула - около 100 потоков (таких же, как семафор в версии asyncio) дает нам время до завершения 8 секунд для 4000 URL-адресов или чуть более 2 мс на URL.
Однако те же 36 ^ 4 запросов с использованием ThreadPool займут 48 минут. Мы можем почистить код и немного оптимизировать его:
Оптимальный размер семафора? Если мы увеличим количество одновременных запросов до 4 КБ, мы увидим резкую потерю производительности
Это замедление почти в 3 раза из-за проблем с конкуренцией за ресурсы.
Интересно, что оптимальное значение семафора было около 60. Тем не менее - распараллеливания и нескольких хостов может дать вам числа, которые на порядок лучше, чем это, но цель этой статьи - увидеть, что мы можем сделать на локальной машине с ноутбуком jupyter. Этап 4 HTTPX HTTPX - это современная реализация веб-клиента на Python. К сожалению, в моем тестировании он был строго медленнее, чем aiohttp. Я использовал их асинхронную библиотеку с тем же семафором, ограничивающим количество запущенных процессов, но он все равно был медленнее.
Я также пробовал использовать собственный сборщик с переключением параллелизма - это тоже не помогло.
PyCurl PyCurl отличается тем, что выглядит довольно сырой оболочкой для работы с запросми. Написание кода было больше похоже на открытие сокетов, чем на работу с красивой библиотекой http.
Результаты были впечатляющими, но библиотека aiohttp по-прежнему работала быстрее. Это был мой первый раз, когда я писал реализацию pycurl на основе этого шаблона - внедрение нативной потоковой передачи могло бы ускорить ее, но я до сих пор не видел ничего быстрее, чем работа кода за 393 микросекунды.
UVLoop Python
Еще одна попытка с UVLoop
я не думаю, что на данном этапе UVLoop сильно поможет, потому что большая часть времени связана с сетевым вызовом сайта.Скорость работы осталась примерно на том же уровне.
+1
+1
3
+1
+1
+1