본문 바로가기
개발/Spring

[WebClientRequestException] Pending acquire queue has reached its maximum size of 40 해결한 경험

by 방구쟁이 2024. 9. 16.
728x90

서론

 오늘 다룰 이슈는 pendingAcquireMaxCount에 관한 이야기입니다.
 우리 팀에서는 광고 이벤트 트래킹 통계를 연동사 API를 통해 제공 받습니다. 연동사 마다 제공해주는 단위가 각각 다르며 특정 연동사는 다수의 Report API를 호출하여 DB에 적재해야 합니다. 이를 WebFlux를 사용하여 적재하는 과정 중 해당 에러가 발생하였습니다.

에러 전문 열어보기

더보기
[2024-08-19 15:28:35,344] [ERROR] [] [boundedElastic-34] [c.m.d.r.v.a.ReportWebClientAdapter:79] : reportRequest Error occur. uri: {{URI}}, request: {{ReportRequest}}
org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 40; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 40
	at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ Request to GET {{URL}} [DefaultWebClient]
Original Stack Trace:
		at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
		at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
		at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:227)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
		at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
		at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
		at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
		at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
		at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
		at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
		at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
		at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
		at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:194)
		at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:194)
		at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
		at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:477)
		at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:365)
		at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:558)
		at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268)
		at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)
		at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
		at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
		at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:135)
		at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
		at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
		at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
		at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
		at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
		at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
		at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
		at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 40
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:365)
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:558)
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268)
	at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
	at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:135)
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
	at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

 

 로그 내용을 정리하자면, Spring WebFlux 애플리케이션에서 WebClient를 사용하여 외부 API를 호출할 때 발생하는 문제입니다. 여기서 Pending acquire queue has reached its maximum size of 40라는 오류는 애플리케이션이 연결 풀에서 새로운 연결을 확보하지 못하고 대기열이 최대 크기(40)에 도달했음을 알 수 있습니다. 이로 인해 더 이상 새로운 요청을 처리할 수 없게 되어 예외가 발생합니다.

 여기서 최대 크기는 pendingAcquireMaxCount로 설정하는데 Reactor Netty의 커넥션 풀에서 사용되는 설정 중 하나로 Spring WebFlux에서 비동기 HTTP 요청을 처리하기 위해 사용된다. pendingAcquireMaxCount는 커넥션 풀에서 커넥션을 획득하려고 대기 중인 요청의 최대 수를 정의합니다. 커넥션 풀이 커넥션을 제공할 수 없을 때, 요청들은 대기열에 들어가고 이 설정은 대기열의 최대 크기를 제한합니다.  에러 로그를 확인해보면 pendingAcquireMaxCount가 나온다.

 여기서 대기열이란, 커넥션을 획득하기 위해 대기 중인 요청들을 저장하는 큐입니다. 최대 대기 요청 수는 대기열에 최대 몇 개의 요청까지 저장할 수 있는지를 설정합니다. 

 

왜 중요할까요?

 pendingAcquireMaxCount는 커넥션 풀이 과도하게 많은 요청으로 인해 과부하되는 것을 방지하는 데 중요한 역할을 합니다. 이 설정을 적절히 조정함으로써 다음 역할을 수행할 수 있습니다.

  1. 자원 보호
    너무 많은 요청이 대기열에 쌓이지 않도록 하여 시스템 자원을 보호합니다.

  2. 성능 조정
    대기열이 너무 작으면, 대기 중인 요청이 많아져서 PoolAcquirePendingLimitException과 같은 예외가 발생할 수 있습니다. 반대로 너무 크면, 시스템이 불필요하게 많은 요청을 대기시킬 수 있습니다.

  3. 에러 방지
    대기열의 크기가 적절하게 설정되어 있으면, 커넥션 부족으로 인한 런타임 에러를 줄일 수 있습니다.

 에러 로그를 보면 현재 리액티브 클라이언트의 설정이 처리 중인 작업량에 비해 충분하지 않다고 알려줍니다. 이를 해결하기 위한 방안으로는 연결 풀 구성을 조정하거나 작업량을 최적화하는 것이 문제를 해결하는 방법이 있을 것이라 판단하였습니다.

 우리팀의 WebFlux 설정을 확인해보면, pendingAcquireMaxCount을 따로 설정하지 않았기 때문에 기본값인 40으로 설정되어 있고, 대기 큐의 크기는 20개였다. 서비스 사용량이 급격히 증가하면서 연동사의 API 호출을 약 80개 정도를 호출해야 했기 때문에 적절하지 못한 설정 값을 가지고 있었다. 

 

해결 방법


 우리의 현재 서비스에서 발생하는 이 문제는 여러가지 방안으로 해결 할 수 있었다.

  1. 대기 큐 크기 증가
  2. 최대 연결 수 증가
  3. 연결을 일정 배치 단위로 분리하기

 

첫번째 방법, 대기 큐 크기 증가

 해당 에러가 발생하는 근본적인 원인은 대기 큐가 최대치에 도달 했기 때문에 maxPendingAcquire 값을 늘려서 대기 큐 크기를 증가시킬 수 있습니다. 이렇게 하면 더 많은 요청을 대기 상태로 유지할 수 있습니다.

HttpClient httpClient = HttpClient.create()
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
    .responseTimeout(Duration.ofMillis(5000))
    .poolResources(ConnectionProvider.builder("custom")
        .pendingAcquireMaxCount(100) // 대기 큐 크기 증가
        .build());

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(httpClient))
    .build();

 

두번째 방법, 최대 연결 수 증가

 현재 서비스는 연결 수가 20으로 설정되어 있기 때문에 maxConnections 값을 증가시켜 동시에 처리할 수 있는 연결 수를 늘릴 수 있습니다. 이렇게 하면 연결 풀이 더 많은 동시 요청을 처리할 수 있습니다.

HttpClient httpClient = HttpClient.create()
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
    .responseTimeout(Duration.ofMillis(5000))
    .poolResources(ConnectionProvider.builder("custom")
        .maxConnections(50) // 최대 연결 수 설정
        .build());

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(httpClient))
    .build();

 


세번째 방법, 연결을 일정 배치 단위로 분리하기

 Flux.merge를 사용하여 여러 Mono를 병렬로 처리하고 있지만, 요청을 배치로 처리하려면 요청을 일정 크기의 버퍼로 나누어야 합니다. buffer 연산자를 사용하여 요청을 배치로 나누고, 각 배치에 대해 병렬 처리를 수행하는 방식으로 코드를 수정할 수 있습니다.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ApiBatchProcessor {

    private static final int DEFAULT_BATCH_SIZE = 10;
    private static final int DEFAULT_PARALLELISM = 5;

    public List processRequests(List reportRequests, String uri, TypeReference typeReference) {
        List<mono> requestMonos = Optional.ofNullable(reportRequests)</mono
                .orElseGet(Collections::emptyList)
                .stream()
                .map(o1 -> requestReport(uri, o1, typeReference))
                .collect(Collectors.toList());

        return Flux.fromIterable(requestMonos)
                .buffer(DEFAULT_BATCH_SIZE)
                .concatMap(batch -> Flux.merge(batch) // 배치 단위로 요청 처리
                        .parallel(DEFAULT_PARALLELISM) // 병렬 처리
                        .runOn(Schedulers.parallel())
                        .sequential()
                        .collectList())
                .flatMap(Flux::fromIterable)
                .collectList()
                .doOnError(error -> log.error("getRequestReport Error. uri:{}, reportRequests:{} ", uri, reportRequests, error))
                .block();
    }

    private Mono requestReport(String uri, ReportRequest request, TypeReference typeReference) {
        return WebClient.create()
                .post()
                .uri(uri)
                .bodyValue(request)
                .retrieve()
                .bodyToMono(typeReference.getType());
    }
}


  Flux.fromIterable(requestMonos).buffer(DEFAULT_BATCH_SIZE)를 사용하여 요청을 배치 단위로 나누고 concatMap(batch -> Flux.merge(batch))를 사용하여 각 배치의 요청을 병렬로 처리합니다. collectList()를 사용하여 모든 결과를 리스트로 수집하고, 에러가 발생한 경우 적절하게 로그를 기록할 수 있습니다. 위 방법을 통해 요청을 배치로 나누고, 각 배치를 병렬로 처리하여 요청량을 효과적으로 제어할 수 있습니다. 배치 크기와 병렬 처리 수를 조정하여 애플리케이션의 성능과 자원 사용을 최적화할 수 있습니다.

 우리 서비스에서는 WebClientAdapter와 ReportWebClientAdapter로 Bean을 나누어 사용하고 있습니다. 그 중 Report 통계를 적재하는 서비스는 ReportWebClientAdapter를 사용하며, 이 Adapter는 현재 최대 연결 수를 더 사용할 수 있음에도 불구하고 20개로 낮게 설정되어 있었으며, 최대 대기열 수가 연결 수보다 높은 것은 처리량에 대비 대기열이 많다는 뜻으로 바람직한 설정이 아니라고 판단하였습니다. 따라서 근본적인 원인이 최대 대기열 수 부족임에도 최대 연결 수 증가하는 방향으로 해결하였으며 서비스 증가량에 따라 차후 일정 배치 단위로 나눌 수 있는 방향으로 잡았습니다. 

 앞으로 컴포넌트나 라이브러리 설정 값들을 서비스 크기 및 트래픽에 따라 적절히 설정하는데 더욱 기울여야 할 것 같다고 생각되었습니다.

 

읽어 주셔서 감사합니다.

728x90

댓글