ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka, @Async 비동기 처리 맛보기
    Kafka 2021. 8. 16. 16:50
    반응형

    서론

    이 글의 계기는 이렇습니다.

     

    1. 특정 게시글을 조회하고, 조회 수를 올려준다.
    2. 하지만 해당 게시글에 트래픽이 몰리고, 조회 수를 처리하는데 자원을 낭비해야 할까?
    3. 일단 글부터 보여주고, 조회 수는 천천히 반영해도 되지 않을까?

     

    그래서 게시글을 조회 후 빠르게 사용자에게 보여주고, 조회 수는 Kafka, @Async를 사용해서 다른 서버에서 비동기로 처리하는 것이 더 좋지 않을까 생각했습니다.

     

    Hello World 수준의 예제입니다.

    오류가 있다면 언제든지 댓글 남겨주세요!


    Kafka

    Kafka는 크게 producer, consumer, topic으로 이루어진 데이터 스트리밍 플랫폼입니다.

    (앞으로 producer: 생산자, consumer: 소비자로 부르겠습니다.)

     

    생산자가 메시지를 만들어내고, 소비자는 해당 메시지를 읽어오는 역할을 합니다.

     

    생산자가 만들어낸 메시지는 key-value 형태로 Kafka 서버에 저장됩니다. 이 서버는 "Broker"라고 불리는데요. 영어 단어 그대로 가운데에서 중개해주는 사람이라는 의미입니다. 이때 broker는 여러 개를 만들 수도 있고, zookeeper가 broker들을 관리합니다. 그래서 kafka 서버 실행 전 zookeeper는 반드시 먼저 실행되어야 합니다.

     

    하지만 생산자와 소비자는 무수히 많아질 수 있습니다. 소비자가 어떤 생산자의 메시지를 읽을지 어떻게 구분할까요?

     

    소비자는 topic 기준으로 메세지를 읽어옵니다. Topic은 영어단어 그대로 "주제" 입니다. 그리고 소비자는 topic을 "구독" 하게 됩니다. 이때 "구독"이라는 의미는 중요합니다. Kafka 서버에 저장된 데이터를 소비자에게 강제로 전송하는 것이 아니라, 소비자가 필요에 의해서 가지고 오기 때문입니다. 그렇기 때문에 감당할 수 있는 양만큼만 가지고 올 수 있습니다.

     

    Topic은 다시 partition으로 나누어질 수 있는데요. 한 개의 topic이 수십 개의 patition으로 나뉘어, 병렬 스트리밍이 가능하게 됩니다.

     

    출처: https://bit.ly/3CRqCeb
    출처: https://bit.ly/3yPvRZx


    @Async

    비동기 방식의 처리를 가능하게 해주는 애노테이션입니다. 자바스크립트를 사용해봤다면 익숙한 방식일 것 같습니다.

     

    기존의 동기화 방식에서는 같은 스레드에서 해당 작업을 모두 처리합니다. 이를 비동기 방식으로 바꾸려면 직접 새로운 스레드 생성 후 결과를 넘겨줘야 합니다. 하지만 @Async 애노테이션을 사용하면, 별다른 설정 없이 해당 메서드를 다른 스레드에서 실행하게 해 줍니다.

     

    사용하는 것은 간단합니다.

    @EnableAsync를 설정 클래스에 추가해줍니다. 그래서 스프링에서 @Async 애노테이션을 찾도록 합니다.


    Kafka & @Async

    Kafka, Zookeeper 설치 후 실행이 되고 있다는 것을 가정합니다.

     

    Port

    • 테스트, 게시글 조회, 메시지 생산자 - 8080
    • 조회 수 증가, 메시지 소비자 - 9091
    @EnableAsync
    @Configuration // 비동기처리 설정 입니다.
    public class AsyncConfig extends AsyncConfigurerSupport {
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(500);
            executor.setThreadNamePrefix("yeoncheol-async-service-");
            executor.initialize();
            return executor;
        }
    }
    • ThreadPoolTaskExecutor - 비동기로 실행될 스레드를 실행합니다
    • setCorePoolSize() - 스레드 풀에 생성될 수 있는 스레드의 개수입니다.
    • setMaxPoolsize() - 기존의 pool 사이즈가 동적으로 늘어날 수 있는 최대치입니다
    • setQueueCapacity()
      • core pool size 보다 적은 스레드가 실행된다면, queue가 아닌 새로운 스레드를 만듭니다
      • core pool size와 같거나 큰 스레드가 필요하다면, queue에 올립니다.
      • queue가 가득 차고, max pool size에 여유 있다면 새로운 스레드를 생성합니다
    // 애노테이션들 생략
    public class AsyncTest {
    
        @Test
        public void kafkaTest() {
            // 204번 게시물을 찾아옵니다.
            Post post = postRepository.findByPostNo(204).orElseThrow(FindAnyFailException::new);
            
            // 204번 게시물 조회수 증가합니다.
            viewCount(204);
    
            System.out.println("================= 게시글 조회 결과 =================");
            System.out.println("게시글 제목 >>>>>>>> " + post.getTitle());
            
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.getMessage();
            }
        }
        
        @Async // 조회수 증가 메세지 보낼 메서드
        public void viewCount(long postNo) {
            kafkaProducer.sendMessage(postNo);
        }
    }
    • postRepository.findbyPostNo(204) - 204번 게시물을 조회합니다
    • viewCount(204) - 204번 게시물에 대한 조회수 증가 메서드 실행합니다
    • kafkaProducer.sendMessage(postNo) - kafka broker로 보낼 메시지를 만듭니다
    • Thread.sleep(10초)
      • viewCount 메서드에 글 번호만 전달하면 테스트는 끝이 납니다
      • 그래서 메시지 전송, 9091 포트에서 DB 반영까지 끝나는 것을 기다리기 위함입니다
    // Kafka 메세지 생산자입니다.
    public class KafkaProducer {
    
        private final KafkaTemplate<String, Long> kafkaTemplate;
        private static final String topicCount = "viewCount";
    
        @Async
        public void sendMessage(long postNo) {
            System.out.println("====================== 조회 수 증가 메세지 전송 시작 ======================");
    
            try {
                Thread.sleep(5000);
                System.out.printf("Producer message : %s%n", postNo);
                kafkaTemplate.send(topicCount, postNo);
            } catch (InterruptedException e) {
                e.getMessage();
            }
    
            System.out.println("====================== 메세지 전송 완료 ======================");
        }
    }
    • 실제 kafka brokder에 메시지를 보내는 메서드입니다
    • 비동기로 발생하는 것을 보기 위해 Thread.sleep(5초) 걸어줍니다
    // Kafka 메세지 소비자입니다.
    public class KafkaConsumer {
    
        private final PostRepository postRepository;
    
        @Async
        @KafkaListener(topics = "viewCount", groupId = "yeon")
        public void consume(long postNo) {
            System.out.printf("Consumed message : %s%n", postNo);
    
            System.out.println("======================== 조회 수 증가 DB 반영 ========================");
            postRepository.updateViewCount(postNo);
            System.out.println("======================== 조회 수 증가 DB 완료 ========================");
        }
    }
    • 9091 포트를 사용하는 별도의 스프링부트 프로젝트입니다
    • Kafka broker로부터 메시지 받고, DB에 반영합니다

    결과

     

    기존의 동기 방식에서는 kafka broker를 통해서 메시지 전송이 완료된 뒤, 게시글 출력 되어야 합니다. 조회수 증가 메서드가 완료된 뒤 게시글 출력됩니다. 

     

    하지만 비동기 방식에서는 viewCount() 결과를 기다리지 않고, 게시글 출력이 먼저 됩니다. 그리고 5초 뒤 kafka broker를 통해서 메시지 전송 완료됩니다. 즉 조회수 증가를 기다리지 않고, 게시글 출력됩니다.

     

     

    9091 포트의 실제 조회 수 반영되는 쿼리도 잘 발생된 것을 볼 수 있습니다.


    부족한 점

    사실 이러한 kafka를 활용한 비동기 방식은 각각의 서버를 운영할 때 효율적이라고 생각합니다.

     

    현재는 간단한 테스트를 위해 8080, 9091 포트만 다르게 사용하고 있기 때문에 서버가 뻗어버리면, 두 포트 모두 사용할 수 없게 됩니다. 그래서 트래픽을 분산시키고자 하는 최초의 의도에는 미치지 못합니다.

     

    또한 kafka도 redis처럼 클러스터를 만들어 운영할 수 있습니다. 마찬가지로 하나의 서버가 죽더라도, 다른 서버에서 replication을 만들어 사용한다면 더 안전할 것이라고 생각합니다.

     

    반응형

    댓글

Designed by Tistory.