Python В SQL — Теперь я могу загружать данные в 20 раз быстрее
Python В SQL — Теперь я могу загружать данные в 20 раз быстрее
Работа с массивными наборами данных – повседневное дело для большинства специалистов по обработке данных. Не было бы никаких проблем, если бы они сразу передавались потоком в базу данных.
Но, зачастую, случается так, что загрузка данных происходит очень долго. В таких случаях программистам приходится занимать себя другими делами, дожидаясь, пока процесс полностью завершится. Такой вариант подходит далеко не всем!
Если вы действительно хотите сократить это время, вам нужен наиболее оптимальный способ загрузки данных в БД.
Если это предварительно отформатированный файл, я бы предпочёл использовать для него клиентские библиотеки. Например, следующая команда может загрузить ваш CSV-файл в удалённую базу данных Postgres:
psql \
-h $hostname -d $dbname -U $username \
-c "\copy mytable (column1, column2) from '/path/to/local/file.csv' with delimiter as ','"
Но такое случается редко (по крайней мере, в моей памяти).
Поскольку Python является моим основным языком программирования, я должен загрузить их с Python (возможно, после некоторой предварительной обработки). Поэтому я провёл небольшой эксперимент, чтобы найти самый быстрый способ.
Как оказалось позже, самый быстрый способ не являлся тем, который я использовал на повседневной основе.
Самый ужасный способ: Как не стоит загружать данные
Хотя теперь я уверен, что мне никогда не следует использовать этот метод, изначально я думал, что он сможет мне помочь.
Вот что мы делаем: у меня есть набор данных размером около 500 МБ на диске. Сначала я попытался загрузить его с помощью команды insert
в модуле psycopg2
:
import psycopg2
import csv
import time
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"Function {func.__name__} took {end_time - start_time} seconds to run.")
return result
return wrapper
# Establish a connection to the PostgreSQL database
conn = psycopg2.connect(
host="localhost",
database="playground",
user="<your-user-name>",
password="<your-password>",
)
# Define the path to the CSV file and the name of the table to load it into
csv_file_path = "./data.csv"
table_name = "temp_table"
@timing_decorator
def load_csv_with_insert():
# Create a cursor object
cur = conn.cursor()
# Open the CSV file and read its contents
with open(csv_file_path, 'r') as f:
csv_reader = csv.reader(f)
next(csv_reader) # skip header row
# Define the query to insert data into the table
insert_query = f"INSERT INTO {table_name} VALUES ({','.join(['%s']*len(next(csv_reader)))})"
# Iterate over the CSV rows and execute the insert query for each row
for row in csv_reader:
cur.execute(insert_query, row)
# Commit the changes to the database
conn.commit()
# Close the cursor and connection
cur.close()
load_csv_with_insert()
conn.close()
Я использую декоратор синхронизации, чтобы измерить, сколько времени потребуется для загрузки. Это один из пяти моих любимых декораторов на языке python.
Кроме того, я намеренно сохранил базу данных на своём локальном хостинге. Таким образом, пропускная способность не будет учитывается в уравнении.
Ранее я искренне думал, что это может быть самым быстрым способом загрузки данных. Это потому, что мы используем курсор для вставки данных и фиксации одновременно. Но весь процесс занял очень много времени:
Function load_csv_with_insert took 1046.109834432602 seconds to run.
Далее давайте попробуем самый популярный способ.
Плохой способ: использование Pandas to_sql для загрузки массивных наборов данных.
Если вы часто используете Pandas и to_sql
API, этот способ может вас удивить.
Я использовал его и продолжаю использовать. Но это всё ещё не лучший способ для больших наборов данных.
Вот мой код. Я использую ту же базу данных и CSV, что и раньше. Я сократил таблицу перед тем, как начать загрузку датасета.
import pandas as pd
from sqlalchemy import create_engine
import time
...
conn = create_engine("postgresql+psycopg2://thuwa:Flora1990@localhost:5432/playground")
@timing_decorator
def load_csv_with_pandas():
df = pd.read_csv(csv_file_path)
df.to_sql(table_name, conn, if_exists="append", chunksize=100, index=False)
load_csv_with_pandas()
В приведённом выше коде я не использую популярный аргумент method
. Установка значения multi
ускорит загрузку данных в аналитическую базу данных, такую как Redshift. Но в нашем случае мы используем транзакционную базу данных.
Возвращаясь к главному, загрузка этого метода заняла у нас 376 секунд.
Function load_csv_with_pandas took 376.70790338516235 seconds to run.
Вывод таков: данный способ почти в 3 раза быстрее, чем предыдущий. Но, как вы уже поняли, есть способы и получше!
Хороший способ: Использование метода COPY
Существует собственный способ загрузки текстовых данных в таблицы SQL. И такие библиотеки, как psycopg2, могут предложить нам его.
Да, мы можем скопировать файл как есть в SQL-таблицы на Python.
...
@timing_decorator
def load_csv_with_copy():
# Create a cursor object
cur = conn.cursor()
# Use the COPY command to load the CSV file into the table
with open(csv_file_path, "r") as f:
next(f) # skip header row
cur.copy_from(f, table_name, sep=",")
conn.commit()
# Close the cursor and connection
cur.close()
Метод copy_from
в курсоре использует метод COPY
в SQL client API и напрямую загружает файл в SQL. Вместе с этим мы передаём дополнительные аргументы.
Здесь, в данном случае, мы указываем, что столбцы разделяются запятой. Мы также использовали следующий метод, чтобы пропустить первую строку, которая является заголовком таблицы.
Вот результаты:
Function load_csv_with_copy took 50.60594058036804 seconds to run.
Поразительные 50 секунд — в 20 раз быстрее, чем при использовании курсора, и почти в 8 раз быстрее, чем метод to_sql
Pandas.
Но подождите!
Я сказал, что использую Python, потому что часто выполняю предварительную обработку данных. Два других метода просты. Но как мы будем загружать существующий набор данных в Python runtime с помощью этого метода?
Запись набора данных, существующего в памяти, с использованием COPY
Вот где мы можем извлечь выгоду из буферного метода. Ниже представлен код для быстрой загрузки существующего фрейма данных Pandas в базу данных SQL:
import io
...
@timing_decorator
def load_dataframe_with_copy(df):
# Create a cursor object
cur = conn.cursor()
# Convert the DataFrame to a CSV file-like object
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False, header=False)
# Use the COPY command to load the CSV file into the table
csv_buffer.seek(0)
cur.copy_from(csv_buffer, table_name, sep=",")
conn.commit()
# Close the cursor and connection
cur.close()
df = pd.read_csv(csv_file_path)
# Do data processing on df
load_dataframe_with_copy(df)
Этот код создаёт объект StringIO
с именем csv_buffer
, который представляет собой файлоподобный объект, ведущий себя как CSV-файл. Фрейм данных записывается в этот объект с использованием метода to_csv()
с index=False
и header=False
, чтобы исключить индекс и заголовок из выходных данных CSV.
Затем для объекта csv_buffer
вызывается метод seek(0)
, чтобы переместить указатель на файл обратно в начало файлоподобного объекта.
Заключение
Работа с большими наборами данных – это не то же самое, что работа с обычным датасетом. И одной из сложнейших задач здесь является загрузка таких гигантских наборов данных в БД.
Если бы предварительная обработка не требовалась, я бы почти всегда использовал метод Pandas to_sql
. Тем не менее, мой эксперимент показывает, что это не лучший способ для больших наборов данных.
Метод COPY
– самый быстрый из всех, что я видел до сих пор. Хотя я предлагаю его для пакетной загрузки в контролируемой среде, для повседневных задач to_sql
имеет фантастический интерфейс для настройки нескольких режимов загрузки.