7 способов поделиться массивом Numpy между процессами

Вы можете обмениваться массивами numpy между процессами в Python.
Существует множество способов совместного использования массива numpy между процессами, например, в качестве аргумента функции, в качестве унаследованной глобальной переменной, через очередь или канал, в виде массива, поддерживаемого RawArray или SharedMemory, или через прокси объект.
В этом руководстве вы познакомитесь с набором подходов, которые можно использовать для совместного использования массива numpy между процессами Python.
Давайте начнем!
Необходимо разделить массив Numpy между процессами
Python предлагает параллелизм на основе процессов через модуль multiprocessing .
Параллелизм на основе процессов подходит для задач, связанных с ЦП, в отличие от параллелизма на основе потоков в Python, который обычно подходит для задач, связанных с вводом-выводом, при наличии глобальной блокировки интерпретатора (GIL).
Рассмотрим ситуацию, когда нам нужно разделить массивы numpy между процессами.
Это может случиться по многим причинам, например:
- Данные загружаются в виде массива в одном процессе и по-разному анализируются в разных подпроцессах.
- Многие дочерние процессы загружают небольшие данные в виде массивов, которые отправляются родительскому процессу для обработки.
- Массивы данных загружаются в родительский процесс и обрабатываются в наборе дочерних процессов.
Совместное использование объектов и данных Python между процессами происходит медленно.
Это связано с тем, что любые данные, такие как массивы numpy, совместно используемые между процессами, должны передаваться с использованием межпроцессного взаимодействия (ICP), требующего, чтобы данные сначала были собраны отправителем, а затем удалены получателем.
Это означает, что если мы разделяем массивы numpy между процессами, предполагается, что мы получаем некоторую выгоду, такую как ускорение, которое преодолевает медленную скорость передачи данных.
Например, это может быть случай, когда массивы относительно малы и быстро передаются, тогда как вычисления, выполняемые для каждого массива, медленны и могут выиграть от выполнения в отдельных процессах.
В качестве альтернативы подготовка массива может быть дорогостоящей в вычислительном отношении и выгодна, если выполняется в отдельном процессе, а после подготовки массивы становятся небольшими и быстрыми для передачи другому процессу, который в них нуждается.
Учитывая эти ситуации, как мы можем обмениваться данными между процессами в Python?
Как поделиться массивом Numpy между процессами
Есть много способов поделиться массивом numpy между процессами.
Тем не менее, есть, пожалуй, 7 основных подходов, которые мы можем реализовать с помощью стандартной библиотеки Python:
- Передать массив NumPy в качестве аргумента новому процессу
- Наследовать массив NumPy как глобальную переменную
- Поделиться массивом NumPy через очередь
- Поделиться массивом NumPy через канал
- Поделиться массивом NumPy с поддержкой RawArray
- Поделиться массивом NumPy с поддержкой SharedMemory
- Поделиться прокси-сервером массива NumPy, размещенного в диспетчере
Некоторые подходы работают медленно, требуя, чтобы массив numpy передавался с использованием межпроцессного взаимодействия, например, передача массива в качестве аргумента или использование очереди или канала.
Другие подходы работают быстро, используя прямую копию памяти, например, наследование массива с помощью глобальной переменной.
Предпочтительный подход часто заключается в использовании симулированной модели общей памяти, которая является быстрой и позволяет отражать изменения в каждом процессе, например массиве numpy, поддерживаемом RawArray или SharedMemory , или массиве, размещенном в процессе сервера Manager .
Мы внимательно рассмотрим каждый подход на рабочем примере.
Давайте начинать!
Пример передачи массива Numpy в качестве аргумента
Мы можем делиться массивами numpy с дочерними процессами через аргументы функции.
Напомним, что когда мы запускаем дочерний процесс, мы можем настроить его для запуска целевой функции с помощью аргумента « target ». Мы также можем передавать аргументы целевой функции с помощью ключевого слова «args ».
Таким образом, мы можем определить целевую функцию, которая принимает массив numpy в качестве аргумента и выполняет ее в дочернем процессе.
Например:
...
# create a child process
child = Process(target=task, args=(data,))
Передача массива numpy целевой функции, выполняемой в дочернем процессе, происходит медленно.
Для этого требуется, чтобы массив numpy был погружен в родительский процесс, а затем распакован в дочернем процессе. Это означает, что он может подходить только для массивов скромного размера, которые быстро выбираются и распаковываются.
Тем не менее, это простой и эффективный метод.
Мы можем исследовать случай совместного использования массива numpy с дочерним процессом через аргумент функции.
В этом примере мы создадим массив в родительском процессе, а затем передадим его задаче, выполняемой в дочернем процессе. Массив будет скопирован и передан дочернему процессу, поэтому его изменения не будут отражены в родительском процессе.
Во-первых, мы можем определить функцию task, которая будет выполняться в дочернем процессе.
Функция будет принимать массив numpy в качестве аргумента. Она сообщит о некотором содержимом массива, изменит все значения на ноль, а затем сообщит, что содержимое массива было изменено.
Функция task() ниже реализует это.
# task executed in a child process
def task(data):
# check some data in the array
print(data[:5,:5])
# change data in the array
data.fill(0.0)
# confirm the data was changed
print(data[:5,:5])
Далее в основном процессе мы можем создать массив скромного размера, заполненный единицами.
...
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
Затем мы можем создать и настроить дочерний процесс для выполнения нашей функции task() и передать массив numpy в качестве аргумента.
...
# create a child process
child = Process(target=task, args=(data,))
Далее мы можем запустить дочерний процесс и заблокировать его, пока он не завершится.
...
# start the child process
child.start()
# wait for the child process to complete
child.join()
Наконец, мы можем проверить содержимое массива.
...
# check some data in the array
print(data[:5,:5])
Ниже приведён полный пример:
# share numpy array via function argument
from multiprocessing import Process
from numpy import ones
# task executed in a child process
def task(data):
# check some data in the array
print(data[:5,:5])
# change data in the array
data.fill(0.0)
# confirm the data was changed
print(data[:5,:5])
# protect the entry point
if __name__ == '__main__':
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
# create a child process
child = Process(target=task, args=(data,))
# start the child process
child.start()
# wait for the child process to complete
child.join()
# check some data in the array
print(data[:5,:5])
При запуске примера сначала создается массив numpy, заполненный одним значением.
Затем создается, настраивается и запускается дочерний процесс. Далее основной процесс блокируется до тех пор, пока дочерний процесс не будет завершен.
Запускается дочерний процесс, подтверждающий, что массив заполнен одним значением. Затем он заполняет массив нулевыми значениями и подтверждает, что содержимое массива было изменено.
Дочерний процесс завершается, а основной процесс возобновляется. Он подтверждает, что содержимое массива в родительском процессе не изменилось.
Это показывает, как мы можем поделиться копией массива с дочерним процессом и как изменения в копии массива в дочернем процессе не отражаются в родительском процессе.
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
Пример наследования массива NumPy через глобальную переменную
Мы можем поделиться массивом numpy с другими процессами через глобальную переменную.
То есть процесс может загрузить или создать массив numpy, сохранить его в глобальной переменной, а другие процессы могут получить к нему прямой доступ.
Преимущество этого подхода в том, что он действительно быстрый.
Это может быть до 34 раз быстрее, чем отправка массива между процессами с использованием других методов, таких как аргумент функции.
Однако у этого подхода есть некоторые ограничения, а именно:
- Массив должен быть сохранен в глобальной переменной родительским процессом.
- Доступ к массиву имеют только дочерние процессы.
- Дочерние процессы должны быть созданы с использованием метода запуска «fork».
- Изменения в массиве, сделанные дочерними процессами, не будут отражаться ни друг в друге, ни в родительском процессе.
Рассмотрим эти опасения подробно.
Программа на Python может объявить явную глобальную переменную с помощью ключевого слова « global».
Например:
...
# declare the global variable
global data
Это не требуется в родительском процессе при подготовке массива numpy, но может понадобиться в функции, выполняемой в дочернем процессе, чтобы гарантировать, что программа ссылается на унаследованную глобальную переменную, а не на вновь созданную локальную переменную.
Это называется наследованием глобальной переменной.
Дочерние процессы могут быть созданы либо путем порождения нового экземпляра интерпретатора Python, либо путем разветвления существующего экземпляра интерпретатора Python. В настоящее время по умолчанию в Windows и macOS используется порождение, а в Linux по умолчанию используется разветвление.
Мы можем явно указать метод, используемый для запуска дочерних процессов, с помощью функции multiprocessing.set_start_method() и указать метод в виде строки, например «fork».
Например:
...
# ensure we are using fork start method
set_start_method('fork')
На момент написания статьи метод запуска fork не поддерживается в Windows.
Дочерний процесс наследует копию всех глобальных переменных от родительского процесса.
Поскольку они являются копиями, любые изменения, внесенные в глобальную переменную в дочернем процессе, будут доступны только дочернему процессу. Точно так же любые изменения глобальной переменной, сделанные в родительском процессе после создания дочерних процессов, не будут отражены в дочерних процессах.
Таким образом, такой подход к совместному использованию массива между процессами уместен в определенных ситуациях, например:
- Массив большой, но машина может себе позволить хранить в памяти несколько копий одновременно.
- Дочерние процессы что-то вычисляют, используя массив, но изменения самого массива не требуются в родительском процессе или других процессах.
Типичным примером, который может подойти для этого подхода, может быть ситуация, когда один или несколько массивов подготавливаются или загружаются, и для каждого из них необходимо вычислить набор статистических данных, требующих значительных вычислительных ресурсов. Каждую статистику или набор статистик можно рассчитать, используя копию массива в отдельном процессе.
Мы можем изучить, как совместно использовать массив numpy как унаследованную глобальную переменную с дочерними процессами.
В этом примере мы создадим массив numpy небольшого размера в родительском процессе и сохраним его как глобальную переменную. Затем мы запустим дочерний процесс, используя метод запуска fork, и получим доступ к унаследованному массиву numpy.
Во-первых, мы можем определить задачу, которая будет выполняться в дочернем процессе.
В этом случае задача не принимает никаких аргументов. Вместо этого он объявляет унаследованную глобальную переменную, а затем напрямую обращается к ее части.
Затем он изменяет содержимое массива, присваивая всем значениям ноль и подтверждая, что изменение вступило в силу.
Функция task() ниже реализует это.
# task executed in a child process
def task():
# declare the global variable
global data
# check some data in the array
print(data[:5,:5])
# change data in the array
data.fill(0.0)
# confirm the data was changed
print(data[:5,:5])
Затем мы можем установить метод запуска fork.
...
# ensure we are using fork start method
set_start_method('fork')
Далее мы можем создать массив numpy скромного размера, хранящийся в переменной.
...
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
Затем мы можем создать дочерний процесс, сконфигурированный для выполнения нашей функции task() , а затем запустить этот процесс.
Это создаст копию родительского процесса, включая массив numpy, хранящийся в переменной data.
...
# create a child process
child = Process(target=task)
# start the child process
child.start()
Наконец, мы можем дождаться завершения дочернего процесса, а затем сообщить содержимое массива.
...
# wait for the child process to complete
child.join()
# check some data in the array
print(data[:5,:5])
Связывая это вместе, полный пример приведен ниже.
Обратите внимание, что этот пример может не работать в Windows, так как он требует поддержки метода запуска “fork”.
# share a numpy array as a global variable
from multiprocessing import set_start_method
from multiprocessing import Process
from numpy import ones
from numpy import zeros
# task executed in a child process
def task():
# declare the global variable
global data
# check some data in the array
print(data[:5,:5])
# change data in the array
data.fill(0.0)
# confirm the data was changed
print(data[:5,:5])
# protect the entry point
if __name__ == '__main__':
# ensure we are using fork start method
set_start_method('fork')
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
# create a child process
child = Process(target=task)
# start the child process
child.start()
# wait for the child process to complete
child.join()
# check some data in the array
print(data[:5,:5])
При запуске примера сначала задается метод запуска fork для создания дочерних процессов.
Затем массив numpy создается и инициализируется значением.
Далее создается и запускается дочерний процесс, а основной процесс блокируется до его завершения.
Создается разветвленная копия родительского процесса и выполняется функция task(). Унаследованная глобальная переменная объявляется, а затем к ней обращаются, сообщая о небольшом подмножестве данных.
Это подтверждает, что массив унаследован правильно и содержит данные, инициализированные в родительском процессе.
Далее дочерний процесс заполняет массив нулевыми значениями и обращается к ним, чтобы подтвердить, что данные были изменены.
Дочерний процесс завершается, а основной процесс возобновляется. Он сообщает содержимое небольшой части массива, подтверждая, что изменение данных, сделанное в дочернем процессе, не было распространено на родительский процесс.
Это подчеркивает, как разделить массив numpy с дочерними процессами по наследству и что изменения, внесенные в дочерний процесс в унаследованный массив, не распространяются.
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
Пример совместного использования массива NumPy через очередь
Другой способ поделиться массивами numpy между процессами Python — использовать очередь.
Python предоставляет процессно-безопасную очередь в классе multiprocessing.Queue .
Очередь — это структура данных, в которую элементы могут быть добавлены вызовом put() и из которых элементы могут быть извлечены вызовом get() .
Multiprocessing.Queue обеспечивает очередь FIFO «первым пришёл— первым вышел » , что означает, что элементы извлекаются из очереди в том порядке, в котором они были добавлены. Первые элементы, добавленные в очередь, будут первыми извлеченными элементами. Это отличается от других типов очередей, таких как очереди «последним пришел», «первым вышел» и очереди с приоритетом.
Вы можете узнать больше о том, как использовать класс multiprocessing.Queue в учебнике:
Мы можем изучить пример отправки массивов numpy дочерним процессам для обработки с использованием общей очереди.
В этом примере родительский процесс создаст множество массивов разной размерности, заполненных случайными числами. Массивы будут помещены в общую очередь. Затем несколько дочерних процессов будут использовать массивы из очереди и выполнять над ними математическую операцию.
Это может быть хорошим шаблоном для тех случаев, когда загрузка или подготовка массивов numpy относительно проста, а массивы имеют скромный размер, но каждый массив требует выполнения нетривиальных вычислений (которые мы будем моделировать в этом случае).
Во-первых, мы можем определить задачу, выполняемую в дочерних процессах.
Эта задача будет выполняться в цикле. На каждой итерации он будет извлекать один массив из очереди, а затем выполнять над ним вычислительную операцию. В этом случае он просто вычислит максимальное значение в массиве и сообщит об этом. Цикл завершается, если получено сообщение None , сигнализирующее об отсутствии дальнейших массивов.
Функция task() ниже реализует это, принимая общий массив в качестве аргумента.
# read numpy arrays from the queue and compute something
def task(queue):
# read arrays
while True:
# read one array
data = queue.get()
# check for final item
if data is None:
# report message
print('Task done.', flush=True)
# push signal back into queue
queue.put(data)
# exit the task
break
# compute max of array
result = data.max()
# report a message
print(f'Task read an array {data.shape}, max={result}', flush=True)
Основной процесс сначала создаст общую очередь, а затем запустит 4 дочерних процесса, каждый из которых выполняет функцию task().
...
# create the shared queue
queue = Queue()
# issue task processes
tasks = [Process(target=task, args=(queue,)) for _ in range(4)]
for t in tasks:
t.start()
Затем основной процесс зацикливается 20 раз, каждая итерация создает пустой массив со случайными размерами и заполняется случайными значениями, которые затем помещаются в общую очередь.
...
# generate many arrays of random numbers
for _ in range(20):
# generate random dimensions
dim = (randint(500,2000),randint(500,2000))
# generate array of random floats with random dimensions
data = random(dim)
# push into queue
queue.put(data)
Далее основной процесс сигнализирует о том, что никаких дополнительных массивов не ожидается, с помощью значения None , а затем ожидает завершения всех дочерних процессов.
...
# signal no further arrays
queue.put(None)
# wait for task processes to be done
for t in tasks:
t.join()
# report a final message
print('Done.', flush=True)
Ниже приведён полный код:
# create arrays in a parent process and read in children via queue
from multiprocessing import Process
from multiprocessing import Queue
from random import randint
from numpy.random import random
# read numpy arrays from the queue and compute something
def task(queue):
# read arrays
while True:
# read one array
data = queue.get()
# check for final item
if data is None:
# report message
print('Task done.', flush=True)
# push signal back into queue
queue.put(data)
# exit the task
break
# compute max of array
result = data.max()
# report a message
print(f'Task read an array {data.shape}, max={result}', flush=True)
# protect the entry point
if __name__ == '__main__':
# create the shared queue
queue = Queue()
# issue task processes
tasks = [Process(target=task, args=(queue,)) for _ in range(4)]
for t in tasks:
t.start()
# generate many arrays of random numbers
for _ in range(20):
# generate random dimensions
dim = (randint(500,2000),randint(500,2000))
# generate array of random floats with random dimensions
data = random(dim)
# push into queue
queue.put(data)
# signal no further arrays
queue.put(None)
# wait for task processes to be done
for t in tasks:
t.join()
# report a final message
print('Done.', flush=True)
Запуск программы сначала создает общую очередь.
Затем создаются и настраиваются 4 дочерних процесса для выполнения нашей функции task() перед запуском. Каждый процесс ожидает появления элементов в очереди.
Далее основной процесс генерирует и помещает в очередь 20 пустых массивов.
Затем он добавляет сигнал, чтобы не ожидать дальнейших массивов, и ожидает завершения дочернего процесса.
Каждый дочерний процесс зацикливается, извлекая один массив из очереди. Он вычисляет максимальное значение, а затем сообщает его вместе с размерами массива.
Это повторяется всеми дочерними процессами до тех пор, пока не будут доступны другие массивы.
Каждый дочерний процесс встречает сообщение None, добавляет его обратно в очередь для других дочерних процессов, выходит из цикла и сообщает о сообщении, завершающем дочерний процесс.
Как только все дочерние процессы завершаются, основной процесс возобновляется и сообщает последнее сообщение перед завершением.
Task read an array (827, 1388), max=0.999999824661523
Task read an array (1745, 1231), max=0.9999998440051578
Task read an array (823, 1081), max=0.999999653298093
Task read an array (1364, 1886), max=0.9999994826187178
Task read an array (1960, 1190), max=0.9999999890428217
Task read an array (1272, 1634), max=0.9999992473453926
Task read an array (1596, 1742), max=0.9999996946372763
Task read an array (1035, 1511), max=0.9999988748078102
Task read an array (1358, 951), max=0.9999996488860812
Task read an array (586, 564), max=0.999998583415731
Task read an array (1852, 673), max=0.9999997326673665
Task read an array (1273, 1114), max=0.9999999614124049
Task read an array (1920, 775), max=0.9999997603830041
Task read an array (1175, 1762), max=0.9999998728441418
Task read an array (1567, 843), max=0.9999978316858225
Task read an array (1579, 1028), max=0.9999996224604828
Task read an array (784, 1873), max=0.9999998776214708
Task read an array (1770, 1099), max=0.9999998975119301
Task read an array (1511, 788), max=0.9999996664224969
Task done.
Task done.
Task done.
Task read an array (1851, 1027), max=0.9999997882123821
Task done.
Done.
Пример совместного использования массива NumPy через канал
Мы можем разделить массив numpy между процессами, используя канал.
Это очень похоже на совместное использование массива numpy с использованием очереди. Пустой массив должен быть записан в канал в одном процессе и прочитан из канала в другом процессе. Копия массива передается между процессами с помощью межпроцессного взаимодействия, которое может быть медленным для больших массивов.
Класс multiprocessing.Pipe — это простая структура данных для обмена данными между процессами. Мы можем создать канал, который создаст соединение для отправки и получения.
...
# create the shared pipe
conn1, conn2 = Pipe()
Затем мы можем поделиться отправляющим соединением с дочерним процессом и принимающим соединением с родительским процессом и использовать их для отправки массива обратно от дочернего процесса к родительскому процессу.
...
# send the array via a pipe
pipe.send(data)
Вы можете узнать больше о том, как использовать каналы для межпроцессного взаимодействия в учебнике:
Мы можем исследовать случай имитации возврата массива numpy из дочернего процесса с использованием канала.
В этом примере мы создадим канал и разделим отправляющее соединение с дочерним процессом, а принимающее соединение сохраним в родительском процессе. Затем дочерний процесс создаст массив numpy и отправит его родительскому процессу через канал.
Во-первых, мы можем определить функцию задачи, которая принимает соединение канала в качестве аргумента, создает и передает массив.
Сначала функция создает массив, инициализированный одним значением. Затем он подтверждает, что содержимое массива содержит одно значение, и отправляет его по конвейеру.
Функция task() ниже реализует это.
# task executed in a child process
def task(pipe):
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
# check some data in the array
print(data[:5,:5])
# send the array via a pipe
pipe.send(data)
Далее основной процесс создаст канал.
...
# create the shared pipe
conn1, conn2 = Pipe()
Затем он создает дочерний процесс и настраивает его для выполнения нашей функции задачи, передавая ему отправляющее соединение в качестве аргумента.
...
# create a child process
child = Process(target=task, args=(conn2,))
Затем запускается дочерний процесс, а основной процесс блокируется, ожидая получения массива через канал.
...
# start the child process
child.start()
# read the data from the pipe
data = conn1.recv()
Наконец, основной процесс подтверждает содержимое полученного массива numpy.
...
# check some data in the array
print(data[:5,:5])
Полный пример приведён ниже:
# share numpy array via pipe
from multiprocessing import Process
from multiprocessing import Pipe
from numpy import ones
# task executed in a child process
def task(pipe):
# define the size of the numpy array
n = 10000
# create the numpy array
data = ones((n,n))
# check some data in the array
print(data[:5,:5])
# send the array via a pipe
pipe.send(data)
# protect the entry point
if __name__ == '__main__':
# create the shared pipe
conn1, conn2 = Pipe()
# create a child process
child = Process(target=task, args=(conn2,))
# start the child process
child.start()
# read the data from the pipe
data = conn1.recv()
# check some data in the array
print(data[:5,:5])
При выполнении примера сначала создается канал с отправляющими и принимающими соединениями.
Затем дочерний процесс создается, настраивается и запускается.
Далее основной процесс блокируется, ожидая чтения массива numpy из канала.
Дочерний процесс запускается, сначала создавая массив numpy, инициализированный одним значением.
Затем он подтверждает содержимое массива и передает его родительскому процессу через канал. Затем дочерний процесс завершается.
Основной процесс считывает массив numpy из канала и подтверждает, что он содержит одно значение.
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
[[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1.]]
Пример совместного использования массива NumPy с поддержкой RawArray
Мы можем поделиться массивом numpy между процессами через модуль ctypes.
Модуль ctypes предоставляет инструменты для работы с типами данных C.
Python предоставляет возможность совместного использования ctypes между процессами в одной системе.
Вы можете узнать больше о совместном использовании ctypes между процессами в учебнике:
Мы можем использовать общий массив ctype в качестве буфера, который поддерживает массив numpy.
Массивы Numpy можно создавать в каждом процессе Python, поддерживаемом одним и тем же общим массивом ctype, и обмениваться данными напрямую.
Этого можно достичь, сначала создав multiprocessing.sharedctypes.RawArray с требуемым типом и достаточно большой для хранения данных, требуемых массивом numpy.
Например:
...
# create the shared array
array = RawArray('d', 10000)
Затем можно создать новый массив numpy с заданным типом, указав RawArray в качестве буфера, например, предварительно выделенную память для массива.
Этого можно добиться, напрямую создав новый numpy.ndarray и указав « buff » в качестве буфера массива.
Например:
...
# create a new numpy array that uses the raw array
data = ndarray((len(array),), dtype=numpy.double, buffer=array)
Обычно не рекомендуется создавать ndarrays напрямую.
Вместо этого мы можем использовать метод nump y.frombuffer() для создания массива для нас с заданным размером, типом и поддержкой RawArray.
Например:
# create a new numpy array backed by the raw array
data = frombuffer(array, dtype=double, count=len(array))
Затем экземпляр RawArray можно сделать доступным для других процессов Python и использовать для создания пустых массивов, поддерживаемых той же памятью.
Это позволяет каждому процессу напрямую читать и записывать одни и те же данные массива, не копируя их между процессами.
Вы можете узнать больше о совместном использовании массива numpy, поддерживаемого RawArray, между процессами в учебнике:
Мы можем изучить случай создания массива numpy, поддерживаемого RawArray, и совместного использования его между процессами Python.
В этом примере мы создадим RawArray , а затем создадим пустой массив из RawArray . Затем мы заполним его данными в основном процессе. Далее RawArray будет использоваться совместно с дочерним процессом, который будет использовать его для создания другого массива numpy, поддерживаемого теми же данными . Затем он изменит данные и прекратит работу. Наконец, основной процесс подтвердит, что данные в массиве были изменены дочерним процессом.
Во-первых, мы можем определить функцию для выполнения в дочернем процессе.
Функция будет принимать RawArray в качестве аргумента и использовать RawArray для создания нового массива numpy. Затем она подтверждает содержимое массива, увеличивает все значения в массиве и подтверждает, что данные в массиве были изменены.
Функция task() ниже реализует это.
# task executed in a child process
def task(array):
# create a new numpy array backed by the raw array
data = frombuffer(array, dtype=double, count=len(array))
# check the contents
print(f'Child {data[:10]}')
# increment the data
data[:] += 1
# confirm change
print(f'Child {data[:10]}')
Далее в основном процессе будет создан новый RawArray из двойных значений, содержащий 10 000 000 элементов.
...
# define the size of the numpy array
n = 10000000
# create the shared array
array = RawArray('d', n)
Затем основной процесс создаст новый массив numpy из RawArray , заполнит его одним значением и подтвердит, что данные в массиве были изменены.
...
# create a new numpy array backed by the raw array
data = frombuffer(array, dtype=double, count=len(array))
# populate the array
data.fill(1.0)
# confirm contents of the new array
print(data[:10], len(data))
После этого дочерний процесс настраивается на выполнение нашей функции task() и передачу ей массива RawArray . Процесс запускается, и основной процесс блокируется до завершения дочернего процесса.
...
# create a child process
child = Process(target=task, args=(array,))
# start the child process
child.start()
# wait for the child process to complete
child.join()
Наконец, основной процесс подтверждает, что дочерний процесс изменил содержимое общего массива.
...
# check some data in the shared array
print(data[:10])
Ниже приведён полный пример:
# share numpy array between processes via a shared raw array
from multiprocessing import Process
from multiprocessing.sharedctypes import RawArray
from numpy import frombuffer
from numpy import double
# task executed in a child process
def task(array):
# create a new numpy array backed by the raw array
data = frombuffer(array, dtype=double, count=len(array))
# check the contents
print(f'Child {data[:10]}')
# increment the data
data[:] += 1
# confirm change
print(f'Child {data[:10]}')
# protect the entry point
if __name__ == '__main__':
# define the size of the numpy array
n = 10000000
# create the shared array
array = RawArray('d', n)
# create a new numpy array backed by the raw array
data = frombuffer(array, dtype=double, count=len(array))
# populate the array
data.fill(1.0)
# confirm contents of the new array
print(data[:10], len(data))
# create a child process
child = Process(target=task, args=(array,))
# start the child process
child.start()
# wait for the child process to complete
child.join()
# check some data in the shared array
print(data[:10])
При выполнении примера сначала создается RawArray с 10 миллионами элементов типа double.
Затем создается массив numpy, который содержит двойные значения и поддерживается RawArray. То есть он не выделяет новую память для массива, а вместо этого повторно использует память, уже выделенную RawArray.
Затем основной процесс заполняет массив numpy одним значением и подтверждает, что содержимое массива изменилось и форма массива соответствует нашим ожиданиям.
Затем запускается дочерний процесс для выполнения нашей функции task() , а основной процесс блокируется до тех пор, пока процесс не завершится.
Дочерний процесс запускается. Сначала он создает новый массив numpy, используя RawArray, переданный в качестве аргумента.
Передача RawArray дочернему процессу не создает копию массива. Вместо этого он передает в процесс ссылку на массив RawArray .
Дочерний процесс подтверждает, что массив содержит одно значение, установленное родительским процессом. Затем он увеличивает все значения в массиве и подтверждает, что значения были изменены.
Это подчеркивает как то, что дочерний массив может напрямую обращаться к тому же массиву, что и родительский процесс, так и то, что он может читать и записывать эти данные.
Дочерний процесс завершается, а основной процесс возобновляется. Это подтверждает, что дочерний процесс изменил содержимое массива.
Это подчеркивает, что изменения, внесенные в массив в дочернем процессе, отражаются в других процессах, таких как родительский процесс.
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] 10000000
Child [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
Child [2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
[2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
Пример совместного использования массива NumPy с поддержкой SharedMemory
Мы можем обмениваться массивами numpy между процессами, используя общую память.
Процессы Python не имеют разделяемой памяти. Вместо этого процессы должны имитировать разделяемую память.
Начиная с Python 3.8 в Python был добавлен новый подход к разделяемой памяти в модуле multiprocessing.shared_memory .
Преимущество разделяемой памяти заключается в скорости и эффективности.
Межпроцессное взаимодействие не требуется. Вместо этого процессы могут напрямую читать и записывать один и тот же блок общей памяти, хотя и с ограничениями.
Модуль предоставляет три класса для поддержки разделяемой памяти:
- multiprocessing.shared_memory.SharedMemory
- multiprocessing.shared_memory.ShareableList
- multiprocessing.managers.SharedMemoryManager
Класс SharedMemory — это основной класс для предоставления общей памяти.
Он позволяет создавать блоки общей памяти, называть их и присоединять к ним. Он предоставляет атрибут « buf » для чтения и записи данных в массивной структуре и может быть закрыт и уничтожен.
Вы можете узнать больше об общей памяти в учебнике:
SharedMemory можно создать в процессе, вызвав конструктор и указав « size» в байтах и аргумент « create» равным True.
Например:
...
# create a shared memory
shared_mem = SharedMemory(size=1024, create=True)
Объекту разделяемой памяти можно присвоить осмысленное имя через атрибут name конструктора.
Например:
...
# create a shared memory with a name
shared_mem = SharedMemory(name='MyMemory', size=1024, create=True)
Другой процесс может получить доступ к разделяемой памяти через его имя. Это называется подключением к общей памяти.
Этого можно добиться, указав имя уже созданной разделяемой памяти и установив для аргумента « create » значение False (по умолчанию).
Например:
...
# attach to a shared memory
shared_mem = SharedMemory(name='MyMemory', create=False)
После создания данные могут быть сохранены в общей памяти с помощью атрибута « buf », который действует как массив байтов.
Например:
...
# write data to shared memory
shared_mem.buf[0] = 1
Вы можете узнать больше о создании и использовании класса SharedMemory в учебнике:
Мы можем создать общую память и использовать ее в качестве основы для массива numpy.
Это означает, что несколько процессов могут взаимодействовать с одним и тем же массивом numpy напрямую через общую память, а не передавать копии массива. Это значительно быстрее и имитирует общую память, доступную между потоками.
Можно создать новый экземпляр numpy.ndarray , и мы можем настроить его для использования общей памяти в качестве буфера с помощью аргумента « buffer».
Например:
...
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=shared_mem.buf)
Мы должны убедиться, что SharedMemory имеет « size», достаточно большой для нашего массива numpy.
Например, если мы создадим массив numpy для хранения двойных значений, каждое двойное значение будет занимать 8 байт. Поэтому мы можем установить размер SharedMemory равным 8, умноженным на размер нужного нам массива.
Например:
...
# define the size of the numpy array
n = 10000
# bytes required for the array (8 bytes per value for doubles)
n_bytes = n * 8
# create the shared memory
shared_mem = SharedMemory(name='MyMemory', create=True, size=n_bytes)
Вы можете узнать больше о совместном использовании массива numpy с поддержкой SharedMemory между процессами в учебнике:
Мы можем изучить пример совместного использования массива numpy между процессами, использующими общую память.
В этом примере мы создадим разделяемую память, достаточно большую для хранения нашего массива, а затем создадим массив, поддерживаемый общей памятью. Затем мы запустим дочерний процесс и подключим его к той же общей памяти, создадим массив, поддерживаемый общей памятью, и получим доступ к тем же данным.
В этом примере показано, как несколько процессов могут напрямую взаимодействовать с одним и тем же массивом numpy в памяти.
Во-первых, мы определим задачу для выполнения в дочернем процессе.
task не будет принимать никаких аргументов. Сначала он подключится к общей памяти, а затем создаст новый массив двойных значений numpy, поддерживаемый общей памятью.
...
# define the size of the numpy array
n = 10000
# attach another shared memory block
sm = SharedMemory('MyMemory')
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=sm.buf)
Далее task сообщит о первых 10 значениях массива, увеличит все данные в массиве, а затем подтвердит, что данные в массиве были изменены.
...
# check the contents
print(f'Child {data[:10]}')
# increment the data
data[:] += 1
# confirm change
print(f'Child {data[:10]}')
Наконец, task закроет доступ к общей памяти.
...
# close the shared memory
sm.close()
Функция task() ниже реализует это.
# task executed in a child process
def task():
# define the size of the numpy array
n = 10000
# attach another shared memory block
sm = SharedMemory('MyMemory')
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=sm.buf)
# check the contents
print(f'Child {data[:10]}')
# increment the data
data[:] += 1
# confirm change
print(f'Child {data[:10]}')
# close the shared memory
sm.close()
Далее основной процесс создаст разделяемую память необходимого размера.
Учитывая, что мы знаем, что нам нужен массив numpy заданного размера (10 000 элементов), содержащий двойные значения, мы можем рассчитать размер требуемой памяти напрямую как 8 байтов (на двойное значение), умноженных на размер массива.
...
# define the size of the numpy array
n = 10000
# bytes required for the array (8 bytes per value for doubles)
n_bytes = n * 8
# create the shared memory
sm = SharedMemory(name='MyMemory', create=True, size=n_bytes)
Двойные значения не всегда могут быть 8 байтами, это зависит от платформы.
Вы можете узнать больше о типах данных numpy здесь:
Обратите внимание, что мы можем создать разделяемую память с большим пространством, чем требуется для нашего массива, но не меньшим. Убедитесь, что вы выделили достаточно памяти для своего массива, и сначала выполните некоторые базовые проверки и вычисления, если вы не уверены.
Если буфер слишком мал для хранения массива, вы получите сообщение об ошибке, например:
TypeError: buffer is too small for requested array
Затем основной процесс создаст новый массив numpy заданного размера, содержащий двойные значения и поддерживаемый нашей общей памятью определенного размера.
...
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=sm.buf)
Далее мы заполним массив одним значением, а затем подтвердим, что данные в массиве были изменены и что массив имеет ожидаемые размеры, например, 10 000 элементов.
...
# populate the array
data.fill(1.0)
# confirm contents of the new array
print(data[:10], len(data))
Далее мы создадим, настроим и запустим дочерний процесс для выполнения нашей функции task() и блокировки до ее завершения.
...
# create a child process
child = Process(target=task)
# start the child process
child.start()
# wait for the child process to complete
child.join()
Если вы новичок в запуске дочернего процесса для выполнения целевой функции, вы можете узнать больше в этом руководстве:
Наконец, мы подтвердим, что содержимое общего массива было изменено дочерним процессом, затем закроем доступ к общей памяти и освободим общую память.
...
# check some data in the shared array
print(data[:10])
# close the shared memory
sm.close()
# release the shared memory
sm.unlink()
Ниже приведён полный код:
# share numpy array via a shared memory
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from numpy import ones
from numpy import ndarray
import numpy
# task executed in a child process
def task():
# define the size of the numpy array
n = 10000
# attach another shared memory block
sm = SharedMemory('MyMemory')
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=sm.buf)
# check the contents
print(f'Child {data[:10]}')
# increment the data
data[:] += 1
# confirm change
print(f'Child {data[:10]}')
# close the shared memory
sm.close()
# protect the entry point
if __name__ == '__main__':
# define the size of the numpy array
n = 10000
# bytes required for the array (8 bytes per value for doubles)
n_bytes = n * 8
# create the shared memory
sm = SharedMemory(name='MyMemory', create=True, size=n_bytes)
# create a new numpy array that uses the shared memory
data = ndarray((n,), dtype=numpy.double, buffer=sm.buf)
# populate the array
data.fill(1.0)
# confirm contents of the new array
print(data[:10], len(data))
# create a child process
child = Process(target=task)
# start the child process
child.start()
# wait for the child process to complete
child.join()
# check some data in the shared array
print(data[:10])
# close the shared memory
sm.close()
# release the shared memory
sm.unlink()
При выполнении примера сначала создается разделяемая память с именем « MyMemory » и размером 80 000 байт (например, 10 КБ * 8 байт на двойное значение).
Затем создается новый массив numpy с заданным размером.
Массив заполняется одним значением и подтверждается содержимое и размер массива.
Затем создается дочерний процесс и настраивается для выполнения нашей функции task() . Процесс запускается, и основной процесс блокируется до завершения дочернего процесса.
Дочерний процесс запускается и подключается к общей памяти по имени, например « MyMemory ». Затем он создает новый массив numpy с тем же размером и типом данных, что и родительский процесс, и поддерживает его в общей памяти.
Дочерний процесс сначала подтверждает, что массив содержит одно значение, установленное родительским процессом. Это подтверждает, что дочерний процесс работает с той же памятью (тот же массив), что и родительский процесс.
Он увеличивает все значения в массиве и подтверждает, что данные были изменены.
Дочерний процесс закрывает доступ к разделяемой памяти и завершается.
Основной процесс возобновляет работу и подтверждает, что содержимое массива было изменено дочерним процессом. Это подтверждает, что изменения, внесенные в массив дочерним процессом, отражаются в родительском процессе.
Наконец, основной процесс закрывает доступ к разделяемой памяти, а затем освобождает разделяемую память.
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] 10000
Child [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
Child [2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
[2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
Пример совместного использования прокси массива NumPy, размещенного в Manager
Мы можем эффективно использовать массив numpy между процессами, используя Manager.
Multiprocessing Manager предоставляет способ создания централизованных объектов Python, которые можно безопасно совместно использовать между процессами.
Объекты Manager создают серверный процесс, который используется для размещения объектов Python. Затем Managers возвращают прокси-объекты, используемые для взаимодействия с размещенными объектами.
Вы можете узнать больше о многопроцессорных менеджерах в учебнике:
Массив numpy можно разместить, определив собственный менеджер и настроив его для поддержки массивов numpy.
Для этого необходимо сначала определить собственный менеджер , который расширяет BaseManager .
Например:
# custom manager to support custom classes
class CustomManager(BaseManager):
# nothing
pass
Затем мы можем зарегистрировать наш массив numpy в пользовательском менеджере с помощью функции register() .
Один из подходов заключается в регистрации функции numpy, используемой для создания массива numpy в серверном процессе, такой как функция numpy.ones() .
Например:
...
# register a function for creating numpy arrays on the manager
CustomManager.register('ones', ones)
Затем мы можем создать собственный менеджер для запуска серверного процесса.
...
# create and start the custom manager
with CustomManager() as manager:
# ...
Далее массив numpy может быть создан в процессе сервера путем вызова функции register, после чего возвращается прокси-объект. Затем с размещенным массивом numpy можно взаимодействовать через прокси-объект.
Например:
...
# create a shared numpy array
data_proxy = manager.shared_array((10,10))
Далее прокси-объект может передаваться между процессами, что позволяет нескольким процессам манипулировать одним и тем же размещенным массивом numpy.
Мы можем изучить случай совместного использования массива numpy, хранящегося в менеджере, между процессами.
В этом примере мы создадим массив, размещенный в процессе менеджера, и сообщим сумму значений в массиве. Затем мы запустим дочерний процесс и передадим ему прокси-объекты для массива, и он выполнит ту же операцию суммирования над массивом.
Это покажет, насколько легко нескольким процессам эффективно работать с одним и тем же массивом через прокси-объекты.
Во-первых, мы определим пользовательский класс менеджера, чтобы мы могли регистрировать массивы numpy.
# custom manager to support custom classes
class CustomManager(BaseManager):
# nothing
pass
Далее мы определим функцию для выполнения в дочернем процессе. Функция возьмет прокси-объект для массива numpy и рассчитает сумму значений в массиве.
# task executed in a child process
def task(data_proxy):
# report details of the array
print(f'Array sum (in child): {data_proxy.sum()}')
Далее в основном процессе мы зарегистрируем функцию для создания размещенного массива numpy с помощью пользовательского менеджера.
...
# register a function for creating numpy arrays on the manager
CustomManager.register('shared_array', ones)
Затем мы создадим и запустим пользовательский менеджер и создадим размещенный массив numpy со 100 000 000 элементов.
...
# create and start the custom manager
with CustomManager() as manager:
# define the size of the numpy array
n = 100000000
# create a shared numpy array
data_proxy = manager.shared_array((n,))
print(f'Array created on host: {data_proxy}')
Затем мы вычислим сумму значений в массиве серверного процесса, которая, как мы ожидаем, будет равна 100 000 000, поскольку все значения равны единице.
...
# confirm content
print(f'Array sum: {data_proxy.sum()}')
Наконец, мы создадим и настроим дочерний процесс, настроенный на выполнение нашей функции task(), затем запустим его и дождемся его завершения.
...
# start a child process
process = Process(target=task, args=(data_proxy,))
process.start()
process.join()
Связывая это воедино, полный пример приведен ниже:
# share a numpy array between processes using a manager
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from numpy import ones
# custom manager to support custom classes
class CustomManager(BaseManager):
# nothing
pass
# task executed in a child process
def task(data_proxy):
# report details of the array
print(f'Array sum (in child): {data_proxy.sum()}')
# protect the entry point
if __name__ == '__main__':
# register a function for creating numpy arrays on the manager
CustomManager.register('shared_array', ones)
# create and start the custom manager
with CustomManager() as manager:
# define the size of the numpy array
n = 100000000
# create a shared numpy array
data_proxy = manager.shared_array((n,))
print(f'Array created on host: {data_proxy}')
# confirm content
print(f'Array sum: {data_proxy.sum()}')
# start a child process
process = Process(target=task, args=(data_proxy,))
process.start()
process.join()
Запуск примера сначала регистрирует массив numpy в пользовательском менеджере.
Далее создается и запускается пользовательский менеджер.
Массив numpy создается в серверном процессе менеджера и возвращается прокси-объект. Печатается прокси-объект, предоставляющий строковое представление размещенного массива, подтверждающее, что он имеет одно значение.
Затем родительский процесс сообщает сумму значений в массиве. Это вычисляется в серверном процессе, и затем значение указывается как 100 миллионов, что соответствует нашим ожиданиям.
Затем настраивается дочерний процесс для выполнения нашей функции task() и передачи прокси-объекта. Затем родительский процесс блокируется до завершения дочернего процесса.
Дочерний процесс запускается и выполняет функцию task() . Сумма массива вычисляется серверным процессом и сообщается через дочерний процесс.
Это подчеркивает, что оба процесса легко могут работать непосредственно с одним и тем же размещенным массивом.
Array created on host: array([1., 1., 1., ..., 1., 1., 1.])
Array sum: 100000000.0
Array sum (in child): 100000000.0
Вывод
Теперь вы знаете набор подходов, которые можно предпринимать для совместного использования массива numpy между процессами Python.