[스프링] 스프링 웹플럭스

리액티브 프로그래밍

리액티브 프로그래밍(reactive programming)이란 논블로킹 역압(non-blocking backpressure)을 통해 비동기적으로 데이터 스트림(data stream)을 처리하는 프로그램 작성 패러다임이다. 필요한 데이터를 요청한 후 데이터가 준비되면(사용 가능한 상태가 되면) 처리하는, 비동기적 데이터 처리 방식을 사용한다. 이때 데이터 처리 수요 조절을 위한 역압(backpressure)이라는 개념이 적용된다. 리액티브 프로그래밍을 데이터 스트림을 시간 경과에 따른 값(시간이 지남에 따라 전달되는 값)을 처리하는 것으로 표현하기도 한다. 이러한 데이터 스트림을 비동기 데이터 스트림(asynchronous data stream)이라고 한다. 기존 명령형 프로그래밍은 특정 시점에 완성된 데이터를 동기적으로 사용하는 반면 리액티브 프로그래밍에서는 데이터를 시간을 두고 받으면서 데이터 스트림을 통해 전달되는 데이터를 비동기적으로 사용한다. 여기서 비동기적이라는 의미는 요청한 데이터가 모두 준비될 때까지 기다리지 않는 것을 의미한다.

리액티브 프로그래밍은 비동기 데이터 스트림을 다루는 것이다. 데이터 스트림이란 시간 순으로 발생 및 처리되는 일련의 데이터를 의미한다. 데이터 스트림을 데이터 항목 스트림(data element stream) 또는 이벤트 스트림(event stream)이라고도 한다. 여기서 데이터는 단순 값 뿐만 아니라 버튼 클릭과 같은 사용자 입력 이벤트, 메시지, 메서드 호출, 캐시 데이터, 데이터 구조, 이벤트 실패 등 발생 및 변화 가능한 모든 것들을 의미하며 이를 사용하여 데이터 스트림을 구성할 수 있다. 데이터가 연산 처리를 위해 데이터 스트림의 밖으로 전달되는 것을 데이터 스트림으로부터 데이터 항목이 방출(emit)된다고 표현한다. 데이터를 방출하는 주체를 발행자(publisher)라고 한다. 방출된 데이터 항목은 연산자(operator)에 의해 변환 및 필터링되거나 조작된 후 새로운 데이터 스트림의 데이터 항목이 되며, 최종적으로 데이터를 요청한 구독자(subscriber)에게 전달된다. 발행자는 데이터 항목을 구성하며 구독자는 발행자에 의해 구성된 데이터 스트림으로부터 데이터 항목을 전달 받는 것이다. 이렇게 다양한 종류의 데이터를 전달하는 데이터 스트림을 구성하고 나면 방출된 데이터 스트림 데이터 항목들의 발생 및 변화 이벤트에 반응(react)하는 것이 필요하며 이를 디자인 패턴의 옵저버 패턴(observer pattern)을 사용하여 구현한다.

리액티브 프로그래밍에서 데이터 스트림은 시그널(신호) 전파(signal propagation)라는 특성도 가지고 있다. 시그널(signal)이란 데이터 스트림의 데이터 흐름과 관련된 상태를 표현하기 위해 사용하는 개념이다. 한 데이터 스트림이 시그널을 다른 데이터 스트림으로 전파할 수 있고 데이터를 처리하는 연산자에게 전파할 수도 있다. 시그널을 하위 데이터 스트림인 다운스트림(downsteram)으로 전파하여 구독자(subscriber)가 특정 시그널에 대한 처리를 수행할 수 있으며 상위 스트림인 업스트림(upstream)으로 전파하여 발행자(publisher)가 특정 시그널에 대한 처리를 수행할 수 있다. 업스트림으로의 시그널 전파는 역압을 통한 수요 조절 기능 구현에 사용된다. 시그널의 종류는 다음과 같다.

  1. onNext(): 데이터 스트림으로부터 데이터 항목을 방출하는데 사용되는 시그널이다. 방출되는 데이터 항목이 인자로 전달된다. doOnNext() 연산자는 onNext() 시그널을 받으면 수행할 콜백을 실행한다.
  2. onError(): 데이터 스트림에 에러가 발생했음을 나타내는데 사용되는 시그널이다. 에러(발생 예외) 객체가 인자로 전달된다. doOnError() 연산자는 onError() 시그널을 받으면 수행할 콜백을 실행한다.
  3. onComplete(): 데이터 스트림이 완료되었음을 나타내는데 사용되는 시그널이다. 인자는 사용되지 않는다. doOnComplete() 연산자는 onComplete() 시그널을 받으면 수행할 콜백을 실행한다.


리액티브 프로그래밍은 다양한 소스에서 들어오는 데이터 스트림을 비동기적으로 처리하여 사용자에게 높은 응답성을 제공한다. 비동기적 데이터 스트림 처리라는 것은 메인 애플리케이션을 실행하는 메인 스레드가 데이터 스트림 처리 작업을 요청한 후 데이터 준비 작업이 끝날 때까지 기다리지 않는 것을 의미한다. 원하는 데이터가 준비되었을 때까지 기다린 후 동기적으로 그다음 작업을 수행하는 코드를 작성하는 명령형 또는 순차적 프로그래밍 방식을 사용하는 대신, 비동기적으로 전체 데이터 중 일부 데이터로 처리할 수 있는 작업들을 수행하도록 정의한다. 데이터 스트림으로 전달되는 데이터 항목에 대한 변환 및 가공, 그리고 여러 데이터 스트림의 병합 및 새로운 데이터 스트림 생성과 같은 처리는 리액티브 프로그래밍의 연산자들을 사용하여 수행할 수 있다. 제한된 리소스 내에서 성능 저하를 방지하기 위해 이러한 연산 또한 스레드 논블로킹, 비동기 방식으로 수행될 수 있어야 한다.

비동기적 데이터 스트림 처리는 데이터가 언제 생성되는지 예측하기 힘든 상황에 적합하다는 특성도 있다. 원하는 데이터가 언제 준비되어 전달될지 모르는 상황이라면 무작정 요청한 데이터 조회 작업의 완료를 기다리는 것보다 비동기, 논블로킹, 그리고 이벤트 기반(event-driven) 방식으로 데이터를 리액티브하게 처리하는 것이 시스템의 리소스 측면에서 효율이 높다는 장점이 있다. 이러한 이유로 리액티브 선언문링크에 기재된, 리액티브 시스템이 가져야하는 특성들은 바로 리액티브 프로그래밍의 특성과 관련이 있는 것을 알 수 있다. 하나의 애플리케이션에 리액티브 특성을 도입하는 것과 여러 애플리케이션(서비스)로 구성된 시스템에 리액티브 특성을 도입하는 것은 구체적인 구현 방법의 차이일 뿐 개념적으로 동일하다.

시스템의 제한된 리소스인 스레드를 동시적(concurrently)으로 사용하여 작업들을 동시 처리하기 위해서도 작업의 비동기 처리 및 스레드 논블로킹 처리가 필요하다. 작업이 끝날 때까지 기다리는 동기적 방식으로 인해 메인 애플리케이션 실행이 중단되면 문제가 될 수 있으므로 요청한 작업이 끝날 때까지 기다리지 않도록 하기 위해 해당 스레드를 블로킹하지 않도록 하여 다른 작업을 수행할 수 있게 만든다. 스레드 논블로킹은 데이터 스트림을 처리하는 스레드의 리소스 효율성과 관련이 있다. 만약 애플리케이션이 제한된 수의 스레드를 사용하여 동작한다면 스레드가 하나의 데이터 스트림을 처리하는 도중 블로킹되는 경우 다른 데이터 스트림을 처리할 수 없는 상황이 된다. 따라서 데이터 스트림을 처리하는 스레드는 블로킹되지 않고 다른 데이터 스트림 처리 작업을 수행할 수 있어야 한다.

데이터 스트림 처리 작업이 블로킹 IO 작업을 수반하는 경우 별도의 스레드로 처리하는 것도 중요하다. 특히 이벤트 루프 모델을 사용하는 서버 애플리케이션 환경에서는 더욱 그러하다. 파일 시스템이나 데이터베이스로부터 입출력을 수행하는 요청 작업이 데이터 스트림을 처리하는 스레드를 블로킹시킨다면 위에서 설명한 것과 같이 스레드의 데이터 스트림 처리가 효율적으로 이루어지지 못한다. 블로킹 IO와 관련 작업에 한해 별도의 스레드를 할당하여 해당 작업을 수행하도록 함으로써 스레드 블로킹 없이 작업을 수행해야 하는 데이터 스트림 처리 스레드가 방해 받지 않도록 할 수 있다. RxJava, Akka 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블로킹 IO 작업을 실행하도록 한다. 하지만 이 방법은 내부 스레드풀을 사용하는 것이다. 위 내용은 블로킹 CPU 작업에 대해서도 동일하다.

스레드는 제한된 값비싼 리소스이므로 효율적으로 사용해야 한다. 스레드 논블로킹 방식과 하나의 작업이 완료되지 않더라도 그 다음 작업을 수행할 수 있는 비동기 방식으로 동작하도록 하여 적은 수의 스레드로 많은 작업을 수행할 수 있으며 이에 따라 애플리케이션의 확장성과 반응성을 높일 수 있다. 리액티브 프로그래밍을 통한 동시 처리 또는 병렬 처리는 직렬화 포인트를 감소시킴으로써 CPU를 효율적으로 사용하고 작업 처리 속도를 높인다. 하지만 제한된 스레드로 많은 요청을 처리하기 위한 과정에서 여러가지 오버헤드가 발생할 수 있기 때문에 이러한 동작이 항상 빠른 것은 아니며 작업량에 따라 애플리케이션의 적절한 스레드 사용 전략을 세워야 한다. CPU의 물리적 코어 수보다 많은 수의 스레드를 사용하면 CPU 컨텍스트 스위칭(context switching)으로 인한 오버헤드를 발생하게 되고 리소스 효율이 떨어지는 한계가 존재한다.

리액티브 프로그래밍이 다루는 데이터 스트림의 특징은 데이터를 요청한 쪽(구독자)이 데이터를 제공하는 쪽(발행자)에게 요청한 데이터가 모두 준비될 때까지 기다리는 것이 아니라(모두 준비된 후에 받는 것이 아니라), 전체 데이터 중 하나라도 준비되면 받기 시작한다는 것이다. 전체 데이터가 언제 준비되는지는 중요하지 않으며 데이터의 일부 또는 전체가 준비되었을 때 어떤 처리가 이루어져야 하는지가 중요하다.

리액티브 프로그래밍의 또다른 주요 특징은 역압(또는 배압)이다. 역압이란 데이터를 받는 구독자가 자신이 처리할 수 있는 만큼의 데이터를 데이터를 제공하는 발행자에게 요청하여 데이터 전송량을 조절하는 기법 및 메커니즘을 의미한다. 즉, 데이터를 전달하는 데이터 스트림에 역압 개념을 적용하여 데이터 전달량을 조절할 수 있으며 이를 통해 수요 조절 기반의 유연성(또는 탄력성)(elastic)있는 애플리케이션 구성이 가능하다.

데이터 스트림을 비동기적으로 처리하기 위한 다양한 인터페이스, 메서드 및 프로토콜이 존재하며 이를 사용하여 하나 이상의 데이터 스트림들을 대상으로 다음과 같은 처리를 할 수 있다.

  1. 하나의 스트림 또는 여러 스트림이 포함하는 항목을 입력으로 사용하여 새로운 스트림을 만든다.
  2. 서로 다른 두 스트림을 하나로 합친다.
  3. 하나의 스트림을 필터링하여 새로운 스트림을 만든다.
  4. 하나의 스트림이 포함하는 항목을 특정 항목으로 매핑하여 새로운 항목을 포함하는 스트림을 만든다.


데이터 스트림에 대한 위와 같은 처리를 하고 나면 데이터 전달을 위해 구독(subscription)이라는 또다른 과정이 반드시 필요하다. 데이터 스트림을 처리할 때 옵저버는 어떤 작업을 콜백으로 수행한다. 이 콜백 메서드는 반드시 데이터 스트림에 대한 구독이 일어나야 호출된다. 구독을 하지 않으면 아무 일도 일어나지 않는다. 리액티브 연산자가 데이터 스트림으로 데이터가 전달될 때 어떤 처리를 할지(어떤 콜백 함수가 실행되어야 할지)를 정의하는 것이라면 구독은 데이터를 요청하는 구독자가 데이터를 제공하는 발행자에게 데이터를 요청하기 시작하는 과정이라고 볼 수 있다. 이러한 리액티브 프로그래밍의 이러한 구독 과정은 지연(lazy) 방식이다. 구독을 통해 원하는 데이터를 받는 요청(pull)을 하기 전까지 아무 일도 일어나지 않는다.

리액티브 프로그래밍은 특성을 정리하면 다음과 같다.

  • 비동기 및 스레드 논블로킹 프로그래밍: 데이터 스트림을 비동기 및 스레드 논블로킹으로 처리하여 시스템 리소스 사용률과 효율성을 극대화한다.
  • 함수형 프로그래밍: 참조 투명성을 갖는 함수를 사용하여 애플리케이션의 부작용을 없앤다. 데이터를 처리하는 로직에 데이터를 전달하는 대신 데이터에 로직을 전달한다.
  • 보다 용이한 동시성 구현: 동기 블록, 경쟁 조건, 데드락 같은 저수준의 멀티스레드 문제를 직접 처리할 필요가 없다. 동시성을 위한 스레드 사용 패턴이 추상화되어 코드 구현의 복잡성이 줄어든다.
  • 역압 및 수요 조절: 발행 및 구독 관계를 이용한 역압 적용을 통해 데이터 스트림 처리에 대한 수요 조절이 가능하다.


옵저버 패턴

리액티브 프로그래밍은 옵저버(observer) 디자인 패턴과 관련이 있다. 옵저버 패턴이란 객체의 상태 변화를 관찰하는 옵저버(관찰자)를 관찰 대상 객체에 등록하고, 객체의 상태 변화가 발생하면 등록된 옵저버에게 통지(notify)하는 디자인 패턴이다. 옵저버를 리스너(listener)라고도 하며 옵저버의 관찰 대상 객체를 서브젝트(subject) 또는 관찰 가능 대상(observable)라고도 한다. 옵저버의 관찰 대상 객체는 상태 변화 시 이벤트를 발생시키며 옵저버는 이 이벤트를 받아 처리한다. 옵저버는 관찰 대상 객체의 이벤트가 발생하면 어떤 일을 처리할지 정의된 콜백(callback)을 전달 받고 이를 실행시킨다. 리액티브 프로그래밍의 주 관심사인 데이터 스트림 자체가 서브젝트(관찰 가능 대상)이며 데이터 스트림 항목들의 발생 및 변화 이벤트에 반응(react)하여 콜백을 실행하는 것이 바로 옵저버이다. 정리하자면 데이터 스트림은 관찰 가능한 대상이며 이 대상에 어떤 변화가 발생하였을 때 특정 코드인 콜백을 실행할 수 있다. 옵저버 패턴을 발행-구독(publish-subsribe) 패턴이라고도 한다.

데이터 스트림을 대상으로 옵저버가 실행할 콜백의 종류는 다음과 같다.

  • 데이터가 하나라도 준비되었을 때 어떤 처리를 할 것인가
  • 데이터를 준비하는 과정에서 에러가 발생하였을 때 어떻게 처리할 것인가
  • 데이터가 모두 준비되었을 때 어떻게 처리할 것인가


데이터 스트림을 통해 데이터가 처리되는 과정은 다음과 같다.

  1. 구독자가 발행자에게 데이터를 요청하면 데이터를 전달할 데이터 스트림이 구성된다. 데이터 스트림 안에 포함되어 전달될 데이터는 언제 모두 준비될지 알 수 없다.
  2. 요청한 데이터가 데이터 소스로부터 제공되어 데이터 스트림을 통해 하나씩 전달되기 시작한다. 데이터가 하나라도 준비되었을 때(데이터 소스가 데이터 항목을 방출할 때) 수행될 반응(콜백)이 옵저버에 의해 실행된다.
  3. 데이터를 준비하는 과정에서 에러가 발생하였다면 이때 수행될 반응(콜백)이 옵저버에 의해 실행된다.
  4. 요청한 모든 데이터 준비가 완료되면 이때 수행될 반응(콜백)이 옵저버에 의해 실행된다.


위 과정에서 스레드는 논블로킹으로 동작하기 때문에, 데이터가 모두 준비될 때까지 데이터 스트림 처리 스레드는 다른 작업을 수행하지 못 한 채 계속 기다리지 않는다. 따라서 다른 데이터 스트림을 처리할 수도 있다.


리액티브 스트림

리액티브 스트림(reactive stream)링크은 논블로킹 역압을 사용한 비동기 스트림 처리의 표준이다. 발행자(publisher)와 구독자(subscriber) 사이의 간단한 계약을 정의하는 명세이기도 하다. 리액티브 프로그래밍을 위한 표준 및 명세가 바로 리액티브 스트림이라고 할 수 있다.

리액티브 프레임워크는 애플리케이션 수준의 리액티브를 가능하게 해주는 애플리케이션 프레임워크이다. 리액티브 프레임워크의 예로 RxJava, Akka, 리액터(Reactor) 등이 있다. 리액티브 프레임워크가 제공하는 다양한 리액티브 연산자를 사용하여 데이터 스트림의 항목들에 반응하는 코드를 작성할 수 있다.

이 중 프로젝트 리액터(Project Reactor)는 VM 웨어에서 만든 리액티브 스트림 구현체다. 스프링 웹플럭스는 리액터를 기반으로 리액티브 프로그래밍을 구현한다.


역압

역압(backpressure)이란 데이터를 받는 구독자가 데이터를 제공하는 발행자에게 자신이 처리할 수 있는 만큼의 데이터를 요청하여 데이터 전송량을 조절하는 기법 및 메커니즘이다. 이때 구독자를 다운스트림, 발행자를 업스트림이라고도 한다. 구독자가 순간적으로 처리할 수 없을 정도로 너무 많은 데이터를 받게 되면 발행자에게 신호를 보내 구독자가 다시 데이터를 처리할 수 있는 상태가 될 때까지 발행자의 데이터 전송량을 조절할 수 있게 한다.

역압을 통한 수요 조절 개념을 애플리케이션 수준의 개념이다. 애플리케이션 내에서 데이터를 전달받는 구독자 컴포넌트가 데이터를 제공하는 발행자 컴포넌트에게 신호를 보내 데이터 전송량을 조절한다. 구독자 및 발행자 컴포넌트는 클래스의 인스턴스, 레이어드 아키텍처(layered architecture)에서 레이어, 서비스 구현체 등이 될 수 있다.


리액티브 시스템

시스템 수준에서 리액티브 특성을 도입한 리액티브 시스템은 데이터를 전달받는 시스템(또는 서비스)인 클라이언트가 데이터를 제공하는 시스템인 서버에게 신호를 보내 데이터 전송량을 조절한다. 시스템 수준의 리액티브는 서로 다른 기술 스택으로 구성된 서비스(또는 애플리케이션)들이 네트워크 상에서 물리적으로 분리된 경우에 적용하는 것을 말한다.

애플리케이션 수준에서 역압을 통해 데이터의 흐름을 수요 기반으로 제어한다는 리액티브 개념을 시스템 수준에 도입하는 데에는 HTTP 프로토콜 기반 네트워크 통신 방식의 기술적인 제약 사항으로 인해 어려움이 있다. 따라서 리액티브 시스템은 메시지 기반(message driven)으로 동작하도록 구성된다. 물리적으로 분리된 시스템 구성 요소들 사이에 시간적인 경계를 설정하고 구성 요소 간 비동기적 메시지 전달(asynchronous message passing)을 통해 데이터를 주고 받는다. 참고로 메시지 기반은 발행자가 명확하게 정해진 수신자에게 메시지를 전송하는 개념인 반면 이벤트 기반(event driven)은 발행자가 이벤트(메시지)를 발생시키고 수신자는 이벤트를 관찰하며 어떤 수신자가 이벤트를 소비하는지 알 수 없다는 차이가 있다. 애플리케이션 수준에서 비동기 데이터 스트림을 처리하는 리액티브 프로그래밍은 이벤트 기반이다.

정리하자면 역압을 통한 수요 조절 개념을 애플리케이션 수준과 시스템 수준 둘 다 적용할 수 있으며 애플리케이션 수준의 역압은 프로그래밍 코드 구현 수준에서 신호 전파를 통해 데이터 전송량을 조절하는 것을 기반으로 하고, 시스템 수준의 역압은 메시지 기반 시스템 구축을 기반으로 한다. 애플리케이션 수준의 역압, 시스템 수준의 역압은 데이터가 어디서 어디로 흘러가는지의 차이이며 데이터의 흐름을 수요 기반으로 제어한다는 개념은 동일하다.


리액티브 시스템과 R소켓

리액티브 시스템은 비동기 메시징 처리를 기반으로 하며 서비스 간 단순 요청/응답 모델을 사용하는 프로토콜을 넘어선 리액티브 원칙을 따른다. 기존 HTTP를 통한 통신은 매우 단순한 요청/응답 모델이며 TCP 연결당 하나의 요청을 처리할 수 있어 리소스 측면에서 비효율적이고 요청의 수가 곧바로 성능 문제로 이어지는 한계가 있다. 바이너리 프로토콜인 HTTP/2.0은 더 적은 크기의 바이너리 데이터를 사용하고 스트림(stream)을 통한 다중화(multiplexing) 처리로 하여 네트워크 통신의 처리량, 지연시간, 속도를 개선하지만 데이터 스트리밍, 양방향 통신, 역압(backpressure) 지원 등 단순한 요청/응답 모델을 넘어선 상호작용 지원에 한계가 있다.

이러한 기존 애플리케이션 프로토콜의 한계를 보완하기 위해 R소켓(RScoket)이라는 프로토콜이 등장하였다. R소켓은 바이트 스트림 전송을 위한 애플리케이션 바이너리 프로토콜이며, 리액티브 원칙을 수용하는 공식 통신 프로토콜이다. R소켓은 HTTP나 웹소켓이 역압 개념을 지원하지 않아 리액티브한 특성을 제공하지 못하는 한계점을 해결하고자 등장하였다. 참고로 API가 비동기 방식이라고 해서 전체 과정이 리액티브한 것은 아니다. R소켓은 리액티브 스트림을 기반으로 한다. R소켓은 비동기 네트워크 통신을 위해 모든 통신을 단일 네트워크 연결 상에서 다중화된 메시지 스트림으로 모델링하며, 요청에 대한 응답을 기다리는 동안 동기적으로 블로킹하지 않는다.


스프링 웹플럭스

스프링 웹플럭스(Webflux)는 스프링 프레임워크 상에서 대용량의 트래픽을 처리하기 위해 리액티브 프레임워크인 리액터를 사용하여 리액티브 프로그래밍을 가능하게 만든 스프링 프로젝트 및 웹 스택이다. 스프링 버전 5에서 스프링 MVC의 대안으로 리액티브 파운데이션 기반 프로젝트인 스프링 웹플럭스를 도입하였다. 스프링 웹플럭스 완벽한 비동기 및 논블로킹(non-blocking) 방식으로, 스레드마다 응답을 처리하는 전통적인 대형 스레드 풀 대신 이벤트 루프 실행 모델을 사용할 수 있도록 한다. 스프링 웹플럭스는 리액티브 스트림(reactive stream) 구현체 중 하나인 프로젝트 리액터(Project Reactor)를 기반으로 한다.

웹 애플리케이션에 스프링 웹플럭스를 도입하면 리액티브 스트림 구현체인 리액터를 사용하여 리액티브 프로그래밍 패러다임을 적용하고 완전한 비동기 및 논블로킹 특성을 가지는 리액티브 스트림 파이프라인을 구축할 수 있다.

스프링 웹플럭스는 기본 웹 컨테이너로 네티를 사용한다. 네티는 비동기, 논블로킹 방식으로 동작하는 고성능 네트워크 애플리케이션 프레임워크로, 이를 이용하여 HTTP 뿐만 아니라 TCP, UDP 프로토콜을 사용하여 통신하는 웹 서버 구성이 가능하다. 네티 자체는 자바 서블릿 명세를 따르지는 않으므로 서블릿 컨테이너가 아니지만 자바 서블릿 기술과 통합하여 고성능 웹 애플리케이션을 구성하는데 사용된다. 리액티브 스트림 구현체인 리액터를 네티 프레임워크와 결합한 것이 바로 리액터 네티(reactor netty)이다. 즉, 리액터 네티는 리액터의 리액티브 프로그래밍을 위한 다양한 API를 비동기, 논블로킹 기반 고성능 네티 프레임워크 상에서 사용할 수 있도록 만든 네티 구현체라고 볼 수 있다.

서블릿 3.1 이전의 서블릿 API는 NIO(논블로킹 IO)를 지원하지 않는다. 요청 당 스레드 모델을 사용하는 서블릿 컨테이너(예: 톰캣)의 경우 요청 수만큼 스레드가 생성되며 각 스레드는 이후 작업(예: 데이터베이스 및 외부 API로부터 데이터 조회)이 수행 완료될 때까지 스레드는 블로킹된다. 클라이언트로부터의 요청 및 클라이언트로의 응답 작업이 완료될 때까지 해당 스레드는 블로킹되어 다른 작업을 수행하지 못한 채 기다려야 한다. 스레드풀을 사용하여 필요에 따라 적절한 수준으로 스레드의 수를 관리할 수 있지만 스레드를 생성 및 제거하는 것은 비용이 많이 든다.

서블릿은 3.1 버전부터 NIO를 지원한다. 논블로킹 IO를 통해 요청 당 스레드가 작업을 수행하는 대신 적은 수의 스레드로 많은 요청을 동시에 처리할 수 있게 되었다. 따라서 해당 버전의 서블릿 스펙을 구현한 톰캣 서블릿 컨테이너를 사용하여 스프링 웹플럭스를 구동할 수도 있다. 하지만 서블릿 3.1에 도입된 비동기 방식은 리액티브 이벤트 루프(event loop)와 역압 개념을 지원하지 않으므로 완전한 리액티브 웹 애플리케이션을 구성하는 데에는 한계가 있다.

요청 당 스레드 모델을 통한 요청 처리 방식을 성능면에서 개선하기 위한 이벤트 루프 모델(event loop model)은 비동기 요청과 스레드 논블로킹을 기반으로 한다. 이 모델에서는 요청에 대한 작업을 이벤트 큐라는 대기열에 쌓고 스레드는 이벤트들을 처리하기 위한 작업들을 비동기적으로 요청한다. 작업 요청 시 콜백 함수를 함께 전달함으로써 작업이 완료되었을 때 별도의 추가 작업을 수행하여 응답 결과를 호출자 스레드에 전달할 수 있다. 이벤트 루프 모델에 반응형 프로그래밍의 역압 특성을 도입하면 요청과 응답을 조율하여 시스템의 리소스 사용 효율을 높일 수도 있다. 네티는 톰캣과 달리 이벤트 루프 모델을 사용한다.

스프링 웹플럭스를 프로젝트에 적용하기 위해서는 스프링 리액티브 웹(spring reactive web)을 의존성으로 추가하면 된다. 해당 의존성에 해당하는 라이브러리는 spring-boot-starter-webflux이다. 이 라이브러리에는 역압 기능 지원을 위해 네티 프레임워크 기반으로 만들어진 리액터 네티가 포함되어 있다. 리액터 네티는 리액티브 스트림, 리액터 코어, 네티를 포함한다. spring-boot-starter-webflux에 대한 의존관계를 설정하면 리액터 네티 스타터인 spring-boot-starter-reactor-netty 라이브러리도 프로젝트에 자동으로 포함된다.

스프링 웹플럭스의 컨트롤러 클래스의 라우팅(핸들러) 메서드는 데이터 스트림 객체인 Mono(또는 Flux)를 반환할 수 있다. 클라이언트가 HTTP를 통해 요청한 데이터는 비동기적으로 전달되며 이러한 비동기적 데이터 응답은 리액터 네티가 처리한다.

앞에서 데이터 스트림 처리를 위해서는 구독이라는 과정이 반드시 필요하다고 했다. 리액터 타입을 사용하여 데이터 스트림을 처리할 때 옵저버는 모든 데이터 준비에 성공하였을 때 어떤 작업을 콜백으로 수행한다. 이 콜백 메서드는 반드시 데이터 스트림에 대한 구독이 일어나야 호출된다. 구독을 하지 않으면 아무 일도 일어나지 않는다. 리액터 타입을 반환하는 웹 컨트롤러 메서드에 대해서도 구독 과정이 반드시 필요하다.

웹 애플리케이션에서 이러한 구독을 수행하기 위해서는 리액티브 스트림 방식으로 동작하는 웹 컨트롤러가 필요하다. 스프링 웹플럭스는 바로 이 리액티브 웹 컨트롤러를 제공하며 구독을 하는 주체가 바로 스프링 웹플럭스의 리액티브 웹 컨트롤러이다. 웹플럭스는 웹 요청을 리액티브하게 처리하기 위한 도구이다. 따라서 웹플럭스의 웹 컨트롤러는 데이터 스트림을 논블로킹, 비동기 방식으로 처리한다. 컨트롤러 메서드가 리액터 타입을 반환하도록 정의하면 클라이언트의 요청이 들어올 때 웹 컨트롤러는 적절한 타이밍에 구독을 하며 준비 완료된 데이터가 데이터 스트림을 통해 전달된다. 웹 컨트롤러는 클라이언트가 요청한 데이터가 준비 완료될 때까지 블로킹되지 않으며, 따라서 클라이언트의 다른 웹 요청을 받는 등의 다른 작업을 수행할 수 있다. 다른 요청을 받고 또다른 데이터 스트림 처리를 요청하는 작업을 수행하다가 기존 요청에 대한 데이터 준비 작업이 완료되면 적절한 신호를 받고 준비 완료된 데이터를 클라이언트에게 전달한다.

컨테이너로 리액터 네티를 사용할 경우로 사용할 경우 reactor.netty.http.server 패키지의 HttpServer 클래스의 onStateChange() 메서드에서 subscribe() 메서드를 호출함으로써 구독을 수행한다링크. 서블릿 컨테이너의 경우 org.springframework.http.server.reactive 패키지의 ServletHttpHandlerAdapter 클래스의 service() 메서드에서 subscribe() 메서드를 호출함으로써 구독을 수행한다링크.

스프링 웹플럭스의 역압

리액터의 타입과 연산

리액터는 데이터를 처리하기 위해 데이터 스트림 객체인 MonoFlux 두 타입을 제공한다. 모든 리액터 타입은 리액티브 스트림 명세를 따르고 호환성을 보장하기 위해 리액티브 스트림 타입인 Publisher 인터페이스를 구현한다. Mono는 하나의 데이터를 비동기적으로 전달해주는 역할을 수행하는 객체이며, Flux는 하나 이상의 데이터를 비동기적으로 전달해주는 역할을 수행하는 객체이다. 리액터 타입은 시간 순으로 발생 및 진행되는 일련의 데이터가 흐르는 데이터 스트림이다. 리액터 타입은 특정 객체를 담는 컨테이너라고도 한다.

Flux는 일련의 특정 POJO 타입 이벤트의 발행자(publisher)이므로 제네릭이다. 즉, Flux<T>T의 발행자이다. MonoFlux가 하나의 항목만 가지고 있거나 아예 항목을 가지고 있지 않은 경우를 위한 리액터 타입이다. Flux의 모든 연산자가 단일 값 시퀀스에 대해 의미가 있는 것은 아니기 때문에 Mono의 API는 Flux와 매우 유사하지만 있지만 차이가 있다.

리액터 타입 객체를 사용하여 데이터 스트림을 생성하고 나면 이를 사용하여 다음과 같이 오직 두 가지 처리를 수행할 수 있다.

  1. 리액터 연산자를 사용하여 데이터 스트림에 대한 연산 처리 (스트림 항목 변환 및 필터링, 스트림들의 결합 등)
  2. 발행자인 데이터 스트림을 구독

리액터 연산자를 통해 구현한 로직은 데이터가 흐르기(flow) 시작할 때만 실행된다. 데이터 스트림인 Mono(또는 Flux)를 구독하기 전까지는 데이터가 흐르지 않으며 스트림의 항목 처리를 위한 연산자는 실행되지 않는다. 즉, 구독을 해야 데이터가 흐르고 연산자를 통해 구현한 콜백 로직이 실행된다.

리액티브 연산자를 사용하여 정의한, 데이터 스트림 서브젝트에 대한 옵저버의 콜백 종류는 다음과 같다. 이때 동작(행위)을 인자로 넘기기 위해 함수형 프로그래밍 방식이 사용된다. 데이터를 처리하는 로직에 데이터를 전달하는 대신 데이터에 로직을 전달한다.

  1. 데이터가 방출될 때(일부 데이터가 제공될 때, 데이터가 하나라도 준비가 되었을 때) 어떤 일이 일어날지 정의: Mono(또는 Flux)의 연산자 메서드의 인자로 데이터가 방출되었을 때 실행할 함수를 전달한다.
  2. 데이터 스트림 처리 도중 에러가 발생하였을 때 어떤 일이 일어날지 정의: Mono(또는 Flux)의 연산자 메서드의 인자로 데이터 스트림 처리 도중 에러가 발생하였을 때 실행할 함수를 전달한다.
  3. 모든 데이터 준비에 성공하였을 때(데이터 스트림이 완료되었을 때) 어떤 일이 일어날지 정의: Mono(또는 Flux)의 연산자 메서드의 인자로 모든 데이터가 준비되었을 때 실행할 함수를 전달한다.

데이터 스트림 생성

  • generate()
  • create()

콜백

1. 데이터가 방출될 때 어떤 일이 일어날지 정의

Mono(또는 Flux)의 메서드의 인자로 데이터가 방출될 때 처리할 기능을 전달한다. 이때 동작(행위)을 인자로 넘기기 위해(동작을 파라미터화 하기 위해) 람다식을 활용한 함수형 프로그래밍 방식이 사용된다. 함수형 프로그래밍에서는 어떤 기능을 수행하는 함수를 메서드의 인자로 넘긴다. 자바의 함수는 1급(first-class) 지원이 되지 않으므로 람다 객체를 사용한다.

메서드의 종류는 다음과 같다.

  • 공통
    • doOnNext()
    • doOnEach()
    • map()
    • flatMap()

1.1 데이터 반복 처리

  • doOnNext() 메서드: 데이터 스트림이 데이터 항목을 성공적으로 방출하였을 때 실행될 콜백을 등록한다. 업스트림으로부터 받은 시그널이 콜백 함수의 인자로 전달된다. 업스트림으로부터 onNext 시그널이 전파되면 콜백 함수가 실행된다. 콜백 함수는 Consumer 객체이다. Consumer가 실행된 후 onNext 시그널이 다운스트림으로 전파된다.
  • doOnEach() 메서드: 데이터 스트림이 데이터 항목을 성공적으로 방출하거나, 방출 도중 에러가 발생하였을 때 실행될 콜백을 등록한다. 업스트림으로부터 받은 시그널이 콜백 함수의 인자로 전달된다. 데이터 항목이 성공적으로 방출되는 경우 업스트림으로부터 onNext 시그널이 전파되면 콜백 함수가 실행되며, 방출 도중 에러가 발생하는 경우 onError 시그널이 전파되면 콜백 함수가 실행된다. 콜백 함수는 Signal 객체를 인자로 받는 Consumer 객체이므로 데이터 항목의 방출 성공 또는 방출 실패 이벤트에 반응하는 함수 정의가 가능하다. Consumer가 실행된 후 관련 시그널이 다운스트림으로 전파된다.

1.2 데이터 변환(매핑) 처리

map()

map() 메서드는 하나의 데이터 스트림으로 전달되는 데이터 항목들에 변환 함수를 적용한 후 변환된 항목이 담긴 데이터 스트림 객체를 반환한다. 데이터 스트림의 데이터 항목을 1:1로 변환한다. 변환 시 데이터 스트림의 항목 순서를 유지하기 위해 동기적으로 처리한다.

map() 메서드의 인자로 하나의 함수를 전달한다. 이 함수는 하나의 데이터를 인자로 받아 다른 데이터로 변환하는 기능을 동기적으로 수행한다. map() 메서드는 Function<T, R>을 인자로 받고 Mono<T>(또는 Flux<T>)를 반환한다.

map(데이터변환함수) 메서드를 호출하면 다음과 같은 일이 발생한다.

  1. 방출된 하나 이상의 데이터가 변환 함수에 하나씩 전달된다.
  2. 변환 함수에 의해 데이터의 변환이 일어난다.
  3. 변환된 데이터들은 다시 Mono(또는 Flux) 데이터 스트림에 담겨 반환된다.

map() 메서드 호출 시 데이터 변환 함수가 실행되지는 않으며 반드시 subscribe() 메서드 호출을 통해 데이터 스트림을 구독해야 데이터 변환이 일어난다.

flatMap()

flatMap() 메서드는 하나 이상의 데이터 스트림으로 전달되는 데이터 항목들에 변환 함수를 적용하여 새로운 데이터 스트림 객체를 만든다. 데이터 스트림의 데이터 항목을 1:N(N은 또다른 데이터 스트림을 의미)으로 변환한다. map() 메서드와 달리 데이터 스트림을 비동기적으로 처리하므로 데이터 스트림의 항목 순서 유지를 보장하기 어렵다. 항목 순서 유지를 위해서는 flatMapSequential() 메서드를 사용한다.

flatMap() 메서드의 인자로도 하나의 함수를 전달한다. 이 함수는 map() 메서드와 같이 하나의 데이터 항목을 인자로 받아 다른 데이터 항목으로 변환하는 기능을 수행하는 것은 동일하지만 변환 결과는 Publisher 인터페이스 구현체인 Mono(또는 Flux)이다. flatMap() 메서드는 Function<T, Mono<T>(또는 ``Function<T, Flux`)를 인자로 받고 `Mono`(또는 `Flux`)를 반환한다.

flatMap(데이터변환함수) 메서드를 호출하면 다음과 같은 일이 발생한다.

  1. 방출된 하나 이상의 데이터가 변환 함수에 하나씩 전달된다.
  2. 변환 함수에 의해 데이터의 변환이 일어난다. 변환된 데이터는 또다른 데이터 스트림 객체이다. 데이터 항목 수만큼 새로운 데이터 스트림 객체가 생성된다.
  3. 변환된 데이터 스트림 객체(하나 이상)에 포함되어 있는 데이터 항목들은 다시 하나의 Mono(또는 Flux) 데이터 스트림에 담겨 반환된다.


flatMap() 메서드 호출 시 데이터 변환 함수가 실행되지는 않으며 반드시 subscribe() 메서드 호출을 통해 데이터 스트림을 구독해야 데이터 변환이 일어난다.

1.3 데이터 필터링 처리

2. 데이터 스트림 처리 도중 에러가 발생하였을 때 어떤 일이 일어날지 정의

  • doOnError() 메서드: 데이터 스트림이 완료되었으나 에러가 발생할 때 실행될 콜백을 등록한다. 업스트림으로부터 onError 시그널이 전파되면 콜백 함수가 실행된다. 첫 번째 인자로 어떤 에러가 발생할 때 콜백을 실행할지 명시하기 위해 에러 객체를 전달하거나 Predicate 객체를 전달한다. 콜백 함수는 Consumer 객체이다. Consumer가 실행된 후 onError 시그널이 다운스트림으로 전파된다.

3. 모든 데이터 준비에 성공하였을 때 어떤 일이 일어날지 정의

  • 공통
  • Flux
    • doOnComplete() 메서드: 데이터 스트림이 완료되었을 때 실행될 콜백을 등록한다. 업스트림으로부터 onComplete 시그널이 전파되면 콜백 함수가 실행된다. 콜백 함수는 Runnable 객체이다. Runnable이 실행된 후 onComplete 시그널이 다운스트림으로 전파된다.

데이터 스트림 객체 대상 연산

1. 하나의 스트림 또는 여러 스트림의 항목을 입력으로 사용하여 새로운 스트림을 만든다.

2. 서로 다른 두 스트림을 하나로 합친다.

  • zip() 메서드: 두 개 이상의 데이터 스트림을 합친다. 각각의 데이터 스트림이 데이터 항목을 모두 방출할 때까지 기다린 후 방출된 데이터 항목들을 합쳐 튜플(Tuple) 객체로 만든다. 데이터 스트림 중 하나라도 완료되면 이 연산은 중지된다.

3. 하나의 스트림을 필터링하여 새로운 스트림을 만든다.

4. 하나의 스트림이 포함하는 항목을 특정 항목으로 매핑하여 새로운 항목을 포함하는 스트림을 만든다.

리액터 동작 흐름

  1. 구독자는 필요한 데이터를 발행자에게 요청한다.
  2. 구독자는 subscribe() 메서드를 사용하여 데이터 스트림을 구독한다.
  3. 발행자는 필요한 데이터를 준비하여 Mono(또는 Flux) 데이터 스트림 객체에 담는다.
  4. 데이터 스트림을 통해 데이터가 전달되면서 적절한 시그널이 전파되면 콜백이 실행된다.


웹플럭스 관련 애플리케이션 프로퍼티

  • .spring.codec.max-in-memory-size: 입력 스트림을 집계해야 할 때마다 버퍼링할 수 있는 바이트 수에 대한 제한이다. 데이터의 인코딩/디코딩 처리 중 애플리케이션이 메모리에서 사용할 수 있는 버퍼 데이터의 최대 크기이다. 이 설정은 자동 구성된 웹플럭스 서버 및 웹클라이언트 인스턴스에만 적용된다. 기본값은 설정되지 않으며, 이 경우 개별 코덱 기본값이 적용된다. 대부분의 코덱은 기본적으로 256K로 제한된다.


웹플럭스 테스트

비동기 프로그래밍 시 주의점

블로킹 및 동기 방식으로 동작하는 코드를 작성하면 비동기 프로그래밍의 장점을 얻을 수 없으며 애플리케이션의 리소스 효율이 떨어지게 된다. 애플리케이션이 적은 수의 스레드를 사용하는 경우(예: 내장 네티를 사용) 블로킹 코드가 존재하지 않도록 해야 한다. 불가피하게 CPU 집약적인 연산이 존재한다면 해당 작업은 별도의 스레드로 처리하도록 하여 메인 스레드는 블로킹되지 않도록 하는 것이 좋다.

기존 동기 방식의 프로그래밍을 사용하면서 요청 당 스레드를 생성하는 방식이라면(예: 톰캣 서블릿 컨테이너 사용) 부분적으로 논블로킹, 비동기 처리를 하여 스레드 사용 효율을 증가시킬 수도 있다.


springdoc-openapi

스프링 부트 프로젝트의 API 자동 문서화 라이브러리인 springdoc-openapi는 스프링 웹플럭스를 지원한다. 스프링 부트 2 버전에서는 springdoc-openapi v1을 사용하고 스프링 부트 3 버전부터는 v2를 사용한다.

// Spring Boot 2
implementation("org.springdoc:springdoc-openapi-webflux-ui:1.7.0")

// Spring Boot 3
implementation("org.springdoc:springdoc-openapi-starter-webflux-ui:2.3.0")

블로킹 호출을 감지하는 도구인 블록하운드(blockhound)를 사용하는 경우 springdoc-openapi를 적용하고 스웨거 UI 페이지나 OpenAPI 디스크립션 URL을 호출하는 경우 다음과 같이 블로킹 호출이 감지된다.

reactor.blockhound.BlockingOperationError: Blocking call! java.io.RandomAccessFile#readBytes
	at java.base/java.io.RandomAccessFile.readBytes(RandomAccessFile.java) ~[na:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint  HTTP GET "/webjars/swagger-ui/index.html" [ExceptionHandlingWebHandler]


블로킹 호출의 원인은 springdoc-openapi가 내부적으로 호출하는 InflaterInputStreamread() 메서드이며, 다음과 같이 블로킹 호출을 허용하여 감지를 막을 수는 있지만 해당 메서드를 호출하는 다른 라이브러리나 코드에 의한 호출도 허용되므로 정확한 해결 방법은 아니다.링크

BlockHound.install(b -> b.allowBlockingCallsInside(InflaterInputStream.class.getName(), "read"));


참고

Comments