ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ExecutorService 그리고 maxThreadPool
    Java & Kotlin 2023. 11. 15. 03:24
    반응형

    1) 서론

    이번글에서는 병렬처리 및 별도의 쓰레드풀에서 관리하기 위해서 ExecutorService 사용 중 만난 문제에 대해서 정리하려고 합니다.


    2) ExeucutorService가 뭔가요?

    자바의 ExecutorServcie 인터페이스는 비동기 작업을 위해서 Future를 생성하고 다룰 수 있게 해줍니다. Future 객체의 특징은 연산에 대한 결과를 blocking 되어 기다리지 않습니다. 요청 후 응답이 왔는지 확인했을 때 결과를 응답하는데요. 이러한 과정이 비동기적으로 일어납니다.
     
    하지만 비동기 요청을 위해서는 기존에 수행되던 쓰레드가 아닌, 별도의 쓰레드에서 수행되어야 합니다. 어떻게 별도의 쓰레드를 비동기 작업을 위해 할당할 수 있을까요? 
     
    간편하게는 new Thread()를 이용해서, 직접 쓰레드를 생성하여 사용할 수도 있습니다. 하지만 이러한 방식은 다수의 요청이 발생했을 때 쓰레드를 관리하기 어렵고, 요청마다 쓰레드가 생성되어 OOM이 발생할 수도 있습니다.
     
    자바는 이러한 쓰레드 관리를 위해서 쓰레드 풀(thread pool)을 제공하고 있는데요. ExecutorService 인터페이스를 구현한 ThreadPoolExecutor를 이용해서 쓰레드 풀을 생성하고, 관리할 수 있습니다.
     

    ExecutorService.submit()

     
    정리하자면 이렇습니다.

    • ThreadPoolExecutor를 이용하여 쓰레드풀을 만든다
    • Future 객체를 return 타입으로 하는 함수를 사용하여 실행한다
    • 이때의 요청은 별도의 쓰레드에서 수행되며, 앞서 만든 쓰레드풀의 쓰레드를 사용한다

    3) 그런데 maximumPoolSize가 늘어나지 않는다

    아래와 같이 CompletableFuture를 이용하여, 비동기 병렬 요청을 하려고 했는데요. maximumPoolSize만큼 병렬 수행이 되는 것을 기대했습니다.

    CompletableFuture.runAsync(task, executorService)

     
    쓰레드풀을 설정할 때 핵심되는 설정 세 가지가 있습니다.

    • corePoolSize: 쓰레드풀 생성 시 기본적으로 생성되는 쓰레드 수
    • maximumPoolSize: corePoolSize 보다 더 필요할 때 최대로 증가되는 쓰레드 수
    • workQueue: 쓰레드 할당 필요한 task들이 대기하는 queue

     

    corePoolSize를 1로 설정했을 때 아래와 같은 동작을 예상했습니다.

    • 첫 번째 요청은 corePoolSize에 할당이 되어 수행된다
    • 두 번째 요청부터는 maximumPoolSize만큼 늘어나고 수행된다
    • maximumPoolSize도 가득 차면 workQueue만큼 늘어나는 것을 기대한다

     
    하지만 maximumPoolSize까지 요청이 발생하지 않았지만, 응답에 지연이 발생했습니다. 조금 이상하여 확인해봤습니다.
    (아래의 내용들은 재현을 위한 예시입니다)

    • 10개의 요청이 모두 in progress 상태를 예상합니다

     
    maxPoolSize를 설정했음에도 불구하고, 예상과는 다르게 두 번째 요청부터는 모두 queue에 담깁니다. 실제로 running 중인 쓰레드는 1개입니다.
     
    즉 1개의 요청에 10초가 소요된다면, 총 100초의 소요시간이 걸리며 선형적으로 증가하는 구조입니다. 병렬 요청을 위해서 Future와 쓰레드풀을 설정한 것이 의미 없습니다.
     
    왜 그럴까요?


    4) 큐(Queue)가 먼저다.

    사실 위의 디버깅 로그는 이미 정답을 말해주고 있는데요. maxThreadPool 설정대로 늘어나지 않고, 큐에 우선 쌓이고 실행을 대기하고 있습니다.
     

    출처: https://medium.com/@cse.soumya/executor-service-java-b3dc91853140
    ThreadPoolExecutor.execute()

     
    ExecutorService의 구조는 위와 같습니다.

    • 각 task들이 쓰레드에 할당
    • core 쓰레드풀의 쓰레드가 고갈되면 queue에 대기
    • queue의 공간이 가득 차게 되면 maxPoolSize 만큼 증가 -> 각 쓰레드 할당
    • maxPoolSize 가득 차고, queue도 가득 차면 RejectedExecutionException 발생하여 더 이상 요청받지 못 함

     
    곧바로 maxPoolSize 만큼 늘어나지 않고 queue에 대기하게 됩니다. 
     
    설정된 큐 사이즈는 10이기 때문에 두 번째 ~ 열 번째(9개) 요청은 모두 담기게 됩니다. 그리고 첫 번째 요청에 대한 처리가 종료되면, 차례차례 수행됩니다.
     
    큐에 담기지 않고 곧바로 실행하려면 어떻게 해야 할까요? 아쉽게도 LinkedBlockingQueue의 capacity는 0으로 설정할 수 없습니다. 기본값이 Integer 최댓값이고, 0 이하로는 설정할 수 없습니다.

     
     
    큐가 담지 않는다면 어떻게 동작하는지 보기 위해서 LinkedBlockingQueue 대신에 SynchronousQueue를 사용합니다.
     
    SynchronousQueue의 특징은 객체를 담는 공간이 없는데요. Task를 큐에 넣고, 받는 쓰레드는 큐에서 task를 꺼내서 사용하는 방식입니다. 그리고 더 이상 꺼내어 사용할 수 없다면 RejectedExecutionException가 발생됩니다.

    val executorService = ThreadPoolExecutor(
        1,
        10,
        1,
        TimeUnit.DAYS,
        SynchronousQueue()
    )
    
    fun main() {
        for (i in 1..10) {
            println("============== try: $i ==============")
            executorService.submit {
                try {
                    println("============== in progress: $i ==============")
                    Thread.sleep(5_000L)
                    println("============== finish: $i ==============")
                } catch (e: RejectedExecutionException) {
                    Thread.currentThread().interrupt()
                    throw e
                }
            }
            println("============== activeCount: ${executorService.activeCount} ==============")
            println("============== queue: ${executorService.queue.size} ==============\n")
        }
    }

     
    activeCount를 확인했을 때 들어오는 요청이 모두 수행되고 있습니다. 대기 큐에 쌓이지 않고, 곧바로 maxPoolSize까지 늘어나고 있는데요. 즉 Queue에 더 이상 쌓을 수 없을 때 maxPoolSize가 늘어나는 것을 확인할 수 있습니다.


    5) 결론

    위의 확인을 통해서 쓰레드풀은 queue가 가득 찼을 때 최대 수만큼 스레드가 늘어난다는 것을 확인했습니다.
     
    실제 업무 시 만난 위의 문제는 두 가지를 고민하였습니다.

    • 지연이 발생하더라도 반드시 수행이 되는 것이 중요한지
    • 다수의 요청이 발생했을 때 빠르게 실패시키고, 재시도를 할 것인지

     
    해당 업무는 지연이 발생하더라도 반드시 수행이 되는것이 중요했습니다. 그래서 queue 사이즈를 크게 생성하고, core 스레드 수를 늘려서 병렬처리 후 queue에 쌓인 요청들을 처리하는 방식으로 해결했습니다. Integer.MAX로 설정할 수도 있지만, 지나치게 queue에 task가 쌓기에 되면 OOM이 발생할 수도 있기 때문입니다.
     
    이번 문제를 바탕으로 스레드풀을 설정하는 데 있어서 상황에 맞게 설정하고, 설정에 대해서 정확히 이해하는 것이 중요하다는 것을 느꼈습니다.

    반응형

    댓글

Designed by Tistory.