ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • (Kafka) 객체를 JSON 타입으로 넘겨보자
    Kafka 2021. 9. 1. 17:22
    반응형

    1) 서론

    Kafka는 메시지 브로커를 이용해 서로 다른 프로젝트 사이에 메시지를 주고받을 수 있습니다. 이를 통해 비동기적인 처리를 할 수 있는데요.

    메시지를 넘겨줄 때 int, string 등 타입을 지정해서 넘기는 것이 가장 간단한 방법입니다.

     

    하지만 이러한 방식은 하나의 데이터 객체를 가지고 있더라도, 각각의 타입을 가진 직렬화 방식을 사용해야 합니다. 즉 개수도 많아지고, 어떤 목적으로 메시지를 전달하는지 명확히 파악하기 어렵다고 생각합니다.

    그래서 모든 메세지 전달을 JSON 타입으로 통일하여 전달하고, 직렬화에 필요한 객체의 정의 방법을 기록합니다. 그리고 해당 과정에서 발생한 오류를 기록합니다.

    아직 코린이 입니다. 혹시나 더 좋은 방법이나, 틀린점이 있다면 알려주세요.


    2) 목적

    • 사용자가 한 개의 게시물 요청합니다.
    • 게시물 내용 응답, 조회 수 증가 분리합니다.
    • 게시물 - 사용자가 기다리지 않게, 조회 후 바로 응답합니다.
    • 조회 수 증가 - 비동기 메세지 활용하여, 해당 게시물 번호를 넘겨 처리합니다.

    3) 설정

    3. 1) 공통

    PostViewCountDTO.class

    public class PostViewCountDTO { 
    
        @NotNull 
        private long postNo; 
        
    }
    • JSON 타입으로 넘겨줄 데이터 객체입니다.
    • postNo - 게시글 번호

     

    3. 2) Producer (springboot1)

    application.yml

    spring:
      kafka:
        producer:
          bootstrap-servers: localhost:9092

    CommonJsonSerializer.class

    public class CommonJsonSerializer {
    
        static Map<String, Object> getStringObjectMap(String bootstrapServer) {
            Map<String, Object> props = new HashMap<>();
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
            return props;
        }
    }
    • 공통으로 사용될 JSON 타입의 직렬화 객체입니다.

    PostViewProducerConfig.class

    @Configuration
    public class PostViewProducerConfig {
    
        @Value("${spring.kafka.producer.bootstrap-servers}")
        private String bootstrapServer;
    
        @Bean
        public Map<String,Object> postViewProducerConfigs() {
            return CommonJsonSerializer.getStringObjectMap(bootstrapServer);
        }
    
        @Bean
        public ProducerFactory<String, PostViewCountDTO> postViewCountDTOProducerFactory() {
            return new DefaultKafkaProducerFactory<>(postViewProducerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, PostViewCountDTO> postViewDTOKafkaTemplate() {
            return new KafkaTemplate<>(postViewCountDTOProducerFactory());
        }
    }
    • 메시지의 실질적인 전달 방법을 정의합니다.
    • 공통으로 정의한 CommonJsonSerializer.class는 여러 설정에서 활용 가능합니다.
    • ProducerFactory - produce 전략을 정의하는 싱글톤 인터페이스입니다.
    • DefaultKafkaProducerFactory - ProducerFactory에 직렬화 객체를 전달합니다.
    • KafkaTemplate - 정의된 설정을 바탕으로 실질적으로 동작하게 해 줍니다.

     

    3. 3) Consumer (springboot2)

    application.yml

    spring:
        kafka:
            consumer:
                  bootstrap-servers: localhost:9092
                  group-id: yeon
                  auto-offset-reset: earliest

    CommonJsonDeserializer.class

    public class CommonJsonDeserializer {
    
        static Map<String, Object> getStringObjectMap(String bootstrapServer) {
            Map<String, Object> props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
            props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    
            return props;
        }
    }
    • ErrorHandlingDeserializer - 에러 관리 위한 역직렬화 객체입니다. 아래에서 추가 설명합니다.


    PostViewConsumerConfig

    @Configuration
    @RequiredArgsConstructor
    public class PostViewConsumerConfig {
    
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String bootstrapServer;
    
        @Bean
        public Map<String,Object> postViewConsumerConfigs() {
            return CommonJsonDeserializer.getStringObjectMap(bootstrapServer);
        }
    
        @Bean
        public ConsumerFactory<String, PostViewCountDTO> postViewCountDTO_ConsumerFactory() {
            return new DefaultKafkaConsumerFactory<>(postViewConsumerConfigs());
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> postViewCountListener() {
            ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(postViewCountDTO_ConsumerFactory());
            return factory;
        }
    
        @Bean
        public StringJsonMessageConverter jsonConverter() {
            return new StringJsonMessageConverter();
        }
    }
    • producer와 같은 설정입니다.

    4) Test

    ... 생략
    public class KafkaTest {
    
        @Autowired
        private PostRepository postRepository;
        @Autowired
        private KafkaTemplate<String, PostViewCountDTO> postViewKafkaTemplate;
    
        static Post post = new Post();
    
    
        @Test
        public void kafkaSendTest() {
            final String topic_viewCount = "viewCount";
            
            // 480번 게시글 조회 합니다.
            PostViewCountDTO postViewCountDTO = PostViewCountDTO.builder().postNo(480).build();
            assertEquals(480, postViewCountDTO.getPostNo());
    
            // 조회한 게시글을 JSON 타입으로 전송합니다.
            postViewKafkaTemplate.send(topic_viewCount, null, postViewCountDTO);
    
            // 비동기 메세지가 전달되고, 조회 수 증가까지 20초 기다립니다.
            await().atLeast(20000, TimeUnit.MICROSECONDS).until(awaitTest());
    
            // 게시물의 변경된 조회 수 출력합니다.
            assertEquals(1, post.getViewCount());
            System.out.println(" ================================================== ");
            System.out.println("getPostNo() 게시물 번호: " + post.getPostNo());
            System.out.println("getViewCount() 조회 수: " + post.getViewCount());
            System.out.println(" ================================================== ");
        }
    
        // 조회 수 증가를 20초간 기다립니다.
        public Callable<Boolean> awaitTest() {
            post = postRepository.findByPostNo(480).orElseThrow(FindAnyFailException::new);
            return () -> post.getViewCount() == 1;
        }
    }
    • await()
      • 비동기 테스트의 경우 모든 처리가 완료되기 전, 테스트가 끝납니다.
      • 그래서 awaitility 의존성을 활용하여, 해당 처리가 끝날 때까지 기다려줍니다.
      • Thread.sleep() 사용할 수도 있지만, return으로 일치하는 결과를 기다리기 위해 awaitility 사용했습니다.

     

    4. 1) 에러 발생

    • 객체가 JSON 타입으로 전달되어, consumer의 파라미터로 들어와야 합니다.


    하지만 Consumer 측에서 아래와 같은 에러가 발생합니다.

    // consumer에서 producer 객체 classpath가 달라서 못 찾습니다.
    
    failed to resolve class name. 
    
    Class not found [com.example.springboot1.DTO.PostViewCountDTO]; 
    
    nested exception is java.lang.ClassNotFoundException: com.example.springboot1.DTO.PostViewCountDTO


    이유는 springboot1springboot2 각각의 프로젝트의 패키지명이 다르기 때문입니다.

    왜냐하면 카프카 메시지를 전송할 때 headers에 metadata를 담아서 보냅니다. 이때의 정보는 HTTP 헤더와 비슷합니다. 

    문제는 headers에 담기는 metadata에 target type을 포함합니다. 여기서 말하는 target type은 전송하고자 하는 객체의 패키지명 입니다. 

     

    프로젝트가 다르면, 패키지명이 다른 것은 당연합니다. 그렇기 때문에 producer의 target type에 일치하는 객체를 찾을 수 없고, 데이터를 바인딩해줄 수 없습니다. "com.example.springboot1", "com.example.springboot2"는 다른 패키지명을 가졌으니까요. 

    어떻게 해결할 수 있을까요?

    가장 쉬운 방법은 프로젝트 2개의 패키지명을 일치시켜주면 됩니다. 하지만 이러한 방식은 두 프로젝트 간의 강결합이 발생하기 때문에 좋아 보이지 않습니다. 추가로 다른 객체를 넘겨주고자 할 때 문제가 발생할 수 있습니다.


    5) 해결 방법

    두 가지의 해결 방법이 있습니다.

    5. 1) TYPE_MAPPINGS

    직렬화 대상이 되는 객체의 타입을 정의해줍니다.

    "foo:bar, foo2:bar2"와 같이 여러가지의 타입을 정의할 수 있습니다.

    public class CommonJsonSerializer {
    
        static Map<String, Object> getStringObjectMap(String bootstrapServer) {
            Map<String, Object> props = new HashMap<>();
    
            ... 생략
            
            // 직렬화할 대상의 타입과 이름을 지정합니다.
            props.put(org.springframework.kafka.support.serializer.JsonSerializer.TYPE_MAPPINGS, 
                        "PostViewCountDTO:com.example.springboot.DTO.kafka.PostViewCountDTO");
    
            return props;
        }
    }
    • 매핑할 타입의 이름과 대상이 되는 객체를 설정합니다.

     

    public class CommonJsonDeserializer {
    
        static Map<String, Object> getStringObjectMap(String bootstrapServer) {
            Map<String, Object> props = new HashMap<>();
    
            ... 생략
            
            // producer에서 직렬화 타입의 이름과 같고, 바인딩 할 consumer의 객체를 지정합니다.
            props.put(org.springframework.kafka.support.serializer.JsonDeserializer.TYPE_MAPPINGS,
                    "PostViewCountDTO:com.example.springboot.DTO.PostViewCountDTO");
            
            ... 생략
    
            return props;
        }
    }
    • producer 보내준 객체를 바인딩할 consumer 측 객체의 type 지정합니다.

     

    5. 2) useHeadersIfPresent: false

    ... 생략
    public class PostViewConsumerConfig {
        ... 생략
        
        @Bean
        public ConsumerFactory<String, PostViewCountDTO> postViewCountDTO_ConsumerFactory() {
            return new DefaultKafkaConsumerFactory<>(
                    postViewConsumerConfigs(),
                    new StringDeserializer(),
                    // userHeadersIfPresent false 합니다. 
                    new JsonDeserializer<>(PostViewCountDTO.class, false));
        }
    
        .. 생략
    }
    • 패키지명은 headers에 담겨 옵니다. 그래서 headers 사용 자체를 false 합니다.


    개인적으로 두 번째 방법은 그다지 좋은 것 같지는 않습니다. 왜냐하면 headers 정보가 필요한 경우 사용할 수 없기 때문입니다.

    더 좋은 방법이 있는지 모르겠지만, 현재로서는 첫 번째 방법이 더 좋다고 생각합니다.

     

    5. 3) Test 결과

    Producer (springboot1)

    ================================================== 
    getPostNo() 게시물 번호: 480 
    getViewCount() 조회 수: 1 
    ==================================================
    • 테스트 성공하여 정상적으로 출력됐습니다.
    • 조회 수 0 --> 1 증가했습니다.

    Consumer (springboot2)

    ======================================== 
    viewCount Consumed message : 480 
    ======================================== 
    Hibernate: update post set viewCount=viewCount+1 where postNo=?
    • 조회 수 증가 쿼리 발생했습니다.

    6) ErrorHandlingDeserializer

    만약 producer의 직렬화 객체(String), consumer의 역직렬화 객체(int)가 다르다면 어떤 일이 발생할까요?

    이때 consumer는 메시지를 역직렬화 할 수 없습니다.

    하지만 단순히 에러를 발생시키고 멈추지 않습니다. 계속해서 반복을 하고, 해결이 될 때까지 무한 반복합니다. 즉 순식간에 엄청난 크기의 로그 파일을 만들어 내고, 램을 잡아먹습니다.

    그래서 ErrorHandlingDeserializer가 소개되었습니다.

    ErrorHandlingDeserializer는 실제로 역직렬화 하는 객체 (StringDeserializer 등)로 위임합니다. 만약 해당 객체가 역직렬화 하지 못 한다면, value를 null로 return 합니다. 그리고 DeserializationException를 발생시킵니다. 이를 통해 무한 반복을 방지합니다.


    7) 참고 문헌

    https://stackoverflow.com/questions/68987866/kafka-consumer-classnotfoundexception
    Spring Kafka Beyond the Basics - How to Handle Failed Kafka Consumers (confluent.io)

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-mapping-types

     

     

    반응형

    댓글

Designed by Tistory.