Backpressure в Spring WebFlux (Java Reactor)

Оригинальные источники: StackOverflow, Medium [Jay Phelps], StackOverflow

Backpressure (back pressure) – это то, с чем рано или поздно сталкивается почти любой разработчик, а для некоторых это становится частой и серьезной проблемой.

В мире программирования термин “backpressure” является аналогией, позаимствованной из физики. В двух словах, backpressure – это сопротивление или некоторая сила, действующая в направлении, противоположном желаемому направлению движению частиц в трубе. В контексте программирования эту фразу можно переиначить: …это сопротивление желаемому потоку данных приложения.

Цель любого приложения – это получение входных данных и приведение их к некоторому желаемому виду. Когда возникает backpressure, между входом и выходом возникает некое сопротивление. В большинстве случаев, узкое место возникает из-за ограничений в вычислительных мощностях – данные не успевают обрабатываться с всё нарастающей скоростью их поступления. Но есть и другие формы backpressure, например, когда приложение ожидает реакции пользователя на некоторое событие, которая, по понятным причинам, может запаздывать.

Иногда под backpressure понимается не само явление как таковое, а конкретный механизм его обработки.

Пример чтения и записи файлов

Запись файла процесс более медленный, чем чтение. Если комбинация ОС, жесткого диска и библиотек обеспечивает эффективную скорость чтения в 150 мб/с, а записи – в 100 мб/с, то при чтении файла с последующей его записью обратно на диск, вы должны записывать в буфер “лишние” 50 мб каждую секунду.

Решение очевидно: читать ровно столько, сколько сможете записать. Практически все библиотеки имеют соответствующие абстракции для работы с этими случаями.

Взаимодействие между серверами

Микросервисная архитектура, где каждый микросервис может разворачиваться на отдельном сервере, пользуется все большей популярностью. Backpressure в этом случае возникает, когда один сервер отправляет запросы другому слишком быстро и второй сервер не успевает их обрабатывать.

Предположим, что Сервер A отправляет Серверу B 100 запросов в секунду, но Сервер B может обрабатывать только 75 из них. Имеем дефицит в 25 запросов в секунду. В любом случае, Сервер B должен каким-то образом работать с backpressure. Один из вариантов – это буферизация избыточных запросов, но если они будут сыпаться с прежней скоростью, однажды свободная память сервера закончится. Можно игнорировать избыточные запросы, но во многих случаях это запрещается требованиями к системе.

Идеальный случай, когда Сервер B может сам контролировать поток запросов от Сервера А, но и это не всегда возможно. Например, если Сервер А генерирует запросы от имени пользователя. Мы же не можем сказать пользователям “притормозите!” (хотя иногда стОит!).

Стратегии backpressure

Так как же работать с backpressure? Если опустить вариант увеличения производительности серверного железа, остаётся всего три опции:

  • Контролировать источник (увеличивать частоту или снижать – решает приёмник)
  • Буферизация (временно хранить запросы, которые не были обработаны “на лету”)
  • Игнорировать (пропускать обработку части запросов)

Контроль источника – безусловно лучший вариант. Если говорить о случаях реального применения, то единственный возможный оверхэд – реализация самого механизма контроля.

К сожалению этот вариант не всегда реализуем. Например случай с валом пользовательских запросов самый очевидный, ведь не так-то просто договориться с пользователями!

Буферизация обычно выбирается в качестве следующего пути решения проблемы. Но надо держать в уме, что неограниченная буферизация опасна, так как приводит к утечкам памяти с последующим падением сервера. Зачастую лучше начать игнорировать запросы, чем продолжать складывать их в буфер, отнимая остатки памяти у сервера.

Игнорирование входящих запросов это последняя стратегия, которая часто сочетается с буферизацией. Обычно “сбрасывают” какую-то часть запросов ежесекундно, чтобы увеличить время существования буфера.

Укрощаем backpressure

Существует ряд паттернов, которые чаще всего используются при работе с backpressure вне зависимости от используемого языка программирования.

Pull

С использованием pull-стримов потребитель контролирует источник. Обычно это вариант работы 1 к 1, то есть запрос – ответ, но есть паттерны и для request(n) (например Flowables в RxJava).

Push

В push-стримах источник направляет сообщение в сторону потребителя если последний доступен и готов обработать запрос. Обычно такие стримы используются при обработке действий пользователя. Существует множество библиотек, самая популярная из которых – RxJava.

А что там в WebFlux?

WebFlux использует спецификацию Reactive Streams для построения неблокирующих приложений. Для их масштабирования достаточно небольшого количества потоков, в отличие от классического подхода, например сервлетов, где на каждый HTTP-запрос выделяется отдельный поток. Если в ходе запроса выполняется вызов в базу данных или другая продолжительная процедура, выделенный поток остаётся заблокированным.

В реактивных приложениях запрос рассматривается как событие, обрабатываемое заданным пулом потоков. Если текущий запрос зависит от внешнего сервиса, то поток-обработчик не ожидает результат, а переходит к обработке других запросов. Когда внешний сервис (а он в данном случае обязан поддерживать асинхронные запросы) возвращает ответ, возникает новое событие, которое будет обработано свободным потоком из пула, а результат – передан исходному HTTP-запросу.

Немного сетевой теории

Для понимания механизма работы backpressure в фреймворке WebFlux нужно вспомнить, какой транспортный протокол используется при взаимодействии по умолчанию. Обычно серверы общаются между собой через TCP-соединения. TCP скорее взаимодействует с байтами, чем с элементами логики приложения. Обычно, когда говорят о backpressure, подразумевают контроль количества принятых или отправленных по сети логических объектов и, хотя TCP имеет собственные средства для контроля потоков, они всё же работают для байтов, а не для объектов.

TCP Flow control

Flow control (далее – контроль потока) означает, что TCP гарантирует, что отправитель не подавляет получателя , отправляя пакеты быстрее, чем тот может их обработать. Это и есть backpressure в контексте транспортного уровня модели OSI. Идея в том, что получатель даёт отправителю обратную связь с данными о своём текущем статусе.

Что происходит, когда мы отправляем данные по сети? Отправитель записывает данные в сокет, транспортный уровень (в нашем случае TCP) упаковывает их в сегмент и передаёт их в сетевой уровень (IP), который каким-то путём доставляет пакет получателю.

На принимающей стороне сетевой уровень доставляет эти данные до уровня TCP, что делает их доступными для получателя. TCP хранит данные для отправки в общем буфере отправки, а данные для получения – в общем буфере получения. Когда приложение готово, оно читает данные из буфера получения.

Контроль потока гарантирует, что мы не отправляем новых пакетов, если буфер получения уже полон, так как получатель все равно не сможет обработать эти запросы и будет вынужден сбросить (проигнорировать) их. Для управления объемом данных для отправки через TCP, получатель объявляет своё окно (receive window), то есть объем свободного места в буфере получения.

При получении нового пакета TCP отправляет ack сообщение отправителю, подтверждающее, что пакет получен корректно. Вместе с ack сообщением передаётся и размер текущего окна получения, поэтому отправитель знает, что он может или не может отправлять новые данные.

Вернёмся к WebFlux

В текущей реализации WebFlux backpressure регулируется транспортным протоколом, но не отражает реального состояния получателя.

Диаграмма выше отражает взаимодействие между двумя микросервисами, где левый отправляет потоки данных, а правый – их принимает.

  1. WebFlux берёт на себя работу по преобразованию объектов в байты и обратно для дальнейшей передачи через TCP.
  2. Начало длительной обработки элемента, которая по окончании запрашивает следующий элемент.
  3. В этой точке WebFlux удерживает байты, приходящие по сети, без отправки ack, так как бизнес-логика принимающего микросервиса занята обработкой предыдущего запроса.
  4. Из-за природы контроля потока TCP, левый микросервис может по-прежнему отправлять данные в сеть.

Как видно из диаграммы, demand получателя отличается от demand отправителя (demand измеряем в логических объектах). Это значит, что их demand изолирован и работает только в случае взаимодействия WebFlux с бизнес-уровнем приложения. То есть контроль backpressure в WebFlux не так справедлив, как бы мы хотели.

А если backpressure, но для HTTP-запросов?

Информация о backpressure не отправляется по протоколу HTTP (он просто-напросто не поддерживает такой функционал). Поэтому всё снова упирается в TCP flow control и буферы отправки и получения. Demand’ы отправки/получения в Reactive Streams также опираются на TCP flow control.

В Spring MVC подразумевается, что приложения могут блокировать текущий поток, например для выполнения удалённых вызовов в другие системы. Поэтому контейнеры сервлетов используют относительно большой пул потоков для сглаживания последствий блокировок.

WebFlux и другие неблокирующие серверы предполагают запуск неблокирующих приложений, поэтому используют небольшой пул потоков фиксированного размера (так называемые event loop workers).

Модель потоков Spring WebFlux

На WebFlux сервере можно увидеть следующие потоки:

  1. На оригинальном сервере Spring WebFlux существует один поток для работы сервера и несколько потоков для обработки запросов (обычно их количество равно числу ядер в процессоре).
  2. Реактивный WebClient работает в режиме event loop.
  3. Reactor обеспечивает абстракции пула потоков, которые называются scheduler’ами. Для переключения обработки на другой пул используется метод publishOn(). Планировщики (scheduler’ы) названы в соответствии со стратегией работы, например parallel для CPU-ориентированной параллельной обработки с ограниченным количеством потоков, elastic для I/O – с большим количеством потоков.

Под капотом WebFlux трудится Reactor-Netty – обёртка над Netty с поддержкой backpressure. Reactor-Netty создаёт Runtime.getRuntime().availableProcessors() * 2 потоков для использования во внутреннем EventLoopThreadPool. Если входящий запрос завершается блокирующим удалённым вызовом, то его нужно соответствующим образом обернуть, чтобы текущий поток мог вернуться в event-queue пул для обработки новых запросов.

Для тяжёлых вычислительных операций имеет смысл использовать выделенный пул потоков. Это гарантирует, что EventLoop Netty будет только принимать и отправлять данные по сети, не тратя ресурсы на другие операции:

@PostMapping
public Mono<CpuIntensiveResult> cpuIntensiveProcessingHandler(
    Mono<CpuIntensiveInput> monoInput
) {
    return monoInput
        .publishOn(Schedulers.fromExecutorService(myOwnDedicatedExecutor))
        .map(i -> doCpuIntensiveInImperativeStyle(i));
}

Если интересен ручной контроль количества запросов, можно посмотреть в сторону метода limiRate(int n)