Rust как часть микросервисной архитектуры

Как использовать Rust в веб-приложении полного цикла? В этой статье расскажем об альтернативном способе включения Rust в приложение.

В целом можно сказать, что самые очевидные преимущества Rust  —  это скорость решения задач, интенсивно расходующих ресурсы процессора, и очень эффективная работа с памятью (причем без сборщика мусора).

Но есть небольшой недостаток  —  очень строгая модель владения. Впрочем, не такой уж это и недостаток: если ее придерживаться, получается очень стабильный и легкий в сопровождении код.

Однако бывают случаи, когда скорость разработки  —  важный фактор проекта. Тут в первую очередь желательно разделить приложение на несколько частей и для каждой использовать наиболее подходящий язык. Как это реализовать? Одно из решений  —  применение микросервисной архитектуры. Его и рассмотрим.

Для понимания содержания этой статьи необходимы базовые знания Rust и TypeScript.


Общая цель

В качестве простого примера реализуем вместе 3 микросервиса.

  1. main-server с общедоступным API и размещением небольшого клиента, построенного на Vue. Язык  —  TypeScript, и будем использовать популярный фреймворк NestJS.
  2. calc-engine  —  сервер Rust с методами для выполнения вычислений, интенсивно расходующих ресурсы процессора.
  3. rabbitmq  —  это брокер сообщений между двумя предыдущими сервисами, он поддерживается RabbitMQ.

Все это развернем в docker-compose  —  для совместного использования.

Опишем лишь самые важные части реализации. Весь код есть в репозитории  —  он только для образовательных целей.

Calc-engine (Rust)

В этой части  —  типичное бинарное приложение с применением cargo. Зависимости здесь такие:

amiquip = { version = "0.4.2", default-features = false }
serde_json = { version = "1.0.81" }
rayon = { version = "1.5.3" }
num_cpus = { version = "1.13.1" }
dotenv = { version = "0.15.0" }

Важная зависимость  —  amiquip, позволяющая взаимодействовать с RabbitMQ.

Чтобы создать приложение с как можно большим числом параллельных вычислений, применяется крейт rayon. Вот как с ним настраивается пул потоков:

fn setup_pool() -> rayon::ThreadPool {
rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.build()
.unwrap()
}

В задачах, интенсивно расходующих ресурсы процессора, запускать параллельно процессоры в добавок к имеющимся обычно не имеет смысла.

Подключаемся к RabbitMQ вот так:

fn setup_connection() -> Connection {
if let Ok(c) = Connection::insecure_open(&format!(
"amqp://{}:{}@{}:{}",
env::var("RABBITMQ_USER").unwrap(),
env::var("RABBITMQ_PWD").unwrap(),
env::var("RABBITMQ_HOST").unwrap(),
env::var("RABBITMQ_PORT").unwrap()
)) {
println!("Connected to rabbitmq!");
c
} else {
println!("Failed to connect to rabbitmq. Will retry in 2s.");
std::thread::sleep(std::time::Duration::from_secs(2));
setup_connection()
}
}

С ошибками подключения справляемся, принимая определенные значения из файла .env, а также переподключаясь каждые две секунды (2s). К настройкам микросервисов этот подход особенно применим, ведь каждый сервис после временного сбоя других сервисов должен восстанавливаться.

Стилем взаимодействия между сервисами будет RPC (remote-procedure-call, то есть «удаленный вызов процедур»). Такой выбор не всегда лучший, но для наших целей подходит.

Задачей, интенсивно расходующей ресурсы процессора, будет вычисление шагов решения головоломки «Ханойская башня». Отсюда этот «hanoi» в коде ниже:

fn setup_hanoi_queue(pool: &rayon::ThreadPool) {
let mut connection = setup_connection();
let channel = connection.open_channel(None).unwrap();
let queue = channel
.queue_declare("hanoi", QueueDeclareOptions::default())
.unwrap();
let consumer = queue.consume(ConsumerOptions::default()).unwrap();
for message in consumer.receiver().iter() {
match message {
ConsumerMessage::Delivery(delivery) => {
let body = String::from_utf8(delivery.body.clone()).unwrap();
let (reply_to, correlation_id) = match (
delivery.properties.reply_to(),
delivery.properties.correlation_id(),
) {
(Some(r), Some(c)) => (r.clone(), c.clone()),
_ => {
println!("received delivery without reply_to or correlation_id");
continue;
}
};
let channel_for_msg = connection.open_channel(None).unwrap();
pool.spawn(move || {
let exchange = Exchange::direct(&channel_for_msg);
exchange
.publish(Publish::with_properties(
json!(hanoi::hanoi(body.parse().unwrap(), 0, 2))
.to_string()
.as_bytes(),
reply_to,
AmqpProperties::default().with_correlation_id(correlation_id),
))
.unwrap();
});
consumer.ack(delivery).unwrap();
}
other => {
println!("Consumer ended: {:?}", other);
println!("Will try to reset connection in 2s.");
std::thread::sleep(std::time::Duration::from_secs(2));
break;
}
}
}
setup_hanoi_queue(pool);
}

Здесь первым делом убеждаемся, что в RabbitMQ есть очередь с именем hanoi: открываем канал channel поверх соединения connection, а затем просто объявляем очередь queue для создания/потребления сообщений в сервисах.

Чтобы прослушать добавленное в queue сообщение, вызываем queue.receiver(). Это аналогично прослушиванию во внутренних каналах Rust, и код внутри блока выполняется сообщение за сообщением.

Каждое новое сообщение message должно сопоставляться с его типом. То есть, по сути, это либо ConsumerMessage::Delivery, либо ошибка подключения  —  с тем и другим разбираемся в other.

Кроме тела body, у сообщения есть два поля: reply_to и correlation_idbody  —  это фактическое содержимое (количество элементов в игре «Ханойская башня»), reply_to  —  «эксклюзивная» очередь для возвращения результата, а correlation_id применяется для сопоставления запроса с ответом.

Клиент, отправляющий сообщение в очередь hanoi, прикрепляет уникальное correlation_id. Это позволяет потом выбрать результат, записанный из сервера в очередь reply_to.

Чтобы обрабатывать параллельно как можно больше задач, расходующих ресурсы процессора, просто помещаем текущий запрос в замыкании в пул потоков. В каждом таком замыкании содержится логика для возвращения результата через очередь reply_to.

Отправка идет с помощью экземпляра Exchange::direct для канала channel_for_msg. А в other каждые две секунды (2s) просто переустанавливаются конструкции этого слушателя, благодаря чему ошибки подключения или перезапуски RabbitMQ не сказываются на сервисе.


Main server (TypeScript)

Прежде всего, Main server  —  это общедоступный API, позволяющий клиентам запрашивать шаги решения для игры «Ханойская башня»:

@Get('/hanoi')
getHello(@Query('n', ParseIntPipe) n: number): Promise<string> {
...
return this.appService.makeHanoi(n);
}

Не знакомы с NestJS? Не проблема: это самый легкий в освоении веб-фреймворк, основные его концепции есть в этом простом обзоре.

В конечной точке метод makeHanoi делегируется и в итоге оказывается в методе MessageService.sendMessage. В этом сервисе содержится логика подключения к RabbitMQ, но теперь со стороны main-server:

async sendMessage(n: number): Promise<string> {
const channel = await this.ensureChannel();
const replyTo = await this.ensureResponseQueue(channel);
const correlationId = this.generateUuid();
channel.sendToQueue(
this.HANOI_QUEUE,
Buffer.from(`${n}`),
{
correlationId,
replyTo
});
return lastValueFrom(
this.queueResponse.pipe(
filter(
m => m?.properties.correlationId === correlationId
),
first(),
map(m => m.content.toString())
)
);
}

Наши параметры сообщений  —  replyTo и correlationId  —  тут тоже есть. Мы на стороне создателя, поэтому эти значения здесь же создаются и прикрепляются к сообщению, отправляемому в очередь hanoi.

После отправки регистрируем одноразового слушателя в очереди, используемой в качестве replyTo. Если подробнее  —  эта последняя очередь потребляется внутренне для выдачи значений следующему BehaviorSubject:

private queueResponse = new BehaviorSubject<Message>(null);

В этом коде при первом соответствующем correlationId регистрируется подписка и с помощью оператора lastValueFrom преобразуется в Promise.

В основе здесь  —  популярная библиотека реактивного программирования rxjs.

Опуская подробности, кратко рассмотрим реализацию ensureChannel:

private async ensureChannel(): Promise<Channel> {
return this.channel ?? this.creatingChannel.value ?
lastValueFrom(this.creatingChannel.pipe(filter(v => !v), first(), map(() => this.channel))) :
new Promise((res, rej) => {
this.creatingChannel.next(true);
const [host, user, pwd, port] = [
this.configService.get<string>('RABBITMQ_HOST'),
this.configService.get<string>('RABBITMQ_USER'),
this.configService.get<string>('RABBITMQ_PWD'),
this.configService.get<string>('RABBITMQ_PORT')
];
connect(`amqp://${user}:${pwd}@${host}:${port}`, (error0, connection) => {
if (error0) {
rej(error0);
} else {
connection.on('close', () => {
this.channel = null;
this.responseQueue = null;
});
connection.createChannel((error1, channel) => {
if (error1) {
rej(error1);
} else {
channel.assertQueue(this.HANOI_QUEUE, {
durable: false
});
this.channel = channel;
res(this.channel);
}
});
}
this.creatingChannel.next(false);
});
});
}

Главная часть этого кода проста: в connect(...) устанавливается подключение connection к RabbitMQ, в connection.createChannel используется соединение для создания канала channel, с помощью которого подтверждается, что очередь hanoi существует.

Такое нагромождение необходимо для корректной работы этого метода, выполняемого в асинхронном контексте. Установленный канал channel сохраняется в локальном поле  —  так его не нужно создавать при каждой отправке сообщения.


Клиент

Кроме того, в main server размещается небольшое клиентское приложение с поддержкой Vue.js. Весь код для реализации есть в репозитории. Его цель  —  следующий простой пользовательский интерфейс:

Docker-compose

Сборка компонентов идет в контейнере docker-compose, где все они объединяются. Вот содержимое соответствующего docker-compose.yml:

version: "3"
services:
  main-server:
    build: ./main-server
    env_file: .env
    environment:
      - RABBITMQ_HOST=rabbitmq
    ports:
      - "3000:3000"

calc-engine:
    build: ./calc-engine
    env_file: .env
    environment:
      - RABBITMQ_HOST=rabbitmq

rabbitmq:
    image: rabbitmq:3-management

Если клонировать репозиторий и установить докер на компьютере, код можно запустить с терминала следующей командой (она в корневой папке репозитория):

> docker-compose up

После вводим в браузере http://localhost:3000, и пользовательский интерфейс перед нами.


Заключение

Это не единственный подход к разделению приложения на несколько единиц компиляции. В контексте Node.js еще есть перспективный NAPI-RS, позволяющий избежать накладных расходов на сообщения.

С другой стороны, помещая разные части приложения в отдельный сервер (микросервер), получаем преимущество дифференцированной масштабируемости  —  каждая часть может быть масштабирована или сделана с учетом особенностей ее запуска в кластере.

https://t.me/rust_code

источник

Ответить