Как решить задачу классификации в PySpark
BigData плотно входит в нашу жизнь. Датасеты растут и постоянно изменяются, что усложняет задачу кластеризации клиентов. Обычно для задач кластеризации используется библиотека Sklearn, но с большим объёмом данных её использовать не получиться. Spark позволяет реализовать параллельные вычисления на кластерах и имеет в составе своего фреймворка библиотеку машинного обучения MLlib. В случае больших данных, когда привычные инструменты отказываются работать с такими объёмами, PySpark приходит на выручку.
При этом, прежде чем запустить алгоритмы машинного обучения на вашем датасете, необходимо подготовить данные и провести feature engineering, а это достаточно трудозатратная задача, но в то же время необходимая, так как от этого этапа во многом зависит качество конечного результата. Данный этап также необходимо делать на PySpark, опять-таки из-за объёма данных.
Перед нами стояла задача анализа массива данных заёмщиков физических лиц – злостных неплательщиков кредитов, дела по которым уже направлены в суд. Этот массив необходимо было разбить на блоки (кластеры). Цель кластерного анализа – понять, какие группы по общим признакам можно выделить, и в дальнейшем разработать для каждой группы индивидуальную тактику взыскания, и, возможно, найти пути улучшения методологии скоринга.
1. Подготовка данных
1.1. Идеи
Подготовка данных – этап, предшествующий анализу и требующий хорошего понимания предметной области. Предобработка осуществляется если не руками самого эксперта в этой области, то в очень тесном с ним сотрудничестве. Останавливаться на предварительной подготовке данных долго не будем, поскольку общих рекомендаций здесь не выработать, только кратко отметим основные моменты, которые мы произвели с нашим датасетом и которые отличаются от классической борьбы с отсутствующими значениями.
Выбирали признаки, которые:
• непосредственно характеризуют именно самого заёмщика, а не кредитный продукт, не договор и прочее;
• имеют значение до выхода на просрочку (то есть, например, данные по процедурам взыскания в анализ не берём, так как хотим разобраться в причинах, которые к этому привели).
Убрали признаки:
• дублирующие друг друга по существу (например, остаток основного долга (ОД) в валюте и остаток ОД в рублях – достаточно оставить только один показатель);
• по которым слишком много вариантов (например, 100-200 значений для признака «должность на месте работы»).
В результате предобработки датасета количество исходных данных существенно сократилось. В исходном датасете количество признаков достигало 191, после чистки на основе описанных выше идей их осталось 43. Среди них:
• признаки, связанные с первым кредитным договором: вид кредитования, срок кредита, признак реструктуризации, дата выдачи кредита, ставка, валюта и т.д.;
• числовые признаки (итого по всем договорам): сумма обеспечения, сумма общей задолженности в рублях, сумма погашений по основному долгу;
• признаки — индивидуальные характеристики заёмщика: пол, возраст, резидентство, признак vip, наличие заграничного паспорта, категория надёжности, данные, связанные с рабочей деятельности, данные, связанные с собственностью и т.д.
1.2 Реализация на PySpark
После импорта необходимых библиотеки и создания сессии Spark, входной точки каждого PySpark приложения, загружаем исходные данные и подготовленный совместно с экспертом список признаков в объекты Spark DataFrame. DataFrame – одна из двух абстракций массива данных в Spark (вторая абстракция — RDD), которая предоставляет более высокоуровневое API (по сравнению с RDD). Метод загрузки данных зависит от формата файла (в рассматриваемом примере CSV, но может быть JSON, ORC, Parquet и др.).
Далее на основе списка признаков из исходного датасета отбираем нужные колонки. Затем группируем полученный массив данных по идентификатору заёмщика, при этом для колонок с числовыми значениями данные суммируем, а для колонок с категориальными значениями оставляем только первое значение.
PySpark позволяет работать не только с большими данными (Big data), но и создавать модели машинного обучения (Machine Learning). Сегодня мы расскажем вам о модуле ML и покажем, как обучить модель Machine Learning для решения задачи классификации. Читайте у нас: подготовка данных, применение логистической регрессии, а также использование метрик качеств в PySpark.
Датасет с домами на продажу
В качестве примера мы будем использовать датасет Kaggle, который содержит данные о домах на продажу в Бруклине с 2003 по 2017 года и доступен для скачивания. Он содержит 111 атрибутов (столбцов) и 390883 записей (строк). В атрибуты включены: дата продажи, дата постройки, цена на дом, налоговый класс, соседние регионы, долгота, ширина и др.
# Если у вас Google Colab, то раскомментируйте
# import findspark
# findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv(
'brooklyn_sales_map.csv',
inferSchema=True, header=True)
О Kaggle API и о том, как установить PySpark в Google Colab, читайте здесь.
Готовим атрибут для последующей бинарной классификации
Допустим, требуется классифицировать налоговый класс на дом (tax_class). Всего имеется 10 таких классов. Поскольку данные распределены неравномерно (например, в классе 1 имеется 198969 записей, а в 3-м — только 18), мы разделим их на 2 категории: те, которые принадлежат классу 1, и остальные. В Python это делается очень просто, нужно просто вызвать метод replace
:
by_1 = ['1', '1A', '1B', '1C']by_others = ['2', '2A', '2B', '2C', '3', '4']data = data.replace(by_others, '0', ['tax_class'])
data = data.replace(by_1, '1', ['tax_class'])
Кроме того, алгоритмы Machine Learning в PySpark работают с числовым значениями, а не со строками. Поэтому преобразуем значения столбца tax_class
в тип int:
data = data.withColumn('tax_class', data.tax_class.cast('int'))
Подбор признаков и преобразование категорий
Выберем следующие признаки для обучения модели Machine Learning: год постройки (year_of_sale
), цена на дом (sale_price)
и соседние регионы (neighborhood
). Последние атрибут является категориальным признаком — в данных имеется 20 соседних регионов. Но опять же все значения этих категорий являются строковыми, поэтому нужно преобразовать их в числовые.
Можно воспользоваться методом replace
, как это сделано выше, но придётся сначала извлечь названия всех 20 регионов. А можно использовать специальный класс StringIndexer из PySpark-модуля ML, который выполнит за нас всю работу. Объект этого класса принимает в качестве аргументов: название атрибута, который нужно преобразовать (inputCol
), и название, которое будет иметь преобразованный атрибут (outputCol
). Вот так это выглядит в Python:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="neighborhood", outputCol="neighborhood_id")
data = indexer.fit(data).transform(data)
Преобразованные категории имеют вид:
data.groupBy('neighborhood_id').count().show()
#
+---------------+-----+
|neighborhood_id|count|
+---------------+-----+
| 8.0|13215|
| 0.0|27279|
| 7.0|13387|
| 49.0| 2271|
| 29.0| 5074|
| 47.0| 2422|
| 42.0| 3086|
| 44.0| 2802|
| 35.0| 4000|
| 62.0| 2|
| 18.0| 7342|
| 1.0|21206|
| 39.0| 3396|
| 37.0| 3894|
| 34.0| 4037|
| 25.0| 5809|
| 36.0| 3984|
| 41.0| 3138|
| 4.0|14608|
| 23.0| 6374|
+---------------+-----+
Теперь выберем необходимые признаки, а также отбросим строки с пустыми значениями с помощью метода dropna
в PySpark:
features = ['year_of_sale', 'sale_price', 'neighborhood_id']target = 'tax_class'
attributes = features + [target]sample = data.select(attributes).dropna()
Векторизация признаков
Поскольку алгоритмы машинного обучения в PySpark принимают на вход только вектора, то нужно провести векторизацию. Для преобразования признаков в вектора используется класс VectorAssembler. Объект этого класса принимает в качестве аргументов список с названиями признаков, которые нужно векторизовать (inputCols), и название преобразованного признака (outputCol). После создания объекта VectorAssembler вызывается метод transform
.
Для начала выберем в качестве признака для преобразования — цену на дом. Код на Python:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['sale_price'],
outputCol='features')
output = assembler.transform(sample)
Полученный после векторизации DataFrame выглядит следующим образом:
output.show(5)
+------------+----------+---------------+---------+--------------+
|year_of_sale|sale_price|neighborhood_id|tax_class| features|
+------------+----------+---------------+---------+--------------+
| 2008| 499401179| 48.0| 0|[4.99401179E8]|
| 2016| 345000000| 41.0| 0| [3.45E8]|
| 2016| 340000000| 27.0| 0| [3.4E8]|
+------------+----------+---------------+---------+--------------+
Разделение датасета и обучение модели
Для решения задач Machine Learning всегда нужно иметь, как минимум, две выборки — обучающую и тестовую. На обучающей мы будем обучать модель, а на тестовой проверять эффективность обученной модели. В PySpark сделать это очень просто, нужно просто вызвать метод randomSplit
, который разделит исходный датасет в заданной пропорции. Мы разделим в пропорции 80:20, в Python это выглядит так:
train, test = output.randomSplit([0.8, 0.2])
Теперь воспользуемся логистической регрессией (Logistic Regression) [1], которая есть в PySpark, в качестве алгоритма Machine learning. Для этого нужно указать признаки, на которых модель обучается, и признак, который нужно классифицировать. Мы преобразовали цену на дом (sale price) в вектор под названием features
, поэтому именно его и указываем в аргументе:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',
labelCol='tax_class')
model = lr.fit(train)
Осталось только получить предсказания. Для этого вызывается метод transform
, который принимает тестовую выборку:
predictions = model.transform(test)
Проверим эффективность модели, используя метрику качества. И в этом случае PySpark нас выручает, поскольку у него есть класс BinaryClassificationEvaluator. Нужно лишь указать целевой признак (tax class), а затем вызвать метод evaluate
и передать в него наши предсказания. В Python это выглядит так:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='tax_class')
print('Evaluation:', evaluator.evaluate(predictions))
# Evaluation: 0.5242388483600111
Как видим, мы получили точность только 52%, что очень мало. Попробуем добавить ещё несколько признаков для обучения.
Добавление признаков
Векторизуем также год постройки (year_of_sale) и соседние регионы (neighborhood_id). Для этого нужно только в VectorAssembler указать выбранные признаки:
features = ['year_of_sale', 'sale_price', 'neighborhood_id']assembler = VectorAssembler(inputCols=features,
outputCol='features')
output = assembler.transform(sample)
Python-код для остальных шагов — разделение на тестовую и обучающую выборки, обучение и оценивание модели — остаётся все тем же. В итоге, мы смогли повысить точность до 60%:
print('Evaluation:', evaluator.evaluate(predictions))
# Evaluation: 0.6019972898385996
Отметим также, что при большем количестве классов в качестве метрики следует использовать MultilabelClassificationEvaluator, вместо BinaryClassificationEvaluator.
Feature engineering и кластерный анализ клиентов на PySpark