AlterNats — эффективный PubSub-клиент среды .NET. Как реализовать оптимизированное программирование сокетов в .NET 6
В прошлом месяце я выпустил новую библиотеку .NET PubSub для NATS, облачной высокопроизводительной системы обмена сообщениями с открытым исходным кодом.
AlterNats более чем в три раза быстрее официальных клиентов и в пять раз быстрее PubSub в StackExchange.Redis, а все обычные методы PubSub имеют нулевое распределение.
С PubSub легко работать на C# с помощью функции Redis PubSub. Есть проверенная библиотека StackExchange.Redis, а AWS, Azure и GCP предлагают управляемые сервисы, так что на первый взгляд все просто замечательно. Но я немного сомневался в том, что использование этого инструмента — правильный путь.
Redis — это в первую очередь KVS (хранилище “ключ-значение”), а PubSub — скорее бонусная функция, имеющая следующие особенности:
- отсутствие специализированного мониторинга;
- поддержка кластеризации;
- несбалансированное ценообразование на управляемые сервисы (PubSub не нужна память);
- высокая производительность.
Поскольку технология NATS специально разработана для PubSub, она располагает обширной системой для достижения высокой производительности, которая кажется просто идеальной. Единственным минусом является отсутствие управляемых сервисов, но если вы используете NATS как чистый PubSub-продукт, вам не нужно заботиться о процессе сохраняемости.
По моему мнению, NATS — один из самых простых промежуточных программных продуктов. NATS может обеспечить обмен сообщениями по крайней мере ровно один раз с помощью функции NATS JetStream, но для ее поддержки может потребоваться больший объем хранилища.
Однако официальный клиент nats.net имеет свои недостатки: он не поддерживает async
/await
, а его API сильно устарел.
Причина этого четко указана в документе ReadMe. Дело в том, что для удобства обслуживания клиент nats.net основан на той же кодовой базе, что и клиент Go. Поэтому там есть много мест, которые не типичны для C#, а поскольку Go и C# написаны совершенно по-разному в целях высокой производительности, эффективность клиента nats.net оставляет желать лучшего.
Учитывая эту ситуацию, мы разработали собственную версию клиента, полностью основанную на C#. Она поддерживает не все функции по сравнению с официальным клиентом (отсутствует поддержка JetStream и TLS, который, как ожидается, будет необходим для работы с Leaf Nodes). Тем не менее наша версия специально разработана для NATS Core PubSub, что позволяет достигать максимальной скорости. При использовании PubSub не должно быть недостатка в функциональности инструментария.
Введение в AlterNats
API AlterNats полностью поддерживает async
/await
и сохраняет нативный стиль C#.
// создание соединения (по умолчанию соединение с nats://localhost:4222)
await using var conn = new NatsConnection();
// для подписчика: ожидание регистрации на сервере NATS (не означает завершение ожидания)
var subscription = await conn.SubscribeAsync<Person>("foo", x =>
{
Console.WriteLine($"Received {x}");
});
// для издателя
await conn.PublishAsync("foo", new Person(30, "bar"));
// отписка
subscription.Dipose();
// ---
public record Person(int Age, string Name);
NatsOptions
/ConnectOptions
— неизменяемая запись, которая может использовать новое выражение C# with
.
// С помощью опций можно настраивать оператор with
var options = NatsOptions.Default with
{
Url = "nats://127.0.0.1:9999",
LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information),
Serializer = new MessagePackNatsSerializer(),
ConnectOptions = ConnectOptions.Default with
{
Echo = true,
Username = "foo",
Password = "bar",
}
};
await using var conn = new NatsConnection(options);
NATS также предоставляет стандартный протокол для получения результатов. В некоторых случаях можно использовать его как простой RPC (удаленный вызов процедуры) между серверами.
// Сервер
await conn.SubscribeRequestAsync("foobar", (int x) => $"Hello {x}");
// Клиент(ответ: "Hello 100")
var response = await conn.RequestAsync<int, string>("foobar", 100);
Высокопроизводительное программирование сокетов
Использование лучшего API сокета
Socket — это класс, который может заниматься сетевой обработкой на низшем уровне в C#. Если нужна асинхронная высокопроизводительная обработка, вам следует применять обратные вызовы, повторно используя класс SocketAsyncEventArgs
.
Однако теперь существует простой в использовании метод async
/await
, который не требует сложных действий с SocketAsyncEventArgs
. Есть также множество методов с поддержкой async
, и нужно выбирать тот, что удовлетворяет ваши требования.
Простой способ определить это — использовать возвращенное значение ValueTask
.
// используйте это
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken)
public ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken)
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken))
// НЕ используйте это
public Task ConnectAsync(string host, int port)
public Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
public Task<int> SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
API для возврата ValueTask
является внутренним классом AwaitableSocketAsyncEventArgs
, который используется в качестве содержимого ValueTask
. При ожидании происходит его внутренний возврат для эффективной асинхронной обработки без распределения Task
. Это существенное улучшение по сравнению с тем, как функционирует класс SocketAsyncEventArgs
, который очень сложно использовать. Настоятельно рекомендую воспользоваться AwaitableSocketAsyncEventArgs
.
Также обратите внимание, что синхронный API может получать Span, а асинхронный — только Memory (для удобства размещения состояния в куче). Это касается не только программирования сокетов, но является общим правилом для асинхронных API, и если система плохо организована в целом, невозможность использовать Span может стать существенным препятствием. Убедитесь в том, что сможете обойтись Memory.
Определение двоичного кода для текстовых протоколов
Протокол NATS — это текстовый протокол, подобный Redis и другим. С ним легко работать посредством обработки строк. Протокол можно без проблем реализовать с помощью StreamReader, поскольку вам потребуется лишь ReadLine. Однако, поскольку по сети передаются двоичные данные (UTF8), а обработка строк связана с затратами, то строки следует обрабатывать как двоичные данные с целью увеличения производительности.
NATS может определить тип входящего сообщения по ведущей строке (INFO, MSG, PING, +OK, -ERR и т. д.). Идея разделить обработку строки пробелами if (msg == “INFO”)
и т. д. кажется очевидной и простой, но подобные затраты неприемлемы с точки зрения производительности.
Поскольку INFO имеет значение [73, 78, 70, 79], для его определения можно воспользоваться Slice(0, 4).SequenceEqual
. SequenceEqual
для (ReadOnlySpan<byte>)
довольно хорошо оптимизирован (при внушительной длине данных используется SIMD). Следует заметить, что это не то же самое, что SequenceEqual
в проекте LINQ.
Если мы посмотрим на ситуацию внимательнее, то заметим, что все идентификаторы протокола, отправляемые сервером, находятся в пределах 4 символов. Другими словами, это состояние легко преобразовать в Int! Вот что представляет собой код подсева сообщений AlterNats:
// msg = ReadOnlySpan<byte>
if(Unsafe.ReadUnaligned<int>(ref MemoryMarshal.GetReference<byte>(msg)) == 1330007625) // INFO
{
}
Вероятно, это самый быстрый путь. За трехсимвольными инструкциями всегда следует пробел или перенос строки. Для их оценки используются следующие константы (вместе с самими символами).
internal static class ServerOpCodes
{
public const int Info = 1330007625; // Encoding.ASCII.GetBytes("INFO") |> MemoryMarshal.Read<int>
public const int Msg = 541545293; // Encoding.ASCII.GetBytes("MSG ") |> MemoryMarshal.Read<int>
public const int Ping = 1196312912; // Encoding.ASCII.GetBytes("PING") |> MemoryMarshal.Read<int>
public const int Pong = 1196314448; // Encoding.ASCII.GetBytes("PONG") |> MemoryMarshal.Read<int>
public const int Ok = 223039275; // Encoding.ASCII.GetBytes("+OK\r") |> MemoryMarshal.Read<int>
public const int Error = 1381123373; // Encoding.ASCII.GetBytes("-ERR") |> MemoryMarshal.Read<int>
}
Автоматическая конвейеризация
Записи и чтения в протоколе NATS выполняются конвейерно (пакетно). Это легко объяснить на примере конвейеризации в Redis. Например, если вы отправляете три сообщения, по одному за раз, и каждый раз ждете ответа, то множество циклов отправки и получения может стать серьезной проблемой.
При отправке сообщений AlterNats автоматически конвейеризирует их. Сообщения упаковываются один раз в очередь с помощью System.Threading.Channels, а цикл записи извлекает их все сразу и пакетно обрабатывает. После завершения передачи данных по сети цикл записи используется для достижения максимально быстрой обработки записи путем пакетной обработки сообщений, которые накопились в ожидании очередного завершения процесса передачи.
Речь идет не только об оптимизации времени прохождения сообщений (хотя в случае NATS стороны “издатель” и “подписчик” являются независимыми, поэтому ждать ответа не приходится), но и о высокой эффективности посредством сокращения количества последовательных системных вызовов.
Самый быстрый регистратор .NET — ZLogger — использует тот же подход.
Множество функциональных возможностей в одном объекте
Чтобы реализовать такой метод PublishAsync
, нужно поместить данные в объект write message
для канала очередей и хранить его в куче. Нам также нужен промис для асинхронного метода, который будет ожидать завершения записи.
await connection.PublishAsync(value);
Для успешной реализации подобного API упакуем все функции в один объект message
(внутреннее имя — Command
), который должен быть распределен.
class AsyncPublishCommand<T> :
ICommand,
IValueTaskSource,
IThreadPoolWorkItem,
IObjectPoolNode<AsyncPublishCommand<T>>
internal interface ICommand
{
void Write(ProtocolWriter writer);
}
internal interface IObjectPoolNode<T>
{
ref T? NextNode { get; }
}
Сам этот объект (AsyncPublishCommand<T>
) имеет роль (ICommand
) для хранения данных T и записи их в виде двоичных данных в сокет.
Кроме того, будучи IValueTaskSource
, этот объект сам становится ValueTask
.
Затем, в качестве обратного вызова во время await
, необходимо передать поток в ThreadPool
, чтобы не тормозить цикл записи. При использовании традиционной операции ThreadPool.QueueUserWorkItem(callback)
происходит дополнительное распределение, поскольку внутри создается ThreadPoolWorkItem
, который заносится в очередь. Реализовав IThreadPoolWorkItem
в .NET Core 3.0, можно избежать внутреннего создания ThreadPoolWorkItem
.
Наконец, у нас есть один объект для совместного распределения. Мы можем провести в отношении этого объекта операцию пулинга для достижения нулевого распределения. Пулинг объектов легко реализуется с помощью ConcurrentQueue<T>
или аналогичной функции. Посредством этой технологии можно избежать распределения массива, так как используется превращение в подобие узла в стеке. Использование стека может обеспечить безблокировочную реализацию, оптимизированную для такого использования кэша.
Архитектура нулевого копирования
Данные для издателя/подписчика обычно сериализуются типами C# в JSON, MessagePack и так далее. В этом случае мы неизбежно обмениваемся байтами. Так, содержимое RedisValue
в StackExchange.Redis
на самом деле представляет собой байты, и независимо от того, отправители мы или получатели, нам приходится генерировать и владеть байтами.
Чтобы избежать этого, есть хитрый обходной путь — перемещение байтов в пул ArrayPool
и обратно с целью достижения нулевого распределения. Тем не менее такой способ все равно влечет за собой затраты на копирование. Наша цель — нулевое распределение, но что если задействовать нулевое копирование?
Сериализатор AlterNats
требует IBufferWriter<byte>
для записи и ReadOnlySequence<byte>
для чтения.
public interface INatsSerializer
{
int Serialize<T>(ICountableBufferWriter bufferWriter, T? value);
T? Deserialize<T>(in ReadOnlySequence<byte> buffer);
}
public interface ICountableBufferWriter : IBufferWriter<byte>
{
int WrittenCount { get; }
}
// ---
// например, реализация для MessagePack для C#
public class MessagePackNatsSerializer : INatsSerializer
{
public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
var before = bufferWriter.WrittenCount;
MessagePackSerializer.Serialize(bufferWriter, value);
return bufferWriter.WrittenCount - before;
}
public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
{
return MessagePackSerializer.Deserialize<T>(buffer);
}
}
Метод Serialize
в System.Text.Json
, как и MessagePack
для C#, имеет перегрузку, которая позволяет принимать IBufferWriter<byte>
. Сериализатор напрямую обращается к буферу, предоставленному для записи в сокет через IBufferWriter<byte>
, и записывает в него данные, тем самым устраняя копирование байтов между сокетом и сериализатором.
На стороне чтения необходим класс ReadOnlySequence<byte>
, поскольку данные, полученные из сокета, часто фрагментированы.
Общим паттерном является обработка прочитанного с помощью PipeReader
из System.IO.Pipelines — библиотеки, разработанной для упрощения высокопроизводительного процесса ввода/вывода. Однако AlterNats не использует конвейеры, а задействует собственный механизм чтения и ReadOnlySequence<byte>
.
Методы Serialize
в System.Text.Json
и MessagePack
для C# обеспечивают перегрузку, которая позволяет принимать IBufferWriter<byte>
. Методы Deserialize
принимают ReadOnlySequence<byte>
. Другими словами, современные сериализаторы должны поддерживать IBufferWriter<byte>
и ReadOnlySequence<byte>
.