10 необходимых функций PySpark

PySpark – это мощный фреймворк для обработки и анализа больших данных, в частности для распределенной обработки данных с помощью Apache Spark. Ниже приведены десять важных тем по PySpark с примерами. Это должно охватить большинство используемых функциональных возможностей Spark.

SparkSession: SparkSession – это точка входа в любую функциональность Spark. Он позволяет взаимодействовать со Spark и выполнять различные операции Spark. Это унифицированный способ работы с различными источниками данных, распределенной обработки данных и выполнения различных операций Spark, таких как создание DataFrames, выполнение SQL-запросов и управление конфигурациями.

Объект SparkSession обеспечивает единую точку доступа ко всей функциональности Spark и служит заменой устаревшим SparkContext и SQLContext. Объединяя оба контекста в один унифицированный интерфейс, SparkSession упрощает использование и управление Spark-приложениями.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test SparkSession functionality
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])
df.show()

DataFrames и Datasets: DataFrames – это распределенные коллекции данных, организованные в именованные столбцы. Наборы данных являются сильно типизированными, как и фреймы данных, но имеют преимущество в виде безопасности типов во время компиляции. DataFrame похож на таблицу в реляционной базе данных или электронную таблицу с дополнительным преимуществом – он может быть распределен по нескольким узлам кластера. DataFrame являются основной абстракцией данных в PySpark и предназначены для эффективной работы с крупномасштабными структурированными и полуструктурированными данными. DataFrame можно создавать из различных источников данных, таких как RDD, CSV-файлы, JSON, Parquet, таблицы Hive и т.д. DataFrames в PySpark поддерживают как структурированные, так и полуструктурированные данные, что делает их универсальными для решения различных задач обработки данных.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test DataFrames functionality
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])
df.show()

# Test Datasets functionality
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize(data)
people_ds = rdd.map(lambda x: Row(id=x[0], name=x[1], age=int(x[2]))).toDF()
people_ds.show()

Преобразования и действия: PySpark поддерживает преобразования (например, filter, select, groupBy, map, join, withColumn, drop, orderBy) для изменения DataFrames/Datasets и действия (например, show, count, collect, take, save, foreach) для запуска вычислений.

Преобразования являются “ленивыми”, то есть они не выполняются немедленно. Вместо этого они строят последовательность операций, которые будут выполнены позже, когда будет вызвано действие. Такая “ленивая” оценка является ключевой особенностью Spark и обеспечивает оптимизацию и эффективность.

Действия это операции, которые запускают выполнение всех ранее определенных преобразований и возвращают значение программе-драйверу или записывают данные во внешнюю систему хранения. При вызове действия Spark оценивает последовательность преобразований и вычисляет результат.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test transformations and actions
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Transformation: Filter people older than 25
filtered_df = df.filter(df.age > 25)

# Action: Show the result
filtered_df.show()

Объединения: PySpark позволяет объединять DataFrames/Datasets на основе заданного условия.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test joins
data1 = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
data2 = [(1, 25), (2, 30), (4, 22)]
df1 = spark.createDataFrame(data1, ["id", "name"])
df2 = spark.createDataFrame(data2, ["id", "age"])

# Join df1 and df2 on "id" column
joined_df = df1.join(df2, on="id", how="inner")

# Show the result
joined_df.show()

Агрегирование: PySpark предоставляет различные функции агрегирования, такие как sum, avg, min, max и т.д., для обобщения данных. Агрегации обычно используются для статистического анализа, обобщения и создания отчетов по большим наборам данных.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder.appName("AggregationExample").getOrCreate()

# Sample data in the form of a Python list
data = [(1, "Alice", 100), (2, "Bob", 200), (3, "Charlie", 150)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# Aggregations
# Calculate the sum of the salary column
sum_salary = df.agg(F.sum("salary")).first()[0]
print("Sum of Salary:", sum_salary)

# Calculate the average salary
avg_salary = df.agg(F.avg("salary")).first()[0]
print("Average Salary:", avg_salary)

# Find the minimum and maximum salary
min_salary = df.agg(F.min("salary")).first()[0]
max_salary = df.agg(F.max("salary")).first()[0]
print("Min Salary:", min_salary)
print("Max Salary:", max_salary)

# Count the number of distinct names
distinct_names = df.agg(F.countDistinct("name")).first()[0]
print("Distinct Names Count:", distinct_names)

Функции, определяемые пользователем (UDFs): PySpark позволяет определять свои пользовательские функции и применять их к фреймам/наборам данных.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test User-Defined Function (UDF)
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])

# Define a UDF to add a prefix to the name
def add_prefix(name):
    return "Mr. " + name

# Register the UDF
spark.udf.register("add_prefix_udf", add_prefix)

# Apply the UDF to the DataFrame
df.withColumn("prefixed_name", F.expr("add_prefix_udf(name)")).show()

Оконные функции: Оконные функции позволяют выполнять вычисления в диапазоне строк, связанных с текущей строкой.

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test window functions
data = [(1, "Alice", 100), (2, "Bob", 200), (3, "Charlie", 150)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# Define a Window specification
window_spec = Window.orderBy(F.col("salary"))

# Calculate the rank based on salary
df.withColumn("rank", F.rank().over(window_spec)).show()

Широковещательные объединения: Широковещательные соединения используются, когда один из DataFrame достаточно мал, чтобы поместиться в памяти. При этом оптимизируется производительность соединения за счет широковещательной рассылки небольшого DataFrame всем узлам.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test broadcast joins
data1 = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
data2 = [(1, 25), (2, 30), (4, 22)]
df1 = spark.createDataFrame(data1, ["id", "name"])
df2 = spark.createDataFrame(data2, ["id", "age"])

# Broadcast join df1 and df2 on "id" column
joined_df = df1.join(F.broadcast(df2), on="id", how="inner")

# Show the result
joined_df.show()

Кэширование: PySpark позволяет кэшировать DataFrames/Datasets в памяти, чтобы избежать избыточных вычислений и повысить производительность.

MEMORY_ONLY: Это уровень кэширования по умолчанию в PySpark. Он кэширует данные в памяти в виде десериализованных Java-объектов. Если данные не помещаются в памяти, Spark автоматически выкладывает лишние разделы на диск.

MEMORY_AND_DISK: Этот уровень кэширования хранит данные в памяти до тех пор, пока они помещаются, а лишние разделы выгружает на диск. Этот тип кэширования полезен, когда размер DataFrame/RDD превышает объем доступной памяти.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Test caching
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])

# Cache the DataFrame - Default to Spark's internal memory
df.cache()

# Cache the DataFrame in memory and spill to disk if necessary
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)  

# Count the rows (trigger computation)
print("Row count:", df.count())

# Count again (should be faster since the DataFrame is cached)
print("Row count:", df.count())

Чтобы освободить кэшированные данные и освободить память, можно использовать метод unpersist() для df.

Сериализация и сжатие: PySpark поддерживает различные форматы сериализации и методы сжатия для оптимизации эффективности хранения, передачи и обработки данных. Они играют важную роль в работе с крупномасштабными распределенными данными, особенно при работе с большими данными.

Сериализация – это процесс преобразования данных в формат, который можно легко хранить, передавать или обрабатывать. В PySpark сериализация имеет решающее значение при необходимости передачи данных между узлами кластера или записи их во внешние системы хранения. Использование эффективных форматов сериализации позволяет снизить накладные расходы на передачу данных и повысить общую производительность.

Сжатие – это процесс уменьшения размера данных для экономии места в хранилище или ускорения их передачи. В PySpark можно сжимать данные при хранении или передаче для минимизации занимаемой памяти и повышения производительности ввода-вывода.

from pyspark.sql import SparkSession

# Create a SparkSession with custom serialization and compression settings
spark = SparkSession.builder \
    .appName("example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "10000") \
    .getOrCreate()

# Your Spark operations here...

Эти примеры охватывают широкий спектр основных функций и возможностей PySpark. Изучая и экспериментируя с этими темами, вы сможете получить полное представление о возможностях PySpark для обработки и анализа больших данных.

+1
0
+1
0
+1
0
+1
0
+1
0

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *