[자바/스프링] 비동기/동시성 프로그래밍 - 실행 제어

JVM 스레드


동기적 작업

동기적 프로그래밍에서 메서드 내 일반적인 코드 실행 흐름은 다음과 같다.

메서드명() {
  동기적 요청1
  동기적 요청2
}

별다른 라이브러리 또는 API 없이 수행하고자 하는 코드들을 절차적으로 작성한 후 실행하면 된다. 동기적 요청이 완료되어야 그 다음 요청들이 실행될 수 있다. 별도의 스레드 생성 없이 하나의 호출자 스레드가 동기적으로 요청한 작업의 결과를 기다린다면 요청이 완료되는 동안 블로킹(blocking)되어 다른 작업을 수행할 수 없다.

CPU 계산 집약 또는 IO 집약적인 연산들을 처리하는 스레드가 블로킹되었을 때 다른 스레드가 요청을 처리할 수 있도록 하기 위해 작업의 비동기 및 논블로킹 실행이 필요하며 이를 통해 애플리케이션의 리소스 효율성과 확장성을 높일 수 있다.


java.util.concurrent

자바 5부터 제공되는 java.util.concurrent 패키지는 비동기/동시성 프로그래밍을 위한 유용한 유틸리티 클래스들을 제공한다. 이 패키지의 Executor는 주어진 작업을 실행하는 표준 인터페이스이다. 어떤 Executor 구현체를 사용하는지에 따라 작업은 새롭게 생성된 스레드에서 실행되거나 기존 작업 실행 스레드(또는 execute() 메서드를 실행한 스레드)에서 실행될 수 있다. 작업들은 또한 순차적으로 실행되거나 비동기(asynchronous) 및 동시적으로(concurrently) 실행될 수 있다. Executorexecute() 메서드는 Runnable을 인자로 받아 실행한다.

ExecutorService는 작업의 대기열을 관리하고 스레드의 가용성에 따라 작업의 실행을 스케줄링하며, 작업의 종료를 제어하는 방법을 제공함으로써 보다 완벽한 비동기/동시성 작업 실행 프레임워크를 제공한다. ExecutorService는 스레드 풀을 제공하고 실행할 작업을 스레드에 할당한다. ExecutorServiceRunnable 또는 Callable 작업을 실행할 수 있으며, Runnable 또는 Callable로 표현되는 모든 함수의 비동기 실행을 관리하기 위한 메서드를 제공한다. Runnable은 작업 완료 후 결과를 반환하지 않는 반면, Callable은 결과를 반환한다.


Future

ExecutorService는 하나 이상의 비동기적 작업들의 진행 상태(실행 및 취소)를 추적하고 수행 결과(Callable인 경우)를 확인할 수 있는 Future를 생성한다. Future 인터페이스는 비동기 작업을 관리하기 위해 사용된다.

Future는 비동기 작업의 결과를 나타내는 인터페이스이며 서로 다른 스레드에서 비동기적으로 수행되는 작업들의 결과를 저장한다. Future는 작업이 정상적으로 완료되었는지 또는 취소되었는지 확인하는 메서드, 작업이 진행 중이라면 작업이 끝날 때까지 기다린 후 완료되면 결과를 받는 메서드, 작업을 취소하는 메서드 등을 제공한다.


작업 실행

작업을 실행하는 ExecutorService의 메서드는 다음과 같다.

  • execute()
  • submit()
  • invokeAny()
  • invokeAll()


execute() 메서드는 비동기 작업을 실행한다. 이 메서드는 Executor 인터페이스로부터 제공되는 메서드이다. 값을 반환하지 않는 Runnable을 인자로 받으므로 반환형이 없다. 따라서 작업의 실행 결과를 얻거나 작업의 상태를 확인할 수는 없다. 즉, 작업을 실행하기만 한다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Runnable runnableTask = () -> { ... }

executorService.execute(runnableTask);


submit() 메서드는 Callable 또는 Runnable 작업을 ExecutorService에 제출(submit)함으로써 비동기 작업을 실행하고 작업 실행 결과를 갖고 있는 Future를 반환한다. 이 때 작업은 값을 반환하는 Callable이다. Runnable 작업을 실행하는 경우 작업 실행이 성공적으로 완료되면 null을 반환한다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask = () -> { ... }

Future<String> future = executorService.submit(callableTask);


invokeAny() 메서드는 여러 작업들을 각각 비동기적으로 실행하고 실행을 성공한 작업이 존재하는 경우 성공한 작업 중 하나의 작업 실행 결과를 반환한다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask1 = () -> { ... }
Callable callableTask2 = () -> { ... }
Callable callableTask3 = () -> { ... }

// 작업 목록 정의
ArrayList<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask1);
callableTasks.add(callableTask2);
callableTasks.add(callableTask3);

String result = executorService.invokeAny(callableTasks);


invokeAll() 메서드는 여러 작업들을 각각 비동기적으로 실행하고 각각의 작업 실행 결과를 갖고 있는 Future 리스트를 반환한다.

List<Future<String>> futuresFromInvokeAll = executorService.invokeAll(callableTasks);


작업 상태 확인 및 작업 결과 확인

작업의 상태(완료 또는 취소) 확인은 Future가 제공하는 다음 메서드들을 통해 수행할 수 있다.

  • isDone(): 작업이 완료되었는지 확인한다. 요청 스레드를 블로킹하지 않는다.
  • isCancelled(): 작업이 취소되었는지 확인한다. 요청 스레드를 블로킹하지 않는다.


작업의 호출자는 ExecutorService에 작업을 제출한 후 반환되는 Future의 메서드를 호출하여 작업의 상태를 확인할 수 있다. 작업이 완료되기 전에 isDone() 또는 isCancelled() 메서드를 호출하면 false를 반환받으며 get() 메서드를 통해 작업의 실행 결과를 조회한 후 메서드를 호출하면 작업의 성공적인 실행 완료 여부에 따라 다른 값을 반환받게 된다.

작업 수행 결과를 확인하는 메서드는 다음과 같다.

  • get(): 작업 실행 결과를 조회한다. 작업 실행 시간에 대한 타임아웃 설정이 가능하다. Future 인터페이스가 제공하는 메서드이다. 작업이 완료될 때까지 요청 스레드를 블로킹한다.


get() 메서드는 작업 실행이 성공적으로 완료된 경우 결과를 반환한다. 필요한 경우 작업이 완료될 때까지 최대한 주어진 시간을 기다린 다음, 가능한 경우 결과를 반환한다. get() 메서드 실행 시 타임아웃 시간과 시간 단위 두 개의 인자를 전달하여 작업 실행 시간에 대한 타임아웃 설정을 할 수 있다. 작업 실행 시간이 설정한 타임아웃 시간을 초과하는 경우 체크 예외(checked exception)인 TimeoutException이 발생한다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask = () -> { ... }

// 단일 작업 실행 및 작업 결과 조회
String result = executorService.submit(callableTask).get();


Runnablerun() 메서드는 반환형이 없으므로 ExecutorServicesubmit() 메서드로 Runnable을 전달하는 경우, 반환되는 Futureget() 메서드는 null을 반환한다. Future를 사용하고 싶지만 사용 가능한 결과를 제공하지 않으려면 Future<?> 형식으로 타입을 선언하고 기본 작업의 결과로 null을 반환할 수 있다.

get() 메서드는 작업 스레드가 작업이 수행 완료된 후 결과를 반환할 때까지 요청 스레드가 기다리는 블로킹 메서드이다. 따라서 작업이 완료되기까지 오래 걸리는 경우 그 시간 동안 스레드가 블로킹되므로 사용에 주의해야 한다. 예상치 못한 긴 스레드 블로킹을 막기 위해서는 타임아웃 설정을 사용해야 한다.

작업이 실행 도중 취소된 후 get() 메서드를 호출하면(이미 취소된 작업의 결과를 얻으려고 하면) 체크 예외인 CancellationException 예외가 발생한다.

ExecutorServiceFuture를 사용한 비동기/동시성 프로그래밍 구현 코드는 다음과 같다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask1 = () -> { ... }
Callable callableTask2 = () -> { ... }
Callable callableTask3 = () -> { ... }

// 작업 목록 정의
ArrayList<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask1);
callableTasks.add(callableTask2);
callableTasks.add(callableTask3);

// 단일 작업 실행 및 작업 결과 조회
Future<String> futureFromSubmit = executorService.submit(callableTask1);
String result = futureFromSubmit.get();

// 다중 작업 실행(비동기 작업 병렬 실행) 및 작업 결과 조회
List<Future<String>> futuresFromInvokeAll = executorService.invokeAll(callableTasks);
for (Strig future : futuresFromInvokeAll) {
  String result = future.get();
}


Future 인터페이스를 사용하는 경우 비동기 작업이 실행 완료되었을 때 실행할 콜백 코드를 관리하기 어렵고 비동기 작업과 콜백들의 실행 중 에러가 발생하는 경우 해당 에러를 처리하기 어렵다는 단점이 있다. 따라서 Future에 콜백 함수를 연결하여, 작업 수행 결과를 사용할 수 있을 때 해당 콜백 함수가 자동으로 호출되도록 할 수 없다는 한계가 있다. 가장 큰 단점은 작업의 결과를 조회하는 get() 메서드가 스레드를 블로킹한다는 점이다.


포크/조인 프레임워크

자바 7부터 도입된 포크/조인(fork/join) 프레임워크는 분할 정복(devide and conquer) 접근 방식을 통해 사용 가능한 모든 프로세서의 코어를 사용하여 병렬 처리 속도를 높이기 위한 도구를 제공한다. 포크/조인 프레임워크는 내부적으로 분할 정복 알고리즘을 사용하여 ExecutoreService를 통한 비동기 작업 실행의 성능을 향상시킨다. 포크/조인 프레임워크는 재귀적으로 더 작은 부분으로 나눌 수 있는 작업의 속도를 향상시키기 위해 설계되었다.

포크/조인 프레임워크는 작업의 병렬 실행을 위해 포크와 조인이라는 두 과정을 수행한다. 먼저 비동기적으로 실행하기 충분할 때까지 작업을 서로 독립적인 하위 작업으로 분리하는 포크(fork) 과정이 수행된다. 분리된 하위 작업들이 별도의 스레드 상에서 실행 완료되면 모든 하위 작업들의 실행 결과를 재귀적으로(recursively) 결합하여 단일 결과로 만드는 조인(join) 과정이 수행된다. 포크/조인 프레임워크에서 스레드 풀은 ForkJoinPool이라는 별도의 클래스로 관리한다. ForkJoinPoolForkJoinWorkerThread 타입의 워커 스레드(worker thread)를 관리한다.

ExecutorService를 포크/조인 프레임워크로 대체하는 것이 항상 좋은 방법인 것은 아니다. 포크/조인 프레임워크는 동시성 처리를 보다 단순하게 하고 처리 성능을 향상시키지만 동시 실행에 대한 제어를 제한한다는 한계점이 있다. 작업의 동시 실행 시 생성되는 스레드의 수를 조절하고 특정 작업을 별도의 스레드에서 실행하는, 보다 세분화된 작업 제어 기능을 사용하기 위해서는 포크/조인 프레임워크 대신 ExecutorService를 사용하는 것이 좋다. 특정 작업에 특정 스레드를 할당하여 작업을 실행함으로써 작업 당 스레드 할당 방식으로 작업들을 각각 독립적으로 처리하기 위해서는 ExecutorService을 사용하는 것이 좋다.


CompletableFuture

기존 Future의 기능 상 한계점은 다음과 같다.

  • 실행 중인 작업을 직접 완료할 수 없다.
  • 작업이 완료되면 값을 반환하는 get() 메서드는 스레드를 블로킹한다.
  • 작업과 콜백을 결합할 수 없다.
  • 여러 작업들을 병합할 수 없다.
  • 작업 실행 도중 발생 가능한 에러를 처리할 수 없다.


자바 8부터 제공되는 CompletableFuture는 기존 Future 인터페이스 뿐만 아니라 CompletionStage라는 인터페이스를 추가로 구현하여 개선된 동시성 기능을 제공한다. CompletableFuture는 단어의 뜻 그대로 명시적으로 완료될 수 있는 Future를 의미한다. 즉, Future와 다르게 작업을 외부에서 명시적으로 완료할 수 있다. 또한 CompletableFuture는 비동기 작업과 콜백들을 서로 결합시키고 코드 실행 도중 발생 가능한 에러에 대한 처리를 개선하였다. CompletableFutureFuture 보다 발전된 개념이며 여러 비동기 작업을 쉽게 파이프라이닝하고 단일 비동기 작업으로 병합할 수 있게 해준다.

CompletableFuture는 기존 Future와 다르게 직접 완료될 수 있다. 작업이 어떻게 실행될지, 실행 도중 어떤 상황이 발생할 수 있을지에 따라 작업을 성공적 또는 예외적으로 완료시키고 작업 수행에 대한 결과값과 작업 상태(성공 및 실패 등)를 설정할 수 있다. CompletableFuture는 작업 완료 시 트리거되는 종속적인 콜백 함수인 액션(action)을 지원하는 CompletionStage로 사용될 수도 있다.


작업 실행

Future의 경우 get() 메서드는 작업이 완료될 때까지 호출 스레드를 블로킹한다. CompletableFuture를 사용하여 작업을 비동기적으로 실행하는 경우 호출 스레드는 별도의 작업 스레드에게 작업 결과 조회와 콜백 실행을 위임한다. 작업이 완료될 때까지 호출 스레드 대신 작업 스레드가 블로킹된다.

작업을 비동기적으로 실행하기 위해서 다음 두 메서드를 사용한다.

  • runAsync(): 결과를 반환하지 않는 작업을 비동기적으로 실행한다. 요청 스레드를 블로킹하지 않고 별도의 스레드에서 작업을 실행한다.
  • supplyAsync(): 결과를 반환하는 작업을 비동기적으로 실행한다. 요청 스레드를 블로킹하지 않고 별도의 스레드에서 작업을 실행한다.


runAsync() 메서드는 결과를 반환하지 않는 작업을 비동기적으로 실행하기 위해 사용한다. 메서드는 Runnable를 인자로 받고 작업을 실행한 후 CompletableFuture<Void>를 반환한다. Executor 인자를 추가로 전달하는 경우 해당 Executor로 작업을 실행하며 그렇지 않을 경우 사용 가능한 기본 포크/조인 풀을 써서 작업을 실행힌다.

CompletableFuture<Void> completableFuture = CompletableFuture
  .runAsync(() -> { ... });


supplyAsync() 메서드는 결과를 반환하는 작업을 비동기적으로 실행하기 위해 사용한다. Supplier<T>를 인자로 받고 CompletableFuture<T>를 반환한다. 여기서 T는 주어진 Supplier를 호출하여 얻은 값의 타입이다. Supplier는 결과의 공급자를 나타내는 간단한 함수형 인터페이스이다. Supplier 인터페이스는 인자가 없고 파라미터화된 타입의 값을 반환한다. Executor 인자를 추가로 전달하는 경우 해당 Executor로 작업을 실행하며 그렇지 않을 경우 사용 가능한 기본 포크/조인 풀을 써서 작업을 실행힌다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... });

String result = completableFuture.get();


작업의 상태 확인 및 작업 결과 확인

작업 수행 결과를 확인하는 메서드는 다음과 같다.

  • get(): 작업 실행 결과를 조회한다. 작업 실행 시간에 대한 타임아웃 설정이 가능하다. Future 인터페이스의 메서드이다.
  • join(): 작업 실행 결과를 조회한다. 작업 실행 시간에 대한 타임아웃 설정은 불가능하다. CompletableFuture 인터페이스의 메서드이다.


join() 메서드도 get() 메서드와 마찬가지로 작업이 수행 완료된 후 결과를 반환할 때까지 기다리는 블로킹 메서드이다. join() 메서드는 작업 실행이 정상적으로 완료되면 결과값을 반환하며, 예외적으로 완료되면 작업 실행 도중 발생 예외를 원인으로 하는, 언체크 예외(unchecked exception)인 CompletionException을 발생시킨다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask = () -> { ... }

// 단일 작업 실행 및 작업 결과 조회
String result = executorService.submit(callableTask).join();


get() 메서드는 체크 예외를 발생시키므로 코드 상에서 try/catch 구문으로 명시적 예외 처리가 필요한 반면, join() 메서드는 언체크 예외를 발생시키므로 명시적 예외 처리가 필요하지 않다.

CompletableFutureget() 메서드는 Future 인터페이스의 메서드이므로 마찬가지로 작업이 완료될 때까지 스레드를 블로킹한다. 완료되지 않는 작업일 경우 해당 코드는 스레드를 영원히 블로킹 상태로 만들어 버린다. 작업 실행이 의도치 않게 완료되지 않은 경우에 대해서 get() 메서드 대신 complete() 메서드를 사용하여 작업을 수동으로 완료할 수 있다. 이때 결과값을 메서드의 인자로 전달한다.

Futureget() 메서드를 사용하는 경우 클라이언트는 작업이 실행 완료될 때까지 기다려야 하며 작업이 예외적으로 실패하는 경우에도 별도의 예외 처리를 할 수 없어 기다려야 했다. 반면 CompletableFuturecomplete() 메서드를 사용하면 작업을 직접 완료하고 특정 결과값을 클라이언트에게 반환함으로써 클라이언트는 작업이 성공적으로 수행되었을 때와 작업이 실패했을 때 모두 지정된 특정 결과를 얻을 수 있게 된다.

CompletableaFutureFuture 인터페이스와 CompletionStage 인터페이스를 다중 구현하므로 API가 CompletableaFuture를 반환하는 경우 클라이언트는 Future 인터페이스가 제공하는 get() 메서드를 통해 블로킹 호출을 사용할 수도 있다. API가 CompletableaFuture 대신 CompletionStage를 반환하는 경우 클라이언트는 작업 실행 후 작업이 완료될 때까지 스레드를 블로킹하는 대신 반환값을 비동기 방식으로 사용하는 것이 기대된다.


콜백 결합

Future의 경우 첫 번째 작업이 완료된 후 해당 작업의 결과를 사용하여 두 번째 작업을 실행하기 위해서는 다음과 같이 동기적으로 코드를 작성해야 한다.

  1. 첫 번째 작업을 실행한다.
  2. get() 메서드를 호출하여 첫 번째 작업의 결과를 조회한다.
  3. 첫 번째 작업의 결과를 사용하는 두 번째 작업을 실행한다.


동기적으로 수행되는 두 작업을 연결하는 방법을 연결하는 방법은 get() 메서드를 사용하는 것이다. 위 경우 호출 스레드는 첫 번째 작업이 완료될 때까지 블로킹된다. CompletableFuture를 사용하는 경우 작업에 대한 콜백 결합이 가능하며, 호출 스레드는 직접 첫 번째 작업이 완료될 때까지 블로킹되지 않는다. 호출 스레드는 별도의 다른 작업 스레드에게 작업 결과 조회와 콜백 실행을 위임한다. 호출 스레드 대신 작업 스레드는 첫 번째 작업이 완료될 때까지 기다리고, 첫 번째 작업이 완료되면 콜백인 두 번째 작업을 실행한다.

비동기 작업이 완료된 후 실행할 콜백을 정의하기 위해서는 다음 메서드들을 사용한다. 콜백이 작업의 결과값을 반환 받는지 아닌지, 콜백이 실행 결과 또다른 값을 반환하는지 아닌지에 따라 구분된다.

  • thenRun(): 작업 실행 결과를 받지 않고 값을 반환하지 않는 콜백(Runnable)을 실행한다.
  • thenAccept(): 작업 실행 결과를 받고 값을 반환하지 않는 콜백(Consumer)을 실행한다.
  • thenApply(): 작업 실행 결과를 받고 값을 반환하는 콜백(Function)을 실행한다.


thenRun() 메서드는 Runnable를 전달받아 콜백을 실행한다. Runnable은 작업 결과를 반환하지 않으므로 이전 작업 수행 결과를 콜백에 전달할 필요가 없고 콜백이 값을 반환할 필요가 없을 때 thenRun() 메서드를 사용한다. thenRun() 메서드의 반환형은 CompletableFuture<Void>이다.

CompletableFuture<Void> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenRun(() -> { ... });


thenAccept() 메서드는 Consumer 함수형 인터페이스 구현체를 인자로 받는다. Consumer는 단일 인자를 전달 받고 결과를 반환하지 않는다. 이전 작업인 CompletableFuture를 메서드의 인자인 Consumer에게 전달할 수 있다. 따라서 콜백은 작업 결과를 인자로 받아 처리할 수 있다. 이전 작업 수행 결과를 전달 받아 처리할 수 있지만 반환값이 없으므로 값을 소비하고 끝내기 위해 사용한다. 따라서 thenAccept()의 반환형은 thenRun()와 동일하게 CompletableFuture<Void>이다.

CompletableFuture<Void> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenAccept((result) -> { ... });


thenApply() 메서드는 Function 함수형 인터페이스 구현체를 인자로 받는다. Function은 단일 인자를 전달 받고 결과를 반환한다. 이전 작업인 CompletableFuture와 작업 결과를 메서드의 인자인 Function 람다식에 전달할 수 있다. 따라서 콜백은 작업 결과를 인자로 받아 처리할 수 있으므로 이전 작업 수행 결과를 전달 받아 처리하며 콜백 처리 결과를 다시 반환할 수 있다. thenApply() 메서드는 콜백 작업 수행 결과를 다시 CompletableFuture 객체로 반환하므로 이후 콜백에게 CompletableFuture를 전달할 수 있다. thenApply()의 반환형은 CompletableFuture<T>이다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenApply((result) -> { ... });


작업 결합

작업의 결합(조합)을 위해서는 다음 메서드들을 사용한다.

  • thenCompose(): 두 개의 작업을 순차적으로 실행 (동기적)
  • thenCombine(): 두 작업을 각각 독립적으로 동시에 실행 (비동기적)
  • allOf(): 여러 작업을 동시에 실행한 후 모두 완료되었을 때 작업 실행 결과를 받는 콜백 실행
  • anyOf(): 여러 작업을 동시에 실행한 후 작업 중 하나라도 완료되었을 때 완료된 작업 실행 결과를 받는 콜백 실행


thenCompose() 메서드는 두 개의 작업을 순차적으로 실행하기 위해 두 개의 CompletableFuture를 결합하는 기능을 수행한다. Function을 인자로 받으며 첫 번째로 실행한 작업의 실행 결과가 담긴 CompletableFuture를 두 번째 작업에 전달한다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenCompose(s -> CompletableFuture.supplyAsync(() -> { ... });

String result = completableFuture.get();


thenCombine() 메서드는 두 개의 작업을 비동기적으로 동시에 실행하기 위해 두 개의 CompletableFuture를 결합하는 기능을 수행한다. 두 작업을 실행할 뿐만 아니라 두 작업의 실행 결과를 합해 추가적인 콜백 처리를 수행하기 위해 사용한다. CompletableFutureFunction 두 개를 인자로 받으며 첫 번째 인자는 두 번째 작업, 두 번째 인자는 콜백 처리에 해당된다. 동시에 실행한 두 작업이 완료되면 두 결과에 대한 CompletableFuture를 콜백 람다식에 전달한다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenCombine(
    CompletableFuture.supplyAsync(
      () -> { ... }),
      (s1, s2) -> { ... });

String result = completableFuture.get();


allOf(), anyOf() 메서드는 세 개 이상의 작업들을 동시에 실행하기 위해 CompletableFuture를 결합하는 기능을 수행한다. 두 메서드 모두 CompletableFuture 리스트를 인자로 받는다.

CompletableFuture<String> future1 = CompletableFuture
  .supplyAsync(() -> { ... });
CompletableFuture<String> future2 = CompletableFuture
  .supplyAsync(() -> { ... });
CompletableFuture<String> future3 = CompletableFuture
  .supplyAsync(() -> { ... });

CompletableFuture<Void> combinedFuture = CompletableFuture
  .allOf(future1, future2, future3);

combinedFuture.get();


allOf() 메서드는 작업들 중 하나라도 예외적으로 완료되면 메서드가 반환하는 작업도 예외적으로 완료하며, CompletionException은 발생한 예외를 원인으로 갖는다. 메서드가 반환하는 CompletableFuture는 모든 작업의 실행 결과를 갖고 있지는 않으며 작업들을 개별적으로 확인하여 결과를 얻을 수 있다. CompletableFuture가 인자로 전달되지 않으면, null 값으로 완료된 CompletableFuture를 반환한다.

anyOf() 메서드도 마찬가지로 작업들 중 하나라도 예외적으로 완료되면 메서드가 반환하는 작업도 예외적으로 완료한다. CompletableFuture가 인자로 전달되지 않으면, 완료되지 않은 CompletableFuture를 반환한다.

콜백을 결합하는 경우 allOf()는 작업들이 모두 완료되었을 때 작업 실행 결과를 받는 콜백을 실행한다. anyOf() 메서드는 여러 작업을 동시에 실행한 후 작업 중 하나라도 완료되었을 때 완료된 작업 실행 결과를 받는 콜백을 실행한다.


예외 처리

비동기적으로 동시 실행되는 작업들의 실행 도중 에러가 발생하였을 때 예외 처리가 필요하다. 기존 예외 처리 방식과 동일하게 try/catch 구문을 사용할 수도 있지만 CompletableFuture는 예외 처리를 위한 다양한 메서드들을 제공한다. 예외 처리를 수행할 작업에 대해 예외 처리 메서드를 호출하며 메서드의 인자로 예외 발생 시 처리할(수행할) 액션을 인자로 전달한다. CompletionStage 인터페이스의 경우 작업을 스테이지(stage), 예외 처리를 위한 수행 로직 또는 콜백을 액션(action)으로 표현한다.

예외처리대상작업.예외처리메서드(액션);


비동기 작업 실행 도중 발생 가능한 예외 중 java.util.concurrent 패키지에 정의된 예외는 다음과 같다.

  • CompletionException
  • ExecutionException
  • TimeoutException
  • CancellationException


CompletionException은 작업을 완료하는 과정에서 에러가 발생하거나 다른 런타임 예외가 발생할 때 우선적으로 발생하는 예외이다. 실제 발생한 예외를 루트 원인으로 래핑하여 가지고 있므로 getCause() 메서드를 통해 작업 실행 도중 발생한 예외의 종류를 확인할 수 있다. CompletionException 예외는 get() 또는 join() 메서드 실행 도중 발생 가능한 예외이다. CompletionException 대신 다른 예외를 발생시키기 위해서는 completeExceptionally() 메서드를 사용한다.

ExecutionException은 작업 실행 도중 예외가 발생하여 중단된 작업의 결과를 확인하려 할 때 발생하는 예외이다. CompletionException과 동일하게 실제 발생한 예외를 루트 원인으로 래핑하여 가지고 있므로 getCause() 메서드를 통해 작업 실행 도중 발생한 예외의 종류를 확인할 수 있다. ExecutionExceptionget() 메서드 실행 도중 발생 가능하다.

TimeoutException은 작업 실행을 기다리는 동안 특정 시간이 경과하였을 때 발생하는 예외이다. TimeoutExceptionget() 메서드를 통해 작업 실행 결과를 조회할 때 타임아웃 인자를 전달하는 경우 발생 가능하다.

CancellationException은 작업이 실행 도중 취소된 후 작업 실행 결과를 조회하는 경우(이미 취소된 작업의 결과를 얻으려고 하는 경우) 발생하는 예외이다. CancellationExceptionget() 또는 join() 메서드 실행 도중 발생 가능하다.

작업 실행 도중 예외가 발생했는지 확인하는 메서드는 isCompletedExceptionally()이다. 이 메서드는 CompletableFuture가 예외적으로 완료되었는지 확인한다.

예외 처리 메서드는 크게 작업 실행 도중 예외가 발생했을 때만 실행되는 메서드와 예외가 발생하지 않고 성공적으로 완료될 때와 예외가 발생할 때 두 경우 모두 실행되는 메서드로 구분된다.

  • exceptionally()
  • whenComplete()
  • handle()
  • completeExceptionally()


exceptionally() 메서드는 작업 실행 도중 예외가 발생되었을 때만 실행되어 예외 처리를 수행한다. 인자로 전달되는 Function을 실행함으로써 예외 처리를 수행한다. Function은 발생한 예외를 인자로 전달받으며 값을 반환한다. 예외가 발생하는 경우 예외 처리를 수행한 후 예외적으로 완료된 CompletableFuture를 반환한다. 예외가 발생하지 않은 경우 예외 처리는 수행되지 않으며 정상적으로 완료된 CompletableFuture를 반환한다.

CompletableFuture<String> completableFuture = CompletableFuture  
  .supplyAsync(() -> { ... })
  .exceptionally((e) -> {
    에러 처리;
    return 에러 발생  반환값;
  })
  // 에러 처리 후 반환된 작업을 이후에 콜백에서 처리한다.
  .thenApply(() -> { ... });


whenComplete() 메서드는 작업 실행 도중 예외가 발생하지 않고 성공적으로 완료될 때와 예외가 발생할 때 두 경우 모두 추가적인 콜백을 수행한다. 작업이 성공적으로 완료된 경우 작업 실행 완료에 대한 처리를, 실패한 경우 예외 처리 기능을 수행한다. 즉, 예외 발생 여부에 관계 없이 실행된다. whenComplete() 메서드는 BiConsumer 함수형 인터페이스 구현체를 인자로 받는다. BiConsumer는 작업 실행 결과와 예외 두 인자를 전달받으며 값을 반환하지 않고 소비한다. whenComplete() 메서드도 마찬가지로 작업 실행 도중 예외가 발생하는 경우 예외 처리를 수행한 후 예외적으로 완료된 CompletableFuture를 반환한다. 예외가 발생하지 않은 경우 예외 처리는 수행되지 않으며 정상적으로 완료된 CompletableFuture를 반환한다. whenComplete() 메서드의 인자는 결과를 반환할 수 없는 BiConsumer이므로 작업의 실행 결과값을 특정값으로 지정할 수는 없으며 단순히 예외 처리 콜백을 실행하기 위해 사용된다.

CompletableFuture<String> completableFuture = CompletableFuture  
  .supplyAsync(() -> { ... })
  .whenComplete((result, e) -> {
    if (e != null) {
      예외 처리;
    } else {
      성공적으로 완료  실행할 액션;
    }
  })
  // 예외 처리 후 반환된 작업을 이후에 콜백에서 처리한다.
  .thenApply(() -> { ... });


handle() 메서드도 whenComplete() 메서드와 마찬가지로 작업 실행 도중 예외가 발생하지 않고 성공적으로 완료될 때와 예외가 발생할 때 두 경우 모두 추가적인 액션을 수행한다. 작업이 성공적으로 완료된 경우 작업 실행 완료 결과값을, 실패한 경우 특정값을 반환하는 예외 처리 기능을 수행한다. 즉, 예외 발생 여부에 관계 없이 실행된다. 작업이 성공적으로 완료된 경우 그 결과값과, 작업이 성공적으로 완료되지 않은 경우 발생한 예외 두 가지를 인자로 받아 처리하고 결과를 반환하는 BiFunction 함수형 인터페이스 구현체를 인자로 받는다. BiFunction이 특정값을 반환하도록 하여 작업 실행이 실패로 완료되었을 때 기본값을 제공할 수 있다. handle() 메서드는 whenComplete() 메서드와 다르게 값을 반환하는 BiFunction을 액션으로 수행하므로 예외가 발생하였을 경우 예외적으로 완료된 CompletableFuture가 특정값을 결과값으로 갖도록 할 수 있다.

CompletableFuture<String> completableFuture = CompletableFuture  
  .supplyAsync(() -> { ... })
  .handle((result, e) -> {
    if (e != null) {
      예외 처리;
      return 예외 발생  반환값;
    } else {
      성공적으로 완료  실행할 액션;
      return 성공적으로 완료  반환값;
    }
  })
  // 예외 처리 후 반환된 작업을 이후에 콜백에서 처리한다.
  .thenApply(() -> { ... });


completeExceptionally() 메서드는 작업 실행 도중 예외가 발생한 경우 예외를 처리하지 않고 동일한 예외나 또다른 예외를 의도적으로 발생시킨다. 작업이 예외적으로 완료될 때 특정 예외를 발생시키려면 이 메서드를 사용한다. 이 메서드는 Throwable을 인자로 받으며 메서드 호출로 인해 CompletableFuture가 예외적으로 완료된 상태가 되는 경우 true를 반환하고 그렇지 않으면 false를 반환한다. completeExceptionally() 메서드로 작업 완료 시 특정 예외를 발생하도록 만든 CompletableFuture는 대상 작업이 아직 완료되지 않은 경우 get() 메서드 및 관련 메서드 호출 시 해당 예외가 발생한다.

handle() 메서드는 작업 실행 도중 발생한 예외를 잡아 처리한 후 작업을 예외적으로 완료하므로 예외적으로 완료된 CompletableFuture를 반환하는 반면, completeExceptionally()는 작업을 완료 처리하지 않고 예외를 발생시키며 CompletableFuture 대신 boolean 값을 반환한다. 따라서 발생한 예외를 추가적으로 처리하는 것이 필요하다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... });
completableFuture.completeExceptionally(발생시킬예외);
// 작업 실행 결과를 기다리는 도중 예외 발생 시 다른 예외를 발생시킨다. 해당 예외에 대한 별도의 처리가 필요하다.
try {
  String result = completableFuture.get();
} catch(발생한예외) {
  예외 처리;
}


completeExceptionally() 메서드를 사용하여 exceptionally(), whenComplete(), handle() 예외 처리 메서드가 CompletionException이 아닌 특정 예외를 인자로 받도록 할 수 있다.

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .whenComplete((result, e) -> { ... });
completableFuture.completeExceptionally(발생시킬예외);


작업 완료 및 취소 처리


Async 메서드

CompletableFuture의 메서드 중 Async 접미사가 추가된 메서드들은 다음 작업을 호츨자 스레드가 아닌 다른 스레드에서 실행한다. Async 접미사가 없는 메서드는 호출자 스레드를 사용하여 다음 작업을 실행한다.

Async 메서드들은 Executor를 인자로 받는 메서드와 그렇지 않은 메서드로 구분된다. Executor를 인자로 받는 메서드는 해당 Executor를 사용하여 작업을 실행하며, Executor를 구현한 공통 포크/조인 프레임워크의 ForkJoinPool.commonPool() 메서드에 의해 제공되는 공통 스레드 풀을 사용하여 작업을 실행한다.


스프링의 비동기 처리

스프링 코어는 멀티 스레드 기반 동시성 구현을 위해 TaskExecutor 인터페이스를 제공한다. TaskExecutorRunnable 작업의 실행을 추상화하는 간단한 작업 실행 인터페이스이다. 자바의 Executor, CommonJ의 WorkManager 등 다양한 TaskExecutor 구현체가 존재하며 필요 시 커스텀 구현체를 만들 수 있다. 스프링은 이들 구현체를 모두 자바 Executor 인터페이스로 단일화하였다.

TaskExecutor 구현 시 작업의 동기 및 비동기 실행, 스레드풀 사용 등과 같은 모든 종류의 다양한 작업 실행 전략을 사용할 수 있다. TaskExecutor는 자바의 Executor 인터페이스와 동일하므로 비동기 작업 실행 클라이언트는 Executor에 대한 의존성을 선언하고 TaskExecutor 구현을 사용할 수 있다.

스프링은 TaskExecutor 인터페이스를 바로 사용할 수 있도록 용도별 TaskExecutor 구현체 몇 가지를 제공한다. 일반적으로 많이 사용되는 TaskExecutor 구현체는 다음과 같다.

  • SimpleAsyncTaskExecutor: TaskExecutor 빈이 정의되지 않는 경우 사용되는 기본적인 TaskExecutor 구현체이다. 비동기 작업을 호출할 때마다 새로운 스레드를 생성하며 기존 스레드를 재사용하지 않는다.
  • ThreadPoolTaskExecutor: 빈 프로퍼티를 통해 ThreadPoolExecutor를 구성하고 스프링 TaskExecutor로 노출시키는 기능을 제공하는 TaskExecutor 구현체이다.
  • SimpleThreadPoolTaskExecutor: 쿼츠(Quartz)의 SimpleThreadPool의 서브 클래스이다. 쿼츠와 쿼츠가 아닌 컴포넌트 사이에 스레드 풀을 공유해야 할 때 사용한다.
  • SyncTaskExecutor: 작업을 비동기로 실행하지 않는다. 작업을 호출한 스레드(호출자 스레드) 내에서 작업을 동기적으로 실행한다.


작업의 비동기 실행 시 TaskExecutorexecute() 메서드의 인자로 비동기로 실행할 작업을 Runnable 객체로 전달하면 된다. execute() 메서드는 주어진 작업을 실행하며 작업을 비동기적으로 실행하는 경우 메서드 호출 결과(작업 실행 결과)가 즉시 반환될 수 있다. 반면 작업을 동기적으로 실행하는 경우 작업 요청 스레드(호출 스레드)는 작업의 결과를 받을 때까지 블로킹될 수 있다.

TaskExecutor로 작업을 비동기로 실행하고 작업의 실행 결과를 CompletableFuture 타입으로 반환할 수 있다. 작업 실행 코드를 CompletableFuturesupplyAsync() 메서드의 인자로 전달하여 호출하고 CompletableFuture 객체를 반환 받는다. 또는 반환형이 없는 runAsync() 메서드를 호출할 수도 있다. supplyAsync() 메서드는 SupplierExecutor 두 타입의 객체를 인자로 받으므로 비동기 처리 시 TaskExecutor를 재사용할 수 있다. 인자가 Supplier 하나뿐인 supplyAsync() 메서드는 JVM에서 사용 가능한 기본 포크/조인 풀을 써서 실행된다. CompletableFuture를 반환받으면 여러 CompletableFuture 인스턴스를 조합(compose)하거나 연결(chain)해서 모든 기능을 최대한 활용할 수 있다는 장점이 있다.

private final TaskExecutor taskExecutor;

...

CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... }, taskExecutor)
  .thenApply((result) -> { ... });


스프링은 자바의 Future를 구현한 ListenableFuture 인터페이스도 제공한다. ListenableFuture 인터페이스는 Future를 확장하여 작업 실행 완료 콜백을 실행하는 콜백 기능을 추가한 것이다. 비동기 작업이 완료된 후 콜백을 실행하기 위해 ListenableFuture를 사용할 수 있다. ListenableFuture는 작업이 완료되는 시점(Future 완료 시점)에 콜백을 실행한다. 작업 실행 코드를 AsyncListenableTaskExecutor에 전송하면 ListenableFuture가 반환된다. Callable 타입의 작업 실행 코드를 TaskExecutorsubmitListenable() 메서드 호출 시 인자로 전달하면 반환되는 ListenableFuture를 작업 실행 결과로 사용하면 된다. 스프링 6 버전 부터 ListenableFuture는 더 이상 사용되지 않으므로 대신 CompletableFuture를 사용하는 것이 좋다.

스프링은 또한 비동기/동시성 구현을 위한 다음 어노테이션을 제공한다.

  • @EnableAsync
  • @Async


@EnableAsync 어노테이션은 스프링의 비동기 메서드 실행 기능을 활성화한다. 구성 클래스에 이 어노테이션을 사용하여 전체 스프링 애플리케이션 컨텍스트에 대해 비동기 처리를 활성화할 수 있다. 이 어노테이션을 구성 클래스에 설정해야 @Async 어노테이션을 사용하여 비동기로 메서드를 실행할 수 있다.

기본적으로 스프링은 관련 스레드 풀 정의를 확인한다. 컨텍스트에 TaskExecutor 빈 또는 taskExecutor라는 이름의 Executor 빈을 스캔하며 둘 다 없을 경우 SimpleAsyncTaskExecutor를 사용하여 비동기 메서드 호출을 처리한다.

다음은 비동기 메서드 호출 기능 활성화 및 스레드 풀 설정 코드이다.

@Configuration
@EnableAsync
public class AppConfig {
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(7);
    executor.setMaxPoolSize(42);
    executor.setQueueCapacity(11);
    executor.setThreadNamePrefix("MyExecutor-");
    executor.initialize();
    return executor;
  }
}


스프링 3.0 버전 이후부터 애너테이션을 사용해 비동기로 작업을 실행하는 기능이 지원된다. 비동기로 실행하려는 메서드에 @Async 어노테이션을 설정하면 된다. 비동기의 메서드의 반환형은 void 또는 Future 인터페이스 구현체이어야 하며 비동기 메서드 호출자는 비동기로 실행한 작업의 정보를 확인하고 작업 실행이 성공적으로 완료되었을 때 실행 결과를 받아 처리할 수 있다.


스프링에서 CompletableFuture를 사용한 비동기/동시성 구현


서블릿 컨테이너의 동기 및 비동기 처리, IO와 NIO 커넥터

요청 당 스레드 모델을 사용하는 서블릿 컨테이너 중 하나인 아파치 톰캣(apache tomcat)의 경우 클라이언트의 요청 수만큼 작업 스레드(worker thread)가 생성되는 요청 당 스레드 모델(thread per request model)을 사용하여 요청을 처리한다. 이때 요청에 대한 작업이 블로킹 작업일 경우 일련의 요청을 동기적으로 처리할 것인지, 비동기적으로 처리할 것인지와 요청에 대한 응답을 처리하는 내부적인 방식에 따라 작업 스레드의 블로킹 및 논블로킹 여부가 달라진다.

요청을 동기적으로 처리하는 경우 스레드는 요청에 대한 작업이 완료될 때까지 블로킹되고 작업이 완료되면 클라이언트에 응답을 돌려준다. 하나의 요청에 대한 작업이 완료되어야 다음 요청에 대한 작업이 수행될 수 있으므로 요청을 동기적으로 처리하는 것이다. 요청에 대한 응답 작업이 완료될 때까지 해당 스레드는 블로킹되어 다른 작업을 수행하지 못한 채 기다려야 한다. 스레드풀을 사용하여 필요에 따라 적절한 수준으로 스레드의 수를 관리할 수 있지만 스레드는 한정적인 자원이다. 요청에 대한 작업 수행 시간이 오래 걸리는 경우 상대적으로 빠른 시간 내에 스레드 스레드풀의 모든 스레드가 점유되고 더 이상 요청을 처리하지 못하게 되므로 서버의 요청 처리 능력이 한계에 직면한다. 서블릿 3.0 이전 버전은 이러한 방식으로 동작한다. 보다 확장성 있는 요청 처리를 위해서는 스레드가 블로킹되지 않도록 해야한다.

요청을 비동기적으로 처리하면 스레드는 작업을 기다리는 일을 다른 스레드에게 넘기며, 스레드는 작업이 끝날 때까지 블로킹되지 않고 다른 요청을 받는 등의 다른 작업을 처리할 수 있으므로 적은 수의 스레드로 더 많은 작업을 처리할 수 있다. 요청을 비동기 처리한다는 것은 요청을 읽고 응답을 쓰는 서블릿 컨테이너의 스레드와 작업을 수행하는 스레드를 분리하는 것이다. 작업을 요청하고 결과를 반환받는 요청 스레드와 작업을 처리하고 결과를 반환하는 작업 스레드(요청 처리 스레드)를 서로 분리하여 동작하도록 하여 서블릿 컨테이너의 요청 처리 작업을 보다 효율적으로 수행한다.

요청을 비동기적으로 처리하기 위해 스레드를 분리하더라도 서블릿 컨테이너의 IO 커넥터(connector)의 IO처리 구현이 블로킹인지 논블로킹인지에 따라 서블릿 컨테이너의 스레드의 블로킹 여부가 달라진다. 커넥터란 클라이언트와 서버 간의 데이터 전송을 처리하는 컴포넌트이다. 커넥터에는 여러 종류가 있으며 서로 다른 기능과 성능을 제공하므로 상황에 맞게 적절한 선택이 필요하다. 커넥터는 종류 별로 연결(connection) 당 스레드 할당 방법이 다르므로 요청량에 따른 리소스 부하를 고려하여 커넥터를 선택해야 한다. 최신 버전의 톰캣은 블로킹 IO 커넥터를 제공하지는 않지만 톰캣 8.5 미만의 버전에서는 블로킹 IO 커넥터가 존재했었다.

블로킹 IO 커넥터에서는 클라이언트의 서버로의 연결 당 스레드가 할당되며 연결이 열리고 요청에 대한 응답이 완료되어 연결이 닫힐 때까지 해당 연결에 대한 스레드는 블로킹되며 다른 작업을 수행할 수는 없다. 요청 헤더와 요청 바디 읽기, 응답 쓰기, 다음 요청에 대한 대기, SSL 핸드세이크 작업은 모두 스레드를 블로킹한다. 커넥션 지속(keep-alive) 기능에 의해 클라이언트와 서버 간 연결이 지속되고, 클라이언트의 요청이 연속적일 경우 한 요청에 대한 응답이 전송된 후 그 다음 요청 간 시간이 소요된다면 해당 시간 만큼 연결에 할당된 스레드는 블로킹되어 다른 작업을 수행하지 못한다. 또한 커넥션 지속 요청에 대해 타임아웃이 발생하기 전까지 후속 요청이 없다면 타임아웃 설정 시간 만큼 스레드는 아무런 작업을 하지 못하고 블로킹되므로 리소스 소모가 매우 비효율적이다. 블로킹 IO 커텍터의 경우 서버의 최대 동시 처리 가능 요청 수는 서버의 최대 가용 스레드풀 크기와 동일하며 요청의 연결 지속 기능 여부와도 관련이 있다. 이러한 이유로 블로킹 IO 커넥터를 사용하는 경우 애플리케이션의 확장성이 제한될 수 있다.

반면 논블로킹 IO(NIO) 커넥터의 경우 하나의 스레드가 여러 커넥션을 처리할 수 있다. 하나의 스레드는 클라이언트의 서버로의 여러 연결에 대해, 하나의 연결이 열리고 요청에 대한 응답이 완료되어 해당 연결이 닫힐 때까지 스레드는 블로킹되지 않으며 다른 작업을 수행할 수는 있다. 요청 헤더와 요청 바디 읽기, 응답 쓰기, 다음 요청에 대한 대기, SSL 핸드세이크 작업은 모두 스레드를 블로킹한다. 커넥션 지속 기능에 의해 클라이언트와 서버 간 커넥션이 지속되는 동안 하나의 요청에 대한 작업이 완료될 때까지 스레드는 블로킹되지 않으며 해당 작업이 수행되는 동안 스레드는 다른 연결에 대한 요청을 처리할 수 있다. 논블로킹 IO는 내부적으로 별도의 추가적인 스레드(풀러(puller) 스레드)를 위한 스레드풀과 이벤트 큐(event queue) 방식을 사용하여 하나의 스레드가 여러 커넥션에 대해 요청에 대한 응답을 처리하므로 보다 효율적이다.

서블릿 3.0에서부터 비동기 요청 처리가 가능하지만, 기존 IO(블로킹 IO)만 허용했다. 서블릿 3.1 버전(JSR 340)부터 NIO(논블로킹 IO)를 지원한다. 논블로킹 IO를 통해 하나의 요청 당 하나의 스레드가 작업을 수행하는 대신 적은 수의 스레드로 많은 요청을 동시에 처리할 수 있게 되었다. 따라서 해당 버전의 서블릿 스펙을 구현한 서블릿 컨테이너를 사용하여 요청의 비동기 처리가 가능하다.

NIO 기반이고 스프링 Executor 스레드풀과 같이 별도의 작업 스레드를 위한 스레드풀이 존재한다고 해도 애플리케이션 내에 DB나 API 호출과 같은 블로킹 IO 작업이 존재한다면 해당 작업 스레드는 블로킹될 수 있다(NIO이므로 요청 스레드는 블로킹되지 않는다)는 한계가 존재한다. 따라서 완전한 비동기 요청 및 스레드 논블로킹 구현을 통해 블로킹되는 스레드 없이 스레드라는 리소스를 최대한 활용하기 위해 리액티브 프로그래밍(reactive programming)이 등장하게 되었다. 리액티브 프로그래밍 구현을 위해 리액티브 스트림 명세가 존재하며 스프링은 프로젝트 리액터(reactor)라는 리액티브 스트림 구현체를 기반으로 한 스프링 웹플럭스 프로젝트를 제공하고 있다.

서블릿 3.1에 도입된 비동기 방식은 리액티브 이벤트 루프(event loop)와 역압(backpressure) 개념을 지원하지 않으므로 완전한 리액티브 웹 애플리케이션을 구성하는 데에는 한계가 있다. 요청 당 스레드 모델을 통한 요청 처리 방식을 성능면에서 개선하기 위한 이벤트 루프 모델(event loop model)은 비동기 요청과 스레드 논블로킹을 기반으로 한다. 이 모델에서는 요청에 대한 작업을 이벤트 큐라는 대기열에 쌓고 스레드는 이벤트들을 처리하기 위한 작업들을 비동기적으로 요청한다. 작업 요청 시 콜백 함수를 함께 전달함으로써 작업이 완료되었을 때 별도의 추가 작업을 수행하여 응답 결과를 호출자 스레드(요청 스레드)에 전달할 수 있다. 이벤트 루프 모델에 반응형 프로그래밍의 역압 특성을 도입하면 요청과 응답을 조율하여 시스템의 리소스 사용 효율을 더 높일 수도 있다. 네티(netty)는 톰캣과 달리 이벤트 루프 모델을 사용한다.


스프링 MVC의 웹 요청 비동기 처리

스프링 MVC 컨트롤러의 핸들러 메서드는 여러 가지 반환형을 지원하며 비동기로 처리되는 반환형은 다음과 같다.

  • DeferredResult
  • CompletableStage
  • CompletableFuture
  • Callable


컨트롤러에 정의된 핸들러 메서드의 반환형을 비동기 처리 타입으로 정의하면 컨트롤러는 클라이언트의 요청을 비동기 처리한다.

스프링 MVC는 작업의 비동기 처리를 위해 자바의 Callable를 대체하는 DeferredResult 클래스를 제공한다. DeferredResult는 서블릿 컨테이너가 웹 요청을 비동기적으로 처리하기 위한 것으로 이를 핸들러 메서드의 반환형으로 사용함으로써 요청을 비동기적으로 처리하는 컨트롤러를 만들 수 있다. Callable의 경우 스레드를 직접 관리할 필요 없이 애플리케이션을 대신하여 동시적으로 실행되지만, DeferredResult의 경우 애플리케이션이 선택한 스레드에서 결과를 생성할 수 있다는 차이점이 있다.

TaskExecutorexecute() 메서드에 작업 실행 코드에 해당하는 Runnable 객체를 전달하고 실제 작업 실행 결과값은 작업 실행 코드 내에서 DeferredResultsetResult()를 통해 설정한다. 작업 실행 결과를 DeferredResult로 반환할 경우 스레드를 직접(또는 TaskExecutor에 위임해서) 만들어야 하지만 Callable로 반환할 경우엔 그럴 필요가 없다. 작업 실행 시 예외가 발생하면 DeferredResult.setErrorResult() 메서드의 인자로 보내 처리한다.


톰캣 서블릿 컨테이너의 스레드풀 및 최대 연결 설정 (톰캣 8)

톰캣 컨테이너는 기본적으로 HTTP 커넥터 컴포넌트로 클라이언트와의 HTTP 프로토콜 기반 통신을 수행한다. 커넥터의 특정 인스턴스는 서버의 지정된 TCP 포트 번호에 대한 연결을 수신한다. 하나 이상의 커넥터로 단일 서비스를 구성할 수 있으며, 서비스가 관련 엔진(engine)으로 요청을 전달하면 엔진은 요청에 대한 처리를 수행하고 응답을 생성한다. 응답이 완성되면 다시 커넥터에 전달한다.

HTTP 요청의 처리는 서블릿 컨테이너의 스레드풀에 의해 제한된다. 커넥터의 관련 속성 설정을 통해 서블릿 컨테이너의 스레드풀 크기를 변경할 수 있다. 톰캣의 기본 스레드풀 크기는 200이다. 스레드풀은 HTTP 요청을 받고 응답을 처리하는 데 사용된다.

동기 요청은 해당 요청 기간 동안 작업을 처리할 스레드가 필요하다. 현재 요청을 처리할 수 있는 가용 스레드 보다 많은 동시 요청이 들어오면 설정된 최대 스레드 수(maxThreads 속성값)까지 추가 스레드가 생성된다. 최대 스레드 보다 더 많은 동시 요청이 들어오면, 현재 연결 수가 서버가 수락 가능한 최대 연결 수(maxConnections 속성값)에 도달할 때까지 새로운 연결을 수락할 것이다. 연결이란 서버가 요청을 수락하고 처리하는 과정 및 상태를 말하며 처리가 완료되어 응답이 성공적으로 전달되면 연결은 종료된다. 요청에 대한 응답을 받기까지 연결이 지속되는 시간이 바로 응답 시간(response time)이다. 새로운 연결은 스레드가 연결을 처리할 수 있을 때까지, 커넥터가 생성한 서버 소켓 내부에 대기된다. 만약 연결 수가 maxConnections 값에 도달하면 추가 연결을 대기할 것이다. 연결 대기열의 크기는 acceptCount 속성을 통해 설정할 수 있다. 연결 대기열 마저 모두 가득찬다면 maxConnections 값과 acceptCount 값의 합을 넘어서는 추가적인 연결 요청이 거부되거나 타임아웃이 발생할 수 있다. 속성별 기본값은 다음과 같다.

  • acceptCount: 100
  • maxConnections
    • NIO 및 NIO2 커넥터: 10000
    • APR/네이티브(Native) 커넥터: 8192
  • maxThreads: 200


커넥터가 생성할 수 있는 최대 스레드 수(maxThreads)는 서버의 동시 처리 가능한 최대 요청 수를 결정한다. 서버로 들어오는 요청 수가 최대 스레드 수 보다 크다면 연결이 수립되었지만 응답을 주지 못하는 상태가 된다. 이때 요청 수가 증가하면 연결 수도 증가하게 되고 연결 수가 최대 연결 수(maxConnections)에 다다르면 연결은 대기열에 저장된다. 서버로 들어오는 요청 수가 최대 스레드 수 보다 크면 스레드가 기존 요청을 처리하고 새로운 요청을 처리하기까지 시간이 소요되므로 응답이 지연되기 시작하며, 요청을 처리하기 위한 스레드가 가용 상태가 될 때까지 연결 상태에 있는 요청들은 대기 상태로 존재한다. 연결 수가 maxConnections 값과 acceptCount 값의 합 보다 큰 상태가 되는 경우는 서버가 허용하는 연결 수를 초과하는 상태이므로 서버의 요청 처리 한계 상태로 볼 수 있다.


스프링의 TaskExecutor

스프링은 동시성 구현을 위한 인터페이스인 TaskExecutor를 제공한다. TaskExecutor는 자바 5의 java.util.concurrent.Executor 인터페이스를 상속한다.

스프링의 TaskExecutor 스레드풀은 비동기 작업을 처리하기 위해 사용된다. 스프링에서는 TaskExecutor를 구현한 다양한 스레드풀 구현체를 제공하며, 이를 활용하여 비동기 작업을 실행할 수 있다. TaskExecutor 스레드풀의 크기는 직접 설정할 수 있으며, 이 크기는 동시에 처리 가능한 작업의 수를 결정한다.

@Configuration
public class MyConfiguration {
  @Bean
  public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);  // 스레드풀의 기본 크기 설정
    executor.setMaxPoolSize(20);   // 스레드풀의 최대 크기 설정
    executor.setQueueCapacity(30); // 대기열 크기 설정
    executor.initialize();
    return executor;
  }
}

setCorePoolSize() 매서드를 통해 설정하는 기본 스레드풀의 크기는 요청을 동시에 처리하는 스레드 개수를 의미한다. 기본 스레드풀 크기 만큼 생성된 스레드가 모두 요청을 처리 중이고 추가적인 요청을 처리해야할 때 추가적인 스레드가 생성되며 생성 가능한 스레드 수의 최대 개수를 setMaxPoolSize() 메서드를 통해 설정한다. 최대 스레드풀 크기 만큼 생성된 스레드 마저 모두 요청을 처리 중이고 추가적인 요청을 처리해야 한다면 요청을 대기열에 대기시킬 수 있으며 setQueueCapacity() 메서드를 통해 대기열의 크기를 설정한다.

스레드풀의 크기의 결정은 서버의 성능(리소스 현황), 예상되는 동시 요청의 수, 작업의 특성 등을 고려하여 수행되어야 한다. 보통 서블릿 컨테이너의 스레드풀 크기는 대부분의 요청을 처리할 수 있는 수준으로 설정되며, TaskExecutor 스레드풀의 크기는 하나의 요청에 대해 비동기적으로 처리할 작업의 처리 시간과 가용 리소스 범위에 따라 결정한다.

요청 당 스레드가 할당되는 모델(블로킹 IO)에서 초당 100개의 요청이 유입되는 상황을 가정해보자. 서블릿 컨테이너의 스레드풀 크기가 100이라면 서버는 동시에 100개의 요청을 받을 수 있다. 요청 당 스레드가 할당되는 구조에서는 해당 요청에 대한 작업은 전적으로 할당된 스레드 상에서 수행된다. 요청 작업을 완료하기까지 0.5초가 소요된다고 할 때 서버는 0.5초 동안 100개의 요청을, 1초 동안 200개의 요청을 처리하여 응답할 수 있다. 즉, 서버의 처리 능력(TPS)은 200개/s이다. 스레드풀의 크기를 늘리면 동시에 처리할 수 있는 요청의 개수가 증가하게 된다. 이때 스레드풀의 크기를 넘어서는 요청을 차례대로 대기시킨 후 스레드풀이 사용 가능한 상태가 되었을 때 유입된 순서대로 스레드를 할당하여 처리하기 위해 대기열(큐)이라는 것이 필요하다. 대기열 관리를 위해서는 별도의 메모리 자원이 필요하다.

서블릿 컨테이너의 동기적 요청 처리 시 하나의 요청에 대한 작업이 모두 완료될 때까지 요청 스레드가 블로킹된다. 또한 블로킹 IO이므로 스레드가 요청 데이터를 읽고 응답 데이터를 쓰는 작업은 스레드를 블로킹시킨다. 따라서 스레드풀의 크기가 최대 동시 요청 처리 수를 결정하게 된다.

현재 스레드풀 크기에 대한 요청 처리 능력을 보다 향상시키기 위해 동기적 요청 처리 대신 비동기적 요청 처리를 고려해볼 수 있으며 이를 위한 방법이 애플리케이션의 TaskExecutor 스레드풀이다. 이 스레드풀은 서블릿 컨테이너의 스레드풀과 분리되며 이를 사용하여 작업을 요청하고 결과를 반환받는 요청 스레드와, 작업을 처리하고 결과를 반환하는 작업 스레드를 서로 분리하여 동작하도록 할 수 있다. 스레드풀을 추가로 생성하고 분리하여 서블릿 컨테이너의 부하를 줄이는 것이다. 요청을 동기적으로 처리하면 요청 스레드가 블로킹된다. 요청 스레드를 블로킹하지 않고 작업을 별도의 스레드가 백그라운드에서 처리하기 후 서블릿 컨테이너의 스레드에게 요청 처리 결과를 응답으로 반환하는 것이 더 효율적이다.

요청을 비동기 처리하게 되면 HTTP 요청을 받아 작업을 수행하는 작업 스레드에게 작업을 요청하는 스레드(요청 스레드)와 작업 수행 완료 결과를 받고 응답으로 반환하는 스레드(작업 스레드)가 달라지게 된다. 따라서 서로 다른 스레드간 데이터 교환이 필요하므로 리소스 소모가 있다는 점은 고려해야 한다.

동일하게 초당 100개의 요청이 유입되는 상황에서 서블릿 컨테이너의 최대 스레드풀 크기가 100이고 스프링 TaskExecutor의 최대 스레드풀의 크기가 100이라고 가정해보자. 서버는 동시에 100개의 요청을 모두 받을 수 있으며 애플리케이션은 동시에 100개의 요청을 처리할 수 있다. 요청 작업이 0.5초가 소요된다고 할 때 스프링 애플리케이션은 0.5초 동안 100개의 요청을, 1초 동안 200개의 요청을 처리하여 응답할 수 있다. 서블릿 컨테이너의 스레드는 요청을 받은 후 해당 요청에 대한 작업을 작업 스레드에게 할당하고 작업 결과를 받기 전까지 기다리지 않더라도 다른 추가적인 요청을 받을 수 있다. 작업은 별도의 스레드에서 진행되므로 요청 스레드는 작업에 의해 블로킹되지 않는다.


서블릿 컨테이너와 네티 프레임워크

네티(netty)는 이벤트 기반의 비동기 네트워크 애플리케이션을 개발하기 위한 고성능 서버 프레임워크로서 다양한 프로토콜을 처리할 수 있다.

서블릿 컨테이너는 요청이 들어오면 스레드를 할당하여 해당 요청을 처리하고, 요청에 대한 응답을 반환하는 방식으로 동작한다. 각 요청마다 스레드를 생성하고 관리하는 이러한 방식에서는 스레드 간의 동기화 및 블로킹으로 인한 성능 저하 문제가 발생할 수 있다.

서블릿 컨테이너에서도 요청의 비동기 처리가 가능하지만 요청 당 스레드 모델을 통한 요청 처리 방식을 성능면에서 개선하기 위한 이벤트 루프 모델(event loop model)은 비동기 요청과 스레드 논블로킹을 기반으로 한다. 이 모델에서는 요청에 대한 작업을 이벤트 큐(event queue)라는 대기열에 쌓고 요청 스레드는 이벤트들을 처리하기 위한 작업들을 작업 스레드에게 비동기적으로 요청한다. 작업 요청 시 콜백 함수를 함께 전달함으로써 작업이 완료되었을 때 별도의 추가 작업을 수행하여 응답 결과를 요청 스레드에 전달할 수 있다. 이벤트 루프 모델에 리액티브 프로그래밍의 역압 특성을 도입하면 요청과 응답을 조율하여 시스템의 리소스 사용 효율을 높일 수도 있다. 네티는 톰캣과 달리 이벤트 루프 모델을 사용한다.

스프링 웹플럭스에 내장된 리액터 네티(reactor netty)는 스프링 MVC의 서블릿 컨테이너에 비해 비교적 적은 수의 스레드를 사용하여 요청 처리 작업을 수행한다.

톰캣 서블릿 컨테이너와 달리 리액터 네티의 경우 서버의 연결 대기열 크기, 스레드풀 최대 크기, 수락 가능한 최대 연결 수 등의 속성 설정을 할 수는 없다. 연결 수는 클라이언트에 의해 결정되며 서버 측에서 이를 제한할 수 없다. 연결 수 제한이 필요 없는 이유는 이벤트 루프 모델이라는 다른 모델을 사용하기 때문이다. 비동기 및 스레드 논블로킹을 기반으로 하는 이벤트 루프 모델에서는 요청 수만큼 새로운 스레드를 생성할 필요가 없으며

대신 네티에서는 TCP 또는 HTTP 서버의 채널 옵션 설정, 워커 스레드 수, SSL 및 TLS 설정 등이 가능하다.


구조적 동시성

구조적 동시성(structured concurrency)이란 동시성 프로그래밍에 구조적 접근 방식을 사용하여 멀티스레드 구현 코드의 명확성, 유지보수성 및 품질과 프로그램 개발 시간을 향상시키는 것을 목표로 하는 프로그래밍 패러다임이다.

구조적 동시성은 자바의 경우 19 버전부터 도입된 가상 스레드를 사용하여 적용할 수 있으며, 코틀린의 경우 코루틴(coroutine)을 사용하여 적용할 수 있다.

구조적 동시성을 코드에 적용하기 전 코드의 형태는 비구조적 동시성(unconstructed concurrency)이라고 한다. CompletableFuture를 사용한 비구조적 동시성 코드의 형태는 다음과 같다.

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 비동기 작업 정의
Callable callableTask1 = () -> { ... }
Callable callableTask2 = () -> { ... }

// 작업 목록 정의
ArrayList<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask1);
callableTasks.add(callableTask2);

// 다중 작업 실행(비동기 작업 병렬 실행) 및 작업 결과 조회
CompletableFuture<String> completableFuture = CompletableFuture
  .supplyAsync(() -> { ... })
  .thenApply((result) -> { ... });


비구조적 동시성의 문제점은 비동기적으로 수행되는 작업의 취소 및 종료가 의도대로 이루어지지 않거나 지연되어 스레드의 리소스 낭비가 발생한다는 점이다. Futureget() 메서드는 작업이 수행 완료된 후 결과를 반환할 때까지 기다리는 블로킹 메서드이다. 작업이 완료될 때까지 스레드는 블로킹된다.

만약 두 개의 비동기 작업이 존재하고 두 작업 중 하나라도 실패하면 전체 작업도 실패하는 것이 의도된 상황을 가정해보자. 첫 번째 작업 결과와 두 번째 작업 결과를 합쳐서 최종 결과를 만들지만 두 결과 중 하나라도 누락되면 최종 결과는 실패한 것으로 간주하는 상황이 있을 수 있다. 첫 번째 비동기 작업이 실행 도중 실패하였고 두 번째 비동기 작업이 실행 중일 때 두 번째 작업이 완료될 때까지 스레드는 블로킹된다. 두 번째 작업은 첫 번째 작업이 실행 도중 실패하였는지 알지 못하며 계속 진행된다. 동시에 실행 중인 비동기적 작업들 중 한 작업이 실행 도중 실패하면 실행 중이던 나머지 작업은 더 이상 실행될 필요가 없으므로 도중에 취소되어 종료되는 것이 이상적이지만 비구조적 동시성에서는 해당 방식으로 동작하지 않는다. 이러한 실행 구조로 인해 비동기적 작업을 수행하는 스레드는 불필요한 작업에 할당되어 사용되므로 리소스 측면에서 낭비라고 볼 수 있다. 구조적 동시성은 이러한 문제를 해결하는 프로그램 패러다임이다.

자바 8부터 도입된 비동기 프로그래밍을 위한 CompletableFuture 인터페이스는 구조적 동시성을 제공하지는 않지만 작업을 직접 완료할 수 있으므로, 다음과 같이 비구조적 동시성의 문제점을 해결할 수 있다.


가상 스레드

자바 19에서 프리뷰 기능으로 도입된 프로젝트 룸(Project Loom)의 가상 스레드(virtual thread)링크는 경량 스레드이다. 자바 20을 거쳐 자바 21에서 최종 릴리스되었다. 가상 스레드는 처리량이 많은 동시성 애플리케이션의 개발, 관리 및 관찰(observe)을 훨씬 용이하게 만든다.

요청 당 스레드를 할당하여 처리하는 서버 애플리케이션의 하드웨어 리소스 활용을 최적화하는 것이 주 목적이다. 가상 스레드는 구조적 동시성을 제공하는 기능을 포함하고 있다. 이를 통해 CompletableFuture와 같은 비동기 작업을 보다 구조적으로 다룰 수 있다.


참고

Comments