UDF-функции Spark для обработки данных

Что такое пользовательские функции в Spark: особенности создания и применения

Пользовательские функции (User Defined Functions, UDF) – это функции, которые не являются встроенными (уже имеющимися) и создаются самим пользователем во время работы с данными при необходимости получить желаемый результат. UDF-функции обычно создаются для дополнительной обработки и могут содержать в себе несколько встроенных функций одновременно. Например, следующий код отвечает за определение функции, которая возводит все указанные числа в квадрат и возвращает результат в виде списка:

def square_array_right(x):

return np.square(x).tolist()

Как видно из кода, при возвращении нужного результата пользовательская функция использует встроенные функции np.square() (функция библиотеки numpy для возведения указанного числа в квадрат) и tolist() (функция для формирования списка из указанных элементов) [1].

Работа с пользовательскими функциями в Spark: несколько практических примеров

Для того, чтобы создать пользовательскую функцию, ее необходимо сначала определить. Это можно сделать с помощью ключевого слова def. Следующий код на языке Python отвечает за определение пользовательской функции для подсчета квадратов чисел:

def square(x):
   return x**2

Для корректной работы с данными в Spark необходимо указывать тип выходных данных. Для этого необходимо зарегистрировать UDF-функцию с помощью метода udf() и указать в качестве параметров регистрируемую функцию и тип выходных данных:

from pyspark.sql.types import IntegerType

square_udf_int = udf(lambda z: square(z), IntegerType())

Созданную пользовательскую функцию можно использовать для того, чтобы получить квадраты всех чисел в датафрейме. Следующий код на языке Python отвечает за создание числового датафрейма и возведение всех его чисел в квадрат:

df_pd = pd.DataFrame(
data={'integers': [1, 2, 3],
'floats': [-1.0, 0.5, 2.7],
'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]]})
df = spark.createDataFrame(df_pd)
df.select('integers','floats',
square_udf_int('integers').alias('int_squared'),
square_udf_int('floats').alias('float_squared')).show()
аналитика больших данных курсы, курсы spark, основы spark, основы hadoop, обучение администраторов spark
Результат работы функции

Из результата видно, что при возведении чисел с плавающей точкой возникла проблема в виде null-результата. Это произошло по причине того, что при создании UDF-функции был указан целый (Integer) тип получаемого результата. Для того, чтобы это исправить, необходимо изменить целый тип возвращаемого результата на тип с плавающей точкой (float):

from pyspark.sql.types import FloatType

square_udf_float = udf(lambda z: square(z), FloatType())

df.select(‘integers’,

‘floats’,square_udf_float(‘integers’).alias(‘int_squared’),

square_udf_float(‘floats’).alias(‘float_squared’)).show()

аналитика больших данных курсы, курсы spark, основы spark, основы hadoop, обучение администраторов spark
Результат изменения типа возвращаемого результата

Теперь возникла проблема в виде null-значений при возведении в квадрат целых чисел. Чтобы избежать этого, необходимо привести возвращаемый результат к типу Float при определении функции следующим образом:

def square(x):

return float(x**2)

Теперь можно без проблем возводить в квадрат числа типов Integer и Float [2]:

square_udf_float = udf(lambda z: square_float(z), FloatType())

аналитика больших данных курсы, курсы spark, основы spark, основы hadoop, обучение администраторов spark
Полученный правильный результат

Можно заметить, что результат возведения в квадрат целых и дробных чисел имеет тип Float. Такой подход отлично решает проблему с конфликтом типов данных и исключает возможность получения некорректного результата.

Таким образом, благодаря поддержке пользовательских функций, фреймворк Apache Spark позволяет пользователям работать с большими данными, не ограничивая их в методике обработки. Все это делает фреймворк Apache Spark весьма полезным средством для Data Scientist’а и разработчика Big Data приложений. В следующей статье мы поговорим про сборку Spark-приложений.

Больше подробностей про применение Apache Spark в проектах анализа больших данных, разработки Big Data приложений и прочих прикладных областях Data Science вы узнаете на практических курсах по Spark в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Обработку данных большого объёма можно удобно и быстро производить на кластере с помощью Spark. Но что делать, если встроенных функций не хватает? Написать свою!

Во время работы с таблицами в Spark возникают ситуации, когда для обработки данных набора встроенных функций оказывается недостаточно. В этом случае можно выгрузить таблицу в Pandas DataFrame и обрабатывать данные на Python привычными функциями. Однако, есть способ сделать это быстрее – UDF-функции в Spark.

UDF (User Defined Functions) – это функции, которые не содержатся во встроенных модулях Spark и определяются самим пользователем. UDF позволяют расширить возможности обработки данных и могут содержать в себе комбинацию встроенных функций.

Разберём работу UDF-функций на примере задачи парсинга поля таблицы. Например, есть таблица Hive, состоящая из колонок ID и Comment. В последней содержатся строки с ФИО, датой рождения и номером договора клиента определенной структуры (таблица 1).

Таблица 1 – Пример таблицы в Hive (идентификационные данные пользователей фиктивны, любые совпадения случайны).

Задача – разбить комментарий на отдельные столбцы (ФИО, дата рождения и номер договора).

Записываем таблицу из Hive в Spark DataFrame и выводим результат на экран. По умолчанию, при использовании метода show(), длинные строки отображаются не полностью. Чтобы этого не произошло, используем параметр «truncate» со значением «False».

data = spark.sql('''select * from schema.table''')
data.show(truncate=False)

+---+-------------------------------------------------------------------+
|ID |Comment                                                            |
+---+-------------------------------------------------------------------+
|4  |ФИО: Сидорова Любовь Ивановна, ДР: 01.01.2003, Номер договора: 0104|
|3  |ФИО: Иванова Юлия Сергеевна, ДР: 01.01.2002, Номер договора: 0103  |
|5  |ФИО: Сидоров Василий Петрович, ДР: 01.01.2004, Номер договора: 0105|
|1  |ФИО: Иванов Иван Иванович, ДР: 01.01.2000, Номер договора: 0101    |
|2  |ФИО: Петров Петр Петрович, ДР: 01.01.2001, Номер договора: 0102    |
+---+-------------------------------------------------------------------+

Переходим, непосредственно, к объявлению UDF. Импортируем библиотеки с регулярными выражениями и модуль с функциями pyspark.

import re
import pyspark.sql.functions as f

Для каждого результирующего поля (ФИО, дата рождения, номер договора) будем применять свою функцию. Прежде всего, объявляем их с помощью «def» и описываем внутри выполняемые действия. Здесь для извлечения информации в каждой строке находим нужную подстроку и удаляем сами сущности, указывающие на структуру («ФИО:», «ДР:», «Номер договора:»).

def pars_name(text):
    result = re.findall(r'ФИО:\s[^,]*', text)
    result = re.sub(r'ФИО:\s','', result[0])
    return result

def pars_birthday(text):
    result = re.findall(r'ДР:\s[^,]*', text)
    result = re.sub(r'ДР:\s','', result[0])
    return result

def pars_contract(text):
    result = re.findall(r'Номер договора:\s[^,]*', text)
    result = re.sub(r'Номер договора:\s','', result[0])
    return result

Далее, необходимо провести регистрацию функций, используя метод udf() модуля pyspark.sql.functions. В параметрах указываем саму функцию и тип её выходных данных. Для описанных выше процедур парсинга результатами будут строки (StringType()). Чтобы функция применилась к каждому элементу столбца, а не к одной строке, используем Лямбда-функцию Python.

U_name = f.udf(lambda text :pars_name(text), StringType())
U_birthday = f.udf(lambda text :pars_birthday(text), StringType())
U_contract = f.udf(lambda text :pars_contract(text), StringType())

Применение UDF-функций происходит путём добавления новых столбцов в DataFrame.

data = data.withColumn('Name',U_name('Comment')).withColumn('Birthday', U_birthday('Comment')).withColumn('Contract',U_contract('Comment'))data.show(truncate=False)

Результат – DataFrame Spark с новыми отдельными полями, который, при необходимости, можно легко сохранить как таблицу в Hive. Кроме того, появляется возможность продолжить обработку данных внутри Spark, соединяя результат с другими таблицами по полученным столбцам.

UDF-функции позволяют проводить дополнительную обработку данных в Spark, выходящую за рамки возможностей встроенных функций. Рассмотрели пример парсинга столбца со строками определённой структуры, однако, они могут находить применение в самых различных задачах обработки данных.

https://t.me/data_analysis_ml

источник

spark school

Ответить