본문 바로가기
개발

객체 지향 5원칙을 준수하여 리팩토링 하기 (with. Kafka Consumer)

by 방구쟁이 2024. 3. 16.
728x90

 23년도 회고에서 언급했던 Kafka Consumer를 리팩토링하며 새로 개발한 경험에 대해 포스팅 해보자. 먼저 글을 작성하기 전에 이번 Consumer 개발 당시 매우 많은 도움을 주신, 저를 성장할 수 있게 해주신 같은팀 대리님께 감사드립니다! 😌

 해당 포스팅에서는 Consumer에 OCP(Open Closed Principle)를 만족하기 위한 추상 팩토리 패턴 적용, ThreadSafe 보장을 synchronized 대신하는 방법 적용, 동시에 요청이 들어올 경우 메모리 경합을 줄여준 방법에 대해 기술하였습니다. 

 

1. 들어가며

 이번에 영상 광고 송출 서비스를 새로 시작하게 되며 프로젝트 일정 상 기존에 담당한 동료 개발자분들이 통계 파트까지 개발하기에는 힘든 상황이었습니다. 제가 급히 프로젝트에 투입되면서 Kafka Consumer Report 적재 서비스 개발을 담당하게 되었습니다.

벤틀리



우리팀의 통계 데이터 적재 방식을 매우 간단히 요약하면 다음과 같습니다.

  1. Send : Producer(광고 송출 서비스)에서 통계 데이터에 대한 Topic들을 Kafka cluster로 보낸다.
  2. Subscribe(Poll & commit) : Consumer(통계 적재 서비스)에서 Kafka cluster가 호스팅하는 Topic 데이터들을 받는다.
  3. Schedule(데이터 처리) : Consumer에서 읽어온 데이터를 메모리에 저장해두고 주기적으로 DB에 적재한다.

 

 2. 새롭게 Consumer를 개발하기로 한 이유


 팀 내부에서 "Consumer를 손 봐야해.. 손 봐야해..!" 라는 말들이 이미 오가고 있었고, 이번 프로젝트에 Consumer를 사용해야 했기 때문에 기존에 사용하고 있던 Consumer를 먼저 분석하게 되었습니다. 기존 Consumer는 다음과 같은 특징들을 가지고 있었습니다. 

기존 Consumer 특징

  • Local 환경 구축에 대한 어려움
  • 확장성이 떨어짐 & OCP 위반 (신규 topic 추가 시 기존 소스 변경이 필요)
  • Topic message 처리에 대한 구조가 복잡함  (실제로 토픽 하나 추가 시 모듈로 나눠진 4개의 모듈 중 3개의 모듈을 수정해야 하고, 7개의 새로운 클래스와 4개의 기존 클래스를 변경해야 해 변경점이 분산되어 있음..)
  • 해당 프로젝트만 Maven 사용 (컨슈머 외의 다른 프로젝트는 모두 gradle)

 결론적으로 Kafka Consumer가 구독하는 Topic들은 앞으로도 새로운 통계 데이터를 적재할 때 마다 계속 추가될 가능성이 높아 확장성이 중요했으나 기존 Consumer는 확장성이 떨어졌고, 개발 환경에 대해 어려움도 있었기 때문에 새롭게 개발하기로 하였습니다.

 그래서 Consumer Group을 두 그룹으로 나눠 관리하고 기존에 받고 있는 토픽들은 현재 컨슈머 그룹이 처리하도록 하고 앞으로 신규로 확장될 토픽들은 새로운 컨슈머와 그룹을 만들어 처리하도록 방향을 잡았습니다. 
 (Kafka 경험이 없었기 때문에 설렘반 두려움반으로 급히 데브원영 강사님 Kafka 강의를 수강..)

Kafka Consumer Group2가 추가된 구조

 

3. 신규 Consumer 구조

 먼저 신규 Consumer 개발 당시 우리 백엔드팀 대리님께서 추상화시킨 구조를 잡아주시며 성능을 향상시켰던 방법도 공유해주시는 등 많은 도움을 받을 수 있어 Consumer 개발은 매우 좋은 경험이 되었습니다.(대리님 덕분에 많이 성장하고 있는 것 같다..😭)

 추상화된 프로젝트 구조는 다음과 같습니다.
 (실제 DB에 적재 처리하는 Schedule 클래스는 생략하는 등 포스팅에 필요한 부분만 작성)

Kafka Consumer Factory 부분 클래스 다이어그램
Kafka Consumer Topic 추가 부분 클래스 다이어그램

 

4. 신규 Consumer 특징

 위의 다이어그램 구조로 만든 신규 Consumer의 특징입니다.

  • Factory 메서드 패턴 사용
  • ConcurrentHashMap & LongAdder 타입 사용
  • ReentrantLock의 lock 사용
  • spring boot, gradle로 환경 구축 편리함

 하나 하나 소스코드 예시를 보며 장단점을 살펴보려고 합니다. 실제 서비스 소스와는 구조만 비슷하게 재구성하였으며 기존 소스코드와의 비교는 목차 5~7에서 확인 가능합니다.

 4.1 Factory 메서드 패턴 사용

 KafkaConsumerService에서 어떤 Topic들이 들어와도 공통으로 처리하기 위해 추상 팩토리 패턴을 사용하였습니다. 그 결과 Open-Closed Principle을 만족하여 확장성을 높였습니다.

List<Worker> topicWorkers = workerFactory.get(topicTypeEnum);
workers.forEach(it -> it.work(list));

 다음은 WorkerFactory 클래스 구현 방식입니다. Factory를 통해 Worker 인터페이스를 implements한 Worker 클래스(실제 비지니스 로직을 구현)들을 Map에 저장해두고 사용이 필요한 시점에 Topic을 key값으로 조회하여 Topic의 비지니스 로직을 실행할 수 있도록 구현하였습니다.

@Component
public class WorkerFactory {

    private final Map<TopicTypeEnum, List<Worker>> workerMap;

    public WorkerFactory(List<Worker> beanWorkers) {
        wrokerMap = new ConcurrentHashMap<>();

        for (Worker work : beanWorkers) {
            TopicTypeEnum type = work.getType();
            List<Worker> savedData = workerMap.get(type);

            if (savedData == null) {
                List<Worker> workers = new ArrayList<>();
                workers.add(it);
                workerMap.put(type, workers);
            } else {
                savedData.add(it);
            }
        }
    }

    public List<Worker> get(TopicTypeEnum topicTypeEnum) {
        return workerMap.get(topicTypeEnum);
    }
}

 

 4.2 ConcurrentHashMap & LongAdder 타입 사용

 Topic Message가 들어올때 마다 값을 메모리에 저장해두는 로직은 ConcurrentHashMap과 computeIfAbsent, LongAdder를 이용하여 Thread-Safe하도록 구현하였습니다.

  • ConcurrentHashMap은 검색 작업에서는 lock이 이루어 지지 않으며 갱신 작업에서만 lock이 된다. Thread-Safe를 보장하면서도 Hashtable보다 높은 성능을 보장할 수 있음
  • computeIfAbsent를 사용하여 메모리의 저장된 데이터 또는 새로운 메모리로 생성하여 반환
  • LongAdder는 AtomicLong의 연산에 의한 경합 과정에서의 CPU 소모를 줄여준 클래스입니다. AtomicLong는 접근이 빈번한 경우 반복적인 메모리 읽기와 비교 연산을 수행하게 되어 경합 과정이 발생하지만 LongAdder는 스레드 별로 별도의 데이터를 배열로 관리하여 각 스레드가 경합이 발생하는 상황에서 자신만의 변수를 접근하도록 설계됨
TopicCache topicCache = topicCacheFactory.getTopicCache(topic);
topicCache.addData(data);

실제 DB에 데이터 저장하는 로직을 가진 Worker와 마찬가지로 Factory Method Pattern을 이용하여 Topic Message 값을 메모리에 저장해두는 로직을 구현하였습니다.

public abstract class AbstractTopicCache<T> implements TopicCache<T> {

    private final Map<String, TopicDTO<T>> memoryMap = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public void addData(T data) {
        TopicDTO<T> topicDTO = (TopicDTO<T>) data;
        lock.lock();
        try {
            memoryMap
                    .computeIfAbsent(topicDTO.generateKey(), v -> (TopicDTO<T>) topicDTO.getEmptyDto())
                    .sum(data);
        } finally {
            lock.unlock();
        }
    }
}
// TopicA 클래스의 일부

public class TopicA implements TopicDTO<TopicA> {

    private LongAdder viewCnt = new LongAdder();

    ...

    @Override
    public void sum(TopicA data) {
        viewCnt.add(data.getViewCnt().intValue());
    }
    
}

 

 4.3 ReentrantLock 사용

 Thread-safe하며 비지니스 로직의 유연성을 갖기 위해 synchronized 대신 ReentrantLock을 사용하여 정합성을 유지해주었다. ReentrantLock은 다양하고 복잡한 상황에서 Thread를 관리할 수 있고 락을 획득할 Thread를 정해줄 수 있다.(명시적인 lock)

 (위 AbstractTopicCache에서 엿볼 수 있어 예시 소스는 생략하도록 하겠습니다. 사실 로직에 따라 4.1, 4.2만 적용해주어도 ThreadSafe 함을 구현하였지만 이후 추가적으로 로직이 변경될 경우 확실한 ThreadSafe하도록 추가해주었습니다)

 지금까지 신규 Consumer의 특징들을 살펴보았습니다.
 
 다음으로 기존 Consumer와 신규 Consumer의 차이를 비교해 보겠습니다.

 

5. 기존 Consumer 메세지 처리 방식

// 기존 Consumer process 로직

public void processMain(String topic, String message) {
    
    try {
      String className = "";
      String userId = "";
      JSONObject jSONObject = JSONObject.fromObject(message);
      try {
        className = (String) jSONObject.get("className");
        userId = (String) jSONObject.get("userId");
        
      } catch (Exception e) {
        return;
      }
      
      if (GlobalConstants.TopicA.equals(topic)) {
        BaseTopicAData record = null;

        TopicAData tmp = TopicAData.fromHashMap(jSONObject);
      	record = tmp.toBaseTopicAData();
      	record = this.processBaseTopicAData(record);
        
        if (record != null) {
          sumObjectManager.appendTopicAData(record);
        }

      } else if (GlobalConstants.TopicB.equals(topic)) {
        BaseTopicBData record = null;
        
        TopicBData tmp = TopicBData.fromHashMap(jSONObject);
        record = tmp.toBaseTopicBData();
        record = this.processTopicBData(record);
        
        if (!ObjectUtils.isEmpty(record)) {
          sumObjectManager.appendTopicBData(record);
        }
        
      } else if (GlobalConstants.TopicC.equals(topic)) {
        if (GlobalConstants.TopicCData.equals(className)) {
          TopicCData r = new TopicCData(jSONObject);
          if (!ObjectUtils.isEmpty(r)) {
            sumObjectManager.appendTopicCData(r);
          }
        }
      } 

	  ...


}

 기존 Consumer는 메세지를 받으면 분기문으로 각 Topic별로 처리하도록 되어있습니다. 또한 메모리에 데이터를 축적하는 역할을 하는SumObjectManager 클래스에서도 Topic별로 메모리에 저장 및 초기화시키는 메서드가 각각 존재하고 Topic이 추가될 때마다 해당 클래스에 메서드를 추가해야 합니다.

// 기존 Consumer 메모리에 데이터 축적 메서드들

public synchronized void appendTopicAData(TopicAData record) {
    if (mapTopicAData.size() > summeryListSize) {
      long millis = Calendar.getInstance().getTimeInMillis();
      String writeFileName = String.format("%s_%s", "insertIntoError", DateUtils.getDate("yyyyMMdd_HHmm"), millis);
      JSONObject jJSONObject = JSONObject.fromObject(record);
      try {
        ConsumerFileUtils.writeLine(logPath + "retry/", writeFileName, GlobalConstants.req_info, jJSONObject);
      } catch (Exception e) {
        logger.error("{}", e.getMessage());
      }
    } else {
      TopicAData sum = mapTopicAData.get(record.getKeyCode());
      if (ObjectUtils.isEmpty(sum)) {
        mapTopicAData.put(record.getKeyCode(), record);
      } else {
        sum.sumGethering(record);
      }
    }
  }
  
  public synchronized void appendTopicBData(TopicBData record) {
    if (mapTopicBData.size() > summeryListSize) {
      long millis = Calendar.getInstance().getTimeInMillis();
      String writeFileName = String.format("%s_%s", "insertIntoError", DateUtils.getDate("yyyyMMdd_HHmm"), millis);
      JSONObject jJSONObject = JSONObject.fromObject(record);
      try {
        ConsumerFileUtils.writeLine(logPath + "retry/", writeFileName, GlobalConstants.conv_info, jJSONObject);
      } catch (Exception e) {
        logger.error("{}", e.getMessage());
      }
    } else {
      TopicBData sum = mapTopicBData.get(record.getKeyCode());
      if (ObjectUtils.isEmpty(sum)) {
        mapTopicBData.put(record.getKeyCode(), record);
      } else {
        sum.sumGethering(record);
      }
    }
  }
  
  ...

  public synchronized Map<string, ?> removeTopicAObjectMap() {</string, ?>
    Map<string, ?> result = null;</string, ?>
    result = this.mapTopicAData;
    mapTopicAData = (new ConcurrentHashMap<string, topicAdata>());</string, topicAdata>
    return result;
  }
  
  public synchronized Map<string, ?> removeTopicBMap() {</string, ?>
    Map<string, ?> result = null;</string, ?>
    result = this.mapTopicBData;
    mapTopicBData = (new ConcurrentHashMap<string, topicBdata>());</string, topicBdata>
    return result;
  }
  
  ...

기존 컨슈머는 Topic이 추가될 때마다 메서드에 Topic별 분기가 계속 추가되어야 하며, 기존 클래스에 메서드도 계속 추가해야 합니다. 실제 코드에서는 더 많은 클래스들의 변형이 일어나게 됩니다.

 

6. 신규 Consumer 메세지 처리 방식

 다음은 신규 Consumer의 process 로직입니다.

// KafkaConsumerService

public void process(String topic, String value) {
        try {
            var topicDTOClass = topicDTOFactory.getTopicDTOClass(topic);
            var data = messageParser.parseJson(value, topicDTOClass);

            TopicCache topicCache = topicCacheFactory.getTopicCache(topic);
            topicCache.addData(data);

        } catch (Exception e) {
            log.error("ConsumerService Exception Topic: {}", topic, e);
        }
}

 기존 process 로직을 Factory method Pattern 적용하여 Topic 별로 각 TopicMemory 클래스를 찾아 addData를 수행합니다.

// TopicDTOFactory 일부

@Service
public class TopicDTOFactory {

    private final Map<TopicTypeEnum, TopicDTO> topics;

    public TopicDTOFactory(List<TopicDTO> topicDTOList) {
        topics = new ConcurrentHashMap<>();

        for (TopicDTO topicDTO : topicDTOList) {
            topics.put(topicDTO.getType(), topicDTO);
        }
    }

    public Class<? extends TopicDTO> getTopicDTOClass(String topic) {
        TopicTypeEnum topicTypeEnum = TopicTypeEnum.findTopic(topic);

        TopicDTO topicDTO = topics.get(topicTypeEnum);

        return topicDTO.getClass();
    }
}


@Service
public class TopicMemoryFactory {

    private final Map<TopicTypeEnum, TopicMemory> memoryRepositoryMap;

    public TopicMemoryFactory(List<TopicMemory> memoryRepositories) {
        memoryRepositoryMap = new ConcurrentHashMap<>();

        for (TopicMemory memoryRepository : memoryRepositories) {
            memoryRepositoryMap.put(memoryRepository.getType(), memoryRepository);
        }
    }

    public TopicMemory getTopicMemory(TopicTypeEnum topicTypeEnum) {
        return memoryRepositoryMap.get(topicTypeEnum);
    }
}

 위와 같이 Factory를 구현할 경우 각각의 Factory의 빈 생성 시 각 TopicDTO, TopicMemory를 implements하고 있는 클래스들을 주입 받아 Map에 저장됨으로써 Topic이 신규로 추가되어도 process 메서드는 수정 없이 확장할 수 있게 됩니다.

@Slf4j
public abstract class AbstractTopicMemory implements TopicMemory {

    private final Map<string, topicdto> memoryMap = new ConcurrentHashMap<>();</string, topicdto
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public void addData(T data) {
        TopicDTO topicDTO = (TopicDTO) data;
        lock.lock();
        try {
            memoryMap
                    .computeIfAbsent(topicDTO.generateKey(), v -> (TopicDTO) topicDTO.getEmptyDto())
                    .sum(data);
        } finally {
            lock.unlock();
        }
    }

	...
}

 또한 추상화 TopicMemory 클래스를 구현하여 모든 Topic들의 memory 저장을 위한 꼭 필요한 메서드들은 공통으로 처리하였습니다.

 

7. 신규 컨슈머 결과

 Topic을 받아 처리하는 Consumer를 위와 같이 리팩토링하게 되며 아래 결과를 얻었습니다.

 

1. OCP를 만족하는 결과를 얻게 되었다.
  새로운 Topic을 구독할 경우 기존 소스의 변경 없이 한 패키지만 생성하면 된다. - 3. 신규 Consumer 구조 그림 확인
  (topic, topic cache, topic worker 추가 및 topic enum에 신규 Topic값 추가)

2. Factory method Pattern을 적용하여 클래스당 하나의 역할을 수행하도록 분리되었다.

3. Local, dev, stage, prod 환경 셋팅이 비교적 간단해 개발하기 편해졌다. 

 

결론적으로 객체지향 설계 5대원칙인 SOLID과 디자인패턴의 중요성을 체감하는데 좋은 경험이 된 프로젝트 였습니다 😁

 

8.  테스트 코드

 번외로 syncronized 대신 (lock 미사용), ConcurrentHashMap, LongAdder를 이용하여 구현한 결과 적재하는 로직이 ThreadSafe한지 테스트 코드로 살펴보자.

 먼저 TopicA를 테스트 ConcurrentHashMap, LongAdder로 구현하여 성공 예시를 살펴보자.

// ExampleTest

@ExtendWith(MockitoExtension.class)
@DisplayName("Consumer process 예시 소스 테스트")
class ExampleTest {

    @Mock
    private KafkaConsumerService subject;
    private TopicAMemory topicAMemory = new TopicAMemory();

    @BeforeEach
    void setUp() {
        TopicDTOFactory topicDTOFactory = new TopicDTOFactory(
                List.of(new TopicA())
        );
        
        TopicMemoryFactory topicMemoryFactory = new TopicMemoryFactory(
                List.of(topicAMemory)
        );

        subject = new KafkaConsumerService(
                topicDTOFactory,
                topicMemoryFactory
        );
    }

    @DisplayName("TopicA 메세지가 10000개 적재된 값 10000개")
    @RepeatedTest(100)
    void sumTopicA() throws InterruptedException {
        // Given
        String key = "TOPICA";
        String value = "{\"testCnt\":1}";
        int count = 10000;
        CountDownLatch countDownLatch = new CountDownLatch(count);

        // When
        ExecutorService consumeExecutor = getExecutorService(count,key, value,countDownLatch);

        // Then
        countDownLatch.await();
        consumeExecutor.shutdown();
        List<TopicA> topicAS = topicAMemory.removeAndGetData();

        Assertions.assertAll(
                () -> assertThat(topicAS.get(0).getTestCnt().intValue()).isEqualTo(count)
        );
    }

    private ExecutorService getExecutorService(int count, String key, String value, CountDownLatch countDownLatch) {
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(100);

        for (int i = 0; i < count; i++) {
            String topic;
            String data;

            topic = key;
            data = value;

            consumeExecutor.submit(() -> {
                try {
                    subject.process(topic, data);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }

        return consumeExecutor;
    }

}

ConcurrentHashMap과 LongAdder를 사용한 테스트 코드 결과는 다음과 같다.

멀티 쓰레드 환경으로 데이터 적재 확인
synchronized를 사용하지 않고 ThreadSafe하게 처리 완료

더보기

TopicA 테스트 코드 참고용 소스

// TopicA

@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@ToString
@Component
public class TopicA implements TopicDTO<TopicA> {

    private int statsDttm;

    @Builder.Default
    private LongAdder testCnt = new LongAdder();

    @Override
    public TopicTypeEnum getType() {
        return TopicTypeEnum.TOPICA;
    }

    @Override
    public String generateKey() {
        return new StringBuilder()
                .append(getStatsDttm())
                .toString();
    }

    @Override
    public TopicA getEmptyDto() {
        return TopicA.builder()
                .statsDttm(this.statsDttm)
                .build();
    }

    @Override
    public void sum(TopicA data) {
        testCnt.add(data.getTestCnt().intValue());
    }
}
// TopicAMemory
// 분석하기 쉽게 추상화 메서드를 해당 클래스로 옮김(실제 테스트는 추상화 메서드 사용)

@Slf4j
@Repository
public class TopicAMemory implements TopicCache<T> {

    private final Map<String, TopicDTO<T>> memoryMap = new ConcurrentHashMap<>();
	// HashMap을 사용할 경우 ConcurrentModificationException 발생
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public TopicTypeEnum getType() {
        return TopicTypeEnum.TOPICA;
    }
    
    @Override
    public void addData(T data) {
        TopicDTO<T> topicDTO = (TopicDTO<T>) data;
        lock.lock(); // 해당 부분은 없어도 sum 동작에 따라 없어도 된다.
        try {
            memoryMap
                    .computeIfAbsent(topicDTO.generateKey(), v -> (TopicDTO<T>) topicDTO.getEmptyDto())
                    .sum(data);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public List<T> removeAndGetData() {
        lock.lock();
        try {
            return (List<T>) memoryMap.keySet().stream()
                    .map(memoryMap::remove)
                    .collect(Collectors.toList());
        } finally {
            lock.unlock();
        }
    }
    
    
}

 

TopicB는 int와 HashMap을 사용하여 구현하여 테스트를 진행하였다. (스포하자면 역시 ThreadSafe하지 못함)

@DisplayName("TopicB 메세지가 10000개 적재된 값 10000개")
@RepeatedTest(10)
void sumTopicB() throws InterruptedException {
    // Given
    String key = "TOPICB";
    String value = "{\"testCnt\":1}";
    int count = 10000;
    CountDownLatch countDownLatch = new CountDownLatch(count);

    // When
    ExecutorService consumeExecutor = getExecutorService(count,key, value,countDownLatch);

    // Then
    countDownLatch.await();
    consumeExecutor.shutdown();
    List<TopicB> topicAS = topicBMemory.removeAndGetData();

    Assertions.assertAll(
            () -> assertThat(topicAS.get(0).getTestCnt()).isEqualTo(count)
    );
}
더보기

TopicB 테스트 코드 참고용 소스

@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@ToString
@Component
public class TopicB implements TopicDTO<TopicB> {

    private int statsDttm;

    private int testCnt;

    @Override
    public TopicTypeEnum getType() {
        return TopicTypeEnum.TOPICB;
    }

    @Override
    public String generateKey() {
        return new StringBuilder()
                .append(getStatsDttm())
                .toString();
    }

    @Override
    public TopicB getEmptyDto() {
        return TopicB.builder()
                .statsDttm(this.statsDttm)
                .build();
    }

    @Override
    public void sum(TopicB data) {
        testCnt++;
    }

    @Override
    public void init() {}
}
@Slf4j
@Repository
public class TopicBMemory extends AbstractTopicCache<TopicB> {

	private final Map<String, TopicDTO<T>> memoryMap = new HashMap<>();

    @Override
    public void addData(T data) {
        TopicDTO<T> topicDTO = (TopicDTO<T>) data;
        String key = topicDTO.generateKey();
        TopicDTO<T> dto = memoryMap.get(key);
        if (dto == null) {
            dto = (TopicDTO<T>) topicDTO.getEmptyDto();
            memoryMap.put(key, dto);
        }
        dto.sum(data);
    }

    @Override
    public List<T> removeAndGetData() {

        List<T> removedData = new ArrayList<>();
        Iterator<String> iterator = memoryMap.keySet().iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            removedData.add((T) memoryMap.remove(key));
        }
        return removedData;

    }
    @Override
    public TopicTypeEnum getType() {
        return TopicTypeEnum.TOPICB;
    }
}

10번의 테스트 모두 실패

HashMap, int를 사용하여 구현한 TopicB의 테스트 결과는 ThreadSafe하지 못하였다.

ThreadSafe 하지 못함

 

 

이상 KafkaConsumer를 새롭게 개발하면서 얻게 된 경험을 포스팅하였습니다. 감사합니다.

참고 문헌



728x90

'개발' 카테고리의 다른 글

Grafana K6를 이용하여 부하 테스트 해보기  (0) 2024.10.27
메모리 누수 분석 및 개선 경험  (0) 2023.04.10
Referer Policy 적용하기  (0) 2022.04.29
웹 렌더링 과정  (0) 2021.09.13
SPA(Single Page Application)란?  (0) 2021.07.13

댓글