ETL: Извлечение, преобразование, загрузка данных с Python!
🔎 Что такое ETL и почему он важен?
Представьте себе владельца перспективного стартапа, вооруженного невероятным алгоритмом искусственного интеллекта, который предсказывает риск развития диабета на основе роста и массы тела. Однако ваши данные разбросаны по файлам CSV и JSON, а измерения производятся в разных единицах. Используйте ETL!
ETL – аббревиатура от Extract, Transform, Load.
Извлечение, преобразование и загрузка данных 🧲 – это сбор огромного количества данных из различных источников, преобразование их в единый формат и загрузка в центральную базу данных или целевой файл. 🗄️
📝 Реализуем ETL с помощью Python
Давайте приступим к практической работе с Python! 🤓 Начнем с определения функции extract, которая предполагает использование функции glob из модуля glob. Эта удобная функция позволяет находить файлы с определенными расширениями (например, .json и .csv) и извлекать из них данные, преобразуя их в фреймы данных для дальнейшей обработки. 📁
📚🔍 Импортируем некоторые важные библиотеки, необходимые для выполнения операций! 💻💡
import glob # this module helps in selecting files
import pandas as pd # this module helps in processing CSV files
import xml.etree.ElementTree as ET # this module helps in processing XML files.
from datetime import datetime
tmpfile = "temp.tmp" # file used to store all extracted data
logfile = "logfile.txt" # all event logs will be stored in this file
targetfile = "transformed_data.csv"
def extract_from_csv(file_to_process):
dataframe = pd.read_csv(file_to_process)
return dataframe
def extract_from_json(file_to_process):
dataframe = pd.read_json(file_to_process,lines=True)
return dataframe
def extract_from_xml(file_to_process):
dataframe = pd.DataFrame(columns=["name", "height", "weight"])
tree = ET.parse(file_to_process)
root = tree.getroot()
for person in root:
name = person.find("name").text
height = float(person.find("height").text)
weight = float(person.find("weight").text)
dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
return dataframe
def extract():
extracted_data = pd.DataFrame(columns=['name','height','weight']) # create an empty data frame to hold extracted data
#process all csv files
for csvfile in glob.glob("*.csv"):
extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
#process all json files
for jsonfile in glob.glob("*.json"):
extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
#process all xml files
for xmlfile in glob.glob("*.xml"):
extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)
return extracted_data
🔄 Преобразование данных для обеспечения их целостности
На этапе преобразования мы совершаем некоторую магию с данными! Мы преобразуем рост в метрические (дюймы в миллиметры), а вес – из фунтов в килограммы, обеспечивая согласованность всего набора данных. Мы увидим, как Python с легкостью справляется со сложными операциями, доводя наши данные до совершенства. 📐
def transform(data):
#Convert height which is in inches to millimeter
#Convert the datatype of the column into float
data.height = data.height.astype(float)
#Convert inches to meters and round off to two decimals(one inch is 0.0254 meters)
data['height'] = round(data.height * 0.0254,2)
#Convert weight which is in pounds to kilograms
#Convert the datatype of the column into float
data.weight = data.weight.astype(float)
#Convert pounds to kilograms and round off to two decimals(one pound is 0.45359237 kilograms)
data['weight'] = round(data.weight * 0.45359237,2)
return data
📥 Загрузка преобразованных данных
Когда данные преобразованы, пришло время загрузить их в нужный нам целевой файл или базу данных. В данном случае мы сохраним наш фрейм данных в CSV-файл с помощью библиотеки pandas. Наше ETL-путешествие приближается к кульминации! 📈
def load(targetfile,data_to_load):
data_to_load.to_csv(targetfile)
📜 Временные метки
Мы реализуем функцию времени, которая будет фиксировать время начала и завершения каждого этапа. Таким образом, мы будем иметь четкое представление о том, когда начинается и завершается каждый этап ETL-процесса. ⏰
# Import the datetime module to work with timestamps
from datetime import datetime
# Define a function called 'log' that takes a 'message' as input
def log(message):
# Define the timestamp format to be used in the log entries
timestamp_format = '%Y-%h-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second
# Get the current timestamp
now = datetime.now()
# Convert the timestamp to a formatted string using the defined format
timestamp = now.strftime(timestamp_format)
# Open the "logfile.txt" in append mode to add log entries
with open("logfile.txt", "a") as f:
# Write the timestamp, message, and a newline character to the file
f.write(timestamp + ',' + message + '\n')
# Log the start of the ETL (Extract, Transform, Load) job
log("ETL Job Started")
# Log the start of the extract phase
log("Extract phase Started")
# Call the 'extract' function to get the extracted data
extracted_data = extract()
# Log the end of the extract phase
log("Extract phase Ended")
# Display the extracted data (This line doesn't log anything, but it's here to show the data)
# Log the start of the transform phase
log("Transform phase Started")
# Call the 'transform' function with 'extracted_data' as input to get the transformed data
transformed_data = transform(extracted_data)
# Log the end of the transform phase
log("Transform phase Ended")
# Display the transformed data (This line doesn't log anything, but it's here to show the data)
# Log the start of the load phase
log("Load phase Started")
# Call the 'load' function with 'targetfile' and 'transformed_data' as inputs to load the data
load(targetfile, transformed_data)
# Log the end of the load phase
log("Load phase Ended")
# Log the end of the ETL job
log("ETL Job Ended")
✨ Симфония ETL
Когда мы соединяем все части вместе, симфония извлечения, преобразования и загрузки начинает играть в гармонии. Мы будем последовательно вызывать каждую функцию, обеспечивая бесперебойную работу ETL-процесса. Функция протоколирования обеспечивает наличие временных меток, отмечающих этапы пути. 🎹
💡 Основные выводы:
- Понимание процесса ETL и его значения для интеграции данных.
- Практическая реализация ETL с использованием Python, glob, pandas и datetime.
- Добавление прозрачности с помощью протоколирования для отслеживания времени выполнения процесса.
- Возможности Python в манипулировании и преобразовании данных.