Как масштабировать многопроцессорность Python до кластера с помощью одной строчки кода

Программы начинаются с малого. Будь то исследовательский анализ данных или построение модели машинного обучения, важно как можно быстрее заставить что-то простое работать. Однако со временем требования меняются, и некогда небольшие программы необходимо масштабировать, чтобы обрабатывать больше данных или использовать больше вычислительных ресурсов. К сожалению, изменение программы для масштабирования на несколько ядер или нескольких машин часто требует переписывания ее с нуля, не говоря уже о решении множества сложностей, связанных с параллелизмом и распределенными системами.

Многопроцессорность Python предлагает одно решение этой проблемы, предоставляя набор удобных API-интерфейсов, которые позволяют программам Python использовать преимущества нескольких ядер на одной машине. Однако, хотя это может помочь масштабировать приложение в 10 или даже 50 раз, оно все же ограничено параллелизмом одной машины, и выход за рамки этого потребует переосмысления и переписывания приложения.

В этом сообщении в блоге я расскажу, как можно преодолеть это ограничение, беспрепятственно масштабируясь до многоузлового кластера с помощью ray.util.multiprocessing.Pool API, выпущенного вместе с Ray – без переписывания своей программы!

Поддержка multiprocessing.Pool API на Ray

Примечание. Более подробное введение в Ray и его модель программирования см. в Луч для любопытных.

Ray – это мощная платформа с открытым исходным кодом, которая позволяет легко писать распределенные программы Python и легко масштабировать их с вашего ноутбука на кластер. Ray разработан с нуля для обеспечения производительности и масштабируемости и предлагает простой, но выразительный API.

Хотя с Ray API легко работать, он все же требует некоторой модификации программы. В описанном выше сценарии, когда инженер или специалист по анализу данных хочет масштабировать свою существующую программу, это может оказаться нестандартным.

Однако Ray поставляется с поддержкой multiprocessing.Pool API из коробки (тонкая оболочка, которая занимает всего ~ 500 строк кода). Когда вы импортируете ray.util.multiprocessing и запускаете новый Pool, Ray инициализируется на локальном узле или прозрачно подключается к работающему кластеру. Затем каждый процесс в пуле создается как луч-актор в кластере, и функции распараллеливаются между ними.

Хотя для некоторых рабочих нагрузок это может привести к увеличению производительности даже при работе на одном узле, реальная сила заключается в том, чтобы позволить существующим программам легко масштабироваться от одного узла до кластера.

Пример использования: оценка Пи Монте-Карло

Давайте рассмотрим пример масштабирования приложения от последовательной реализации Python к параллельной реализации на одном компьютере с использованием multiprocessing.Pool и к распределенной реализации на 10-узловом кластере Ray с использованием того же API. Все машины, используемые в следующих экспериментах, являются экземплярами AWS m4.4xlarge под управлением Ubuntu 18.04 (ami-06d51e91cea0dac8d). Полный пример кода для запуска каждой из следующих реализаций доступен здесь (обратите внимание, что на момент публикации этого поста для запуска примеров кода требуется установка ночных колес Ray).

Цель нашей примерной программы – оценить значение π. Мы сделаем это, используя метод Монте-Карло, который работает путем случайной выборки точек в квадрате 2×2. Мы можем использовать пропорцию точек, которые содержатся в единичном круге с центром в начале координат, чтобы оценить отношение площади круга к площади квадрата. Учитывая, что мы знаем, что истинное отношение равно π / 4, мы можем умножить наше оценочное отношение на 4, чтобы приблизиться к значению π. Чем больше точек мы отбираем для расчета этого приближения, тем ближе значение должно быть к истинному значению π (3,1415926…).

Шаг 1. Последовательный Python

import math
import random
import time
def sample(num_samples):
num_inside = 0
for _ in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
return num_inside
def approximate_pi(num_samples):
start = time.time()
num_inside = sample(num_samples)
print(“pi ~= {}”.format((4*num_inside)/num_samples))
print(“Finished in: {:.2f}s”.format(time.time()-start))

view rawmonte_carlo_pi.py hosted with ❤ by GitHub

Выше представлена ​​простая реализация метода Монте-Карло на Python. Функция sample случайным образом выбирает num_samples точек в квадрате 2×2 и возвращает число, которое было внутри круга (используя math.hypot для вычисления расстояния от начала координат). Мы вызываем эту функцию из approximate_pi, которая печатает оценочное значение π, используя соотношение выборок внутри круга.

> python monte_carlo_pi.py --num-samples 100_000_000
pi ~= 3.141646
Finished in 94.64s

Запустив эту реализацию, мы видим, что выборка сотен миллионов точек занимает несколько минут. Это может показаться большим, но даже при 100 миллионах выборок наша оценка π имеет точность только до 3 десятичных знаков – нам придется масштабировать нашу реализацию, чтобы повысить точность.

Шаг 2. Параллельная работа на одном компьютере с использованием многопроцессорной обработки.Пул

import math
import random
import time
def sample(num_samples):
num_inside = 0
for _ in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
return num_inside
def approximate_pi_parallel(num_samples):
from multiprocessing.pool import Pool
pool = Pool()
start = time.time()
num_inside = 0
sample_batch_size = 100000
for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
num_inside += result
print(“pi ~= {}”.format((4*num_inside)/num_samples))
print(“Finished in: {:.2f}s”.format(time.time()-start))

view rawparallel_monte_carlo_pi.py hosted with ❤ by GitHub

Чтобы увеличить количество выборок, давайте изменим код, чтобы распараллелить выборку по ядрам на одной машине. Сначала мы создаем экземпляр многопроцессорного пула, который по умолчанию запускает столько процессов, сколько процессоров установлено на машине. Затем вместо прямого вызова sample мы вызываем его в пуле процессов с помощью pool.map() и агрегируем результаты.

> python parallel_monte_carlo_pi.py --num-samples 100_000_000
pi ~= 3.141432
Finished in 12.52s
> python parallel_monte_carlo_pi.py --num-samples 1_000_000_000
pi ~= 3.141532
Finished in 124.02s

Теперь, когда мы распараллелили код по ядрам на одной машине, мы можем отобрать 100 миллионов образцов за 12,54 секунды, тогда как в последовательной реализации потребовалось 94,64 секунды. Мы также можем выполнить масштабирование до миллиарда отсчетов всего за несколько минут, что дает оценку π с точностью до 4 знаков после запятой. Очевидно, что это улучшение, но если мы хотим добиться более высокой точности за разумный промежуток времени, нам придется распределиться по кластеру.

Шаг 3. Распределение в кластере из 10 узлов с использованием Ray

import math
import random
import time
def sample(num_samples):
num_inside = 0
for _ in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
return num_inside
def approximate_pi_distributed(num_samples):
from ray.util.multiprocessing.pool import Pool # NOTE: Only the import statement is changed.
pool = Pool()
start = time.time()
num_inside = 0
sample_batch_size = 100000
for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
num_inside += result
print(“pi ~= {}”.format((4*num_inside)/num_samples))
print(“Finished in: {:.2f}s”.format(time.time()-start))

view rawdistributed_monte_carlo_pi.py hosted with ❤ by GitHub

Для изменения нашего сценария для запуска в кластере Ray требуется только изменить оператор Pool import, чтобы он указывал на реализацию Ray, которая будет распараллеливать нашу sample функцию в акторах Ray в кластере, а не в локальных процессах.

Чтобы выполнить оценку на кластере, мы сначала используем Инструмент автоматической настройки кластера Ray, чтобы запустить кластер из 10 узлов (с использованием файла минимальной конфигурации). После запуска кластера мы копируем сценарий на головной узел кластера, а затем подключаемся к удаленному сеансу SSH. Затем мы запускаем наш измененный сценарий, устанавливая переменную среды RAY_ADDRESS, чтобы Ray подключался к существующему многоузловому кластеру вместо того, чтобы запускать новый на машине.

# Start the cluster and copy the script to it.
> ray up -y ray-cluster.yaml
> ray rsync-up ray-cluster.yaml monte_carlo_pi.py monte_carlo_pi.py

# Attach to a remote shell in the head node and run the script.
> ray attach ray-cluster.yaml
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 100_000_000
pi ~= 3.141634
Finished in 1.92
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 1_000_000_000
pi ~= 3.141541
Finished in 13.75
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 10_000_000_000
pi ~= 3.141599
Finished in 131.37s

# Clean up the cluster.
> ray down -y ray-cluster.yaml

Теперь, когда наша выборка распределена по кластеру из 10 узлов, мы видим значительное ускорение: выборка 100 миллионов точек (которая первоначально занимала 94,64 секунды) теперь занимает чуть менее 2 секунд, выборка 1 миллиарда точек (что заняло 2 минуты на одной машине с multiprocessing.Pool ) занимает всего 14 секунд, и теперь мы можем масштабировать до 10 миллиардов отсчетов и достигать точности 5 десятичных знаков всего за 2 минуты. Хотя это ускорение на порядок уже впечатляет, одна из замечательных особенностей использования Ray заключается в том, что мы можем легко продолжить масштабирование программы, увеличив количество узлов в кластере, если это необходимо.

Заключение

В этом сообщении блога я исследовал, как можно масштабировать многопроцессорные рабочие нагрузки Python на кластер, не переписывая свою программу, используя поддержку Ray для multiprocessing.Pool API.

Это только одна из многих мощных библиотек, созданных для масштабирования с использованием Ray, включая Tune, масштабируемую библиотеку настройки гиперпараметров, RLlib, масштабируемую библиотеку обучения с подкреплением, и Modin, масштабируемый механизм DataFrame.

https://t.me/data_analysis_ml

источник

Ответить