Cloudwave

[Cloudwave] CGV 대용량 트래픽 예매 서비스 구현 - 4

냄B뚜껑 2025. 5. 4. 23:23

서론

지난 글에 Redis의 SortedSet을 통해 대기열을 구현하였다.

대기열을 통해 서버와 DB에 가해지는 순간적인 부하를 점진적으로 줄 수 있게 되었다.

 

하지만 현재 Redis 기반 대기열 구성에는 다음과 같은 문제점이 존재한다.

 

 

 

서버의 과부하

 

위에 부하테스트는 총 90000명의 사용자가 동시에 Redis 대기열에 입장하는 부하테스트이다.

아쉽게도 486명이 에러가 생겨 대기열에 입장하지 못했다.

 

 

그라파나를 통해 파드의 cpu 모니터링을 보았더니, 서버의 과부하로 인한 대기열 입장 실패였다.

Redis의 대기열 입장 역시 서버를 통해 진행되기 때문에 급격한 트래픽에는 제대로 동작하지 못하는 것이다.

 

서버의 부하를 줄이려고 대기열을 만들었는데, 대기열을 입장하는 것에 트래픽이 몰려 여전히 서버에 과부하가 오는 상황이다.

 

순서 보장 x

현재 대기열에서 대기 순서를 정렬하는 기준은 System.currentTimeMillis() 이다. 이 방식은 멀티 인스턴스 환경에서 서버 간 타임스탬프 차이나 동일 타임스탬프 충돌로 인해 정확한 순서 보장이 어렵다.

 

 

Kafka로 해결 가능?

그렇다면, Redis의 대기열 입장 요청을 바로 서버가 처리하는 것이 아닌, kafka의 메시지 큐로 요청을 쌓아놓고 감당할 수 있을 만큼만 받아 처리하면 어떨까?

 

Kafka는 고성능 메시지 큐로, 수 천~수 만 건의 요청도 손실 없이 안정적으로 처리할 수 있고, 각 메시지에 자동 증가하는 고유 번호(offset) 를 부여하여, 생성된 순서대로 메시지를 정렬한다.

 

흐름은 다음과 같다.

  1. 서버에서 Kafka로 대기열 입장 메시지 전송
  2. kafka 큐에서 메시지를 받아 Redis 대기열 입장 요청 처리

 

Kafka → Redis 순으로 대기열 입장 처리를 하면,
Kafka에서 이미 순서가 보장된 상태(offset 기반)로 서버가 감당할 수 있을만큼만 대기열에 안정적으로 적재할 수 있어, 서버 과부하를 막고, 서버 간의 시간 차나 동일한 시간에도 불구하고 처리 순서를 정확하게 유지할 수 있게 된다.

 

 

Kafka로 순서 보장 및 완충제 역할 수행

 

 

 

메시지를 순차적으로 처리하면 동시성 제어도 가능하지 않을까?

지난 글에 동시 예매 시, 공유락과 배타락이 동일한 레코드에 걸려, 데드락이 발생하여 비관적 락을 통해 동시성 제어를 하였다.

만약, Kafka를 통해 파티션의 이벤트 메시지를 순차적으로 처리한다면, 락을 걸 필요없이 들어온 순서대로 예매 처리하면 되지 않을까?

 

@Component
@RequiredArgsConstructor
@Slf4j
public class ReservationProducer {
    private final KafkaTemplate<String, ReservationEvent> kafkaTemplate;

    public void sendReservationRequest(String userName, Long seatId, Long scheduleId) {
        ReservationEvent request = new ReservationEvent(userName, seatId, scheduleId);
        String topic = "reservation-" + scheduleId;
        kafkaTemplate.send(topic, seatId.toString(), request); // seatId 기준 파티셔닝

        log.info("예메 요청 전송됨: 사용자={}, 좌석ID={}, 스케줄ID={}, 토픽={}",
                userName, seatId, scheduleId, topic);
    }
}

 

먼저 예매 요청이 들어오면, ReservationProducer가 Kafka 토픽으로 메시지 전송한다.

토픽명은 reservation-{scheduleId}으로 영화 일정id로 구분한다.

그리고 seatId로 키를 설정해 동일 좌석 요청은 같은 파티션으로 순서대로 처리한다.

 

@KafkaListener(topicPattern = "reservation-.*", groupId = "reservation-group")
    @Transactional
    public void consume(ConsumerRecord<String, ReservationEvent> record) {
        ReservationEvent request = record.value();

        Seat seat= seatRepository.findById(request.getSeatId()).orElseThrow(() -> new CustomException(StatusCode.SEAT_SOLD_OUT));

        log.info("예약 이벤트 수신: 사용자={}, 좌석ID={}, 스케줄ID={}", request.getUserName(), seat.getId(), request.getScheduleId());

        if(!seat.getIsReserved())
            seat.soldout();
        else {
            log.warn("이미 예약된 좌석입니다. seatId={}", seat.getId());
            throw new CustomException(StatusCode.SEAT_SOLD_OUT);
        }


        Reservation reservation= Reservation.builder()
                .userName(request.getUserName())
                .status(Status.RESERVED)
                .seat(seat)
                .build();

        reservationRepository.save(reservation);

        log.info("예약 저장 완료: 사용자={}, 좌석ID={}", request.getUserName(), seat.getId());
    }

 

@KafkaListener로 해당하는 토픽을 수신한다.

  • topicPattern = "reservation-.*" → 모든 reservation-스케줄ID 형식의 토픽을 수신함.
  • groupId = "reservation-group" → 동일 그룹 ID를 사용하는 Consumer는 파티션을 나눠서 병렬 처리 가능.

기존 예매 서비스 코드에서 비관적 락만 설정하지 않은 코드이다.

 

 

동시성 제어 테스트 코드

@BeforeEach
    void setUp() {
        // 기존 데이터 정리
        reservationRepository.deleteAll();
        seatRepository.deleteAll();

        seatService.createSeat(1L,new SeatReq(3,3));

        seatId=seatRepository.findBySchedule_IdAndRowIndexAndColumnIndex(1L,1L,1L).get().getId();


    }


    @Test
    @DisplayName("카프카 예매 동시성 제어 테스트")
    void testConcurrentReservation() throws InterruptedException {
        int numberOfThreads = 5000; // 요청 수
        ExecutorService executorService = Executors.newFixedThreadPool(10); // 동시 처리할 스레드 풀
        CountDownLatch latch = new CountDownLatch(numberOfThreads); // 모든 요청이 끝날 때까지 기다리기 위한 CountDownLatch
        AtomicInteger successCount = new AtomicInteger(0); // 성공한 예약 수
        List<Exception> exceptions = new ArrayList<>(); // 예외를 담을 리스트
        List<Long> responseTimes = new ArrayList<>(); // 각 요청의 응답 시간을 담을 리스트

        // Kafka Consumer에 latch, successCount, exceptionList 주입
        reservationConsumer.setLatch(latch);
        reservationConsumer.setSuccessCount(successCount);
        reservationConsumer.setExceptionList(exceptions);

        // 테스트 시작 시간 기록
        long testStartTime = System.nanoTime();

        for (int i = 0; i < numberOfThreads; i++) {
            final String username = "user" + i; // 각 사용자마다 고유 이름 생성

            executorService.submit(() -> {
                long startTime = System.nanoTime(); // 시작 시간 기록
                try {
                    // 예약 요청을 보내고 성공하면 카운트 증가
                    reservationProducer.sendReservationRequest(username,seatId,1L);
                } catch (CustomException e) {
                    synchronized (exceptions) {
                        exceptions.add(e); // 사용자 정의 예외 처리
                    }
                } catch (Exception e) {
                    synchronized (exceptions) {
                        exceptions.add(e); // 기타 예외 처리
                    }
                } finally {
                    // 응답 시간 계산
                    long endTime = System.nanoTime(); // 종료 시간 기록
                    long responseTime = endTime - startTime; // 응답 시간 계산
                    synchronized (responseTimes) {
                        responseTimes.add(responseTime); // 응답 시간 리스트에 추가
                    }
                }
            });
        }

        latch.await(); // 모든 요청이 완료될 때까지 대기
        executorService.shutdown(); // Executor 서비스 종료

        // 테스트 종료 시간 기록
        long testEndTime = System.nanoTime();

        // 전체 테스트 시간 계산 (단위: 밀리초)
        long totalTestTimeMillis = (testEndTime - testStartTime) / 1_000_000; // nano -> milliseconds


        // 예약된 좌석 정보 확인
        Seat seat = seatRepository.findById(seatId)
                .orElseThrow(() -> new RuntimeException("Seat not found"));

        // 예외 종류 출력
        exceptions.forEach(ex -> System.out.println("예외 종류: " + ex.getClass().getSimpleName()));

        // 테스트 결과 출력
        System.out.println("성공한 예약 수: " + successCount.get());
        System.out.println("좌석 예약 여부: " + seat.getIsReserved());
        System.out.println("발생한 예외 수: " + exceptions.size());


        // 응답 시간 계산
        List<Long> responseTimeList = new ArrayList<>(responseTimes);
        long fastestResponse = responseTimeList.stream().min(Long::compare).orElse(0L) / 1_000_000; // nanoseconds -> milliseconds
        long slowestResponse = responseTimeList.stream().max(Long::compare).orElse(0L) / 1_000_000; // nanoseconds -> milliseconds
        double averageResponse = responseTimeList.stream().mapToLong(Long::longValue).average().orElse(0) / 1_000_000.0; // nanoseconds -> milliseconds


        // 응답 시간 관련 정보 출력
        System.out.println("최단 응답 시간: " + fastestResponse + "ms");
        System.out.println("최장 응답 시간: " + slowestResponse + "ms");
        System.out.println("평균 응답 시간: " + averageResponse + "ms");
        // 전체 테스트 시간 출력
        System.out.println("전체 테스트 시간: " + totalTestTimeMillis + "ms");

        // 최종 검증: 예약이 되어 있어야 하며, 성공한 예약은 1건만 있어야 하고, 나머지는 예외가 발생해야 합니다.
        assertTrue(seat.getIsReserved(), "좌석은 예약 상태여야 합니다.");
        assertEquals(1, successCount.get(), "동시에 하나만 성공해야 합니다.");
        assertEquals(numberOfThreads - 1, exceptions.size(), "나머지는 예외가 발생해야 합니다.");
    }

 

 

테스트 코드는 다음과 같고, Producer가 이벤트를 발행하고, Consumer가 이벤트를 소비하므로, 비동기 방식이다.

따라서 비동기 처리의 테스트를 위한 Consumer 클래스의 임시 설정을 다음과 같이 해주었다.

 

private CountDownLatch latch;
    private AtomicInteger successCount;
    private List<Exception> exceptionList;

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    public void setSuccessCount(AtomicInteger successCount) {
        this.successCount = successCount;
    }

    public void setExceptionList(List<Exception> exceptionList) {
        this.exceptionList = exceptionList;
    }

    @KafkaListener(topicPattern = "reservation-.*", groupId = "reservation-group")
    @Transactional
    public void consumeTest(ConsumerRecord<String, ReservationEvent> record) {
        try {
            ReservationEvent request = record.value();

            Seat seat= seatRepository.findById(request.getSeatId()).orElseThrow(() -> new CustomException(StatusCode.SEAT_SOLD_OUT));

            log.info("예약 이벤트 수신: 사용자={}, 좌석ID={}, 스케줄ID={}", request.getUserName(), seat.getId(), request.getScheduleId());

            if(!seat.getIsReserved())
                seat.soldout();
            else {
                log.warn("이미 예약된 좌석입니다. seatId={}", seat.getId());
                throw new CustomException(StatusCode.SEAT_SOLD_OUT);
            }


            Reservation reservation= Reservation.builder()
                    .userName(request.getUserName())
                    .status(Status.RESERVED)
                    .seat(seat)
                    .build();

            reservationRepository.save(reservation);

            log.info("예약 저장 완료: 사용자={}, 좌석ID={}", request.getUserName(), seat.getId());
            successCount.incrementAndGet();
        } catch (Exception e) {
            exceptionList.add(e);
        } finally {
            latch.countDown();
        }
    }

 

 

 

테스트 결과

사용자 수 1000명
성공한 예약 수 1
좌석 예약 여부 true
발생한 예외 수 999
최단 응답 시간 0ms
최장 응답 시간 214ms
평균 응답 시간 3.26ms
전체 테스트 시간 8320ms

 

한 좌석에 대해 1000개의 스레드를 예매처리 한 결과, 데드락이 발생하지 않고, 정확히 1개의 좌석만 예약이 성공하고 나머지는 커스텀한 예외가 발생했다.

이로써 락을 걸지 않았음에도 순차적으로 처리해, 동시성 제어를 성공한 결과가 나왔다.

(하지만 kafka는 서비스 분리가 된 msa 구조에서 서비스간 통신을 하는데에 쓰이기 때문에 현재 좌석, 예매 도메인이 섞인 비즈니스 코드를 kafka로 동시성 제어를 하는 건 아닌 것 같다. 순차적으로 처리한다는 결과를 얻었음에 의의를 두자!)

 

구현

이제 Redis로 몰리게 될 트래픽을 Kafka가 안정적으로 받고, 순차적으로 Redis에 대기열 입장 처리 하는 로직을 구현하겠다.

 

@Component
@RequiredArgsConstructor
@Slf4j
public class QueueProducer{
    private final KafkaTemplate<String, ReservationEvent> kafkaTemplate;

    public void sendReservationEvent(Long scheduleId, String userName) {
        ReservationEvent request = new ReservationEvent(scheduleId, userName);
        kafkaTemplate.send("reservation-queue", scheduleId.toString(), request);

        log.info("Redis 큐 입장 요청 전송됨: 사용자={}, 영화 스케줄ID={}" , userName, scheduleId);
    }
}

 

대기열 입장 요청을 받으면 reservation-queue라는 토픽에 메시지를 넣고 보내게된다.

 

 

    @KafkaListener(topicPattern = "reservation-queue", groupId = "reservation-group")
    @Transactional
    public void consume(ConsumerRecord<String, ReservationEvent> record) {
        Long offset= record.offset();
        ReservationEvent request = record.value();

        waitingQueueService.enterWaitingQueueWithKafka(offset, request.getUserName(), request.getScheduleId());
    }

 

메시지가 오면, username과 offset을 꺼내 Redis 큐에 넣는다.

 

public void enterWaitingQueueWithKafka(Long offset, String username, Long scheduleId) {
        String queueKey = getWaitingQueueKey(scheduleId);
        redisTemplate.opsForZSet().add(queueKey, username, offset);

        log.info("대기열 진입: 사용자={}, 오프셋={}, 스케줄ID={}", username, offset, scheduleId);
    }

 

Redis의 SortedSet에 offset을 score삼아 정렬하게 된다. 이로써 큐에 순서를 정확하고 겹칠일 없게 정렬할 수 있게된다.

 

 

테스트

@Test
    @DisplayName("카프카 대기열 입장 테스트")
    void testConcurrentReservation() throws InterruptedException {
        int numberOfThreads = 1000; // 요청 수
        ExecutorService executorService = Executors.newFixedThreadPool(10); // 동시 처리할 스레드 풀
        CountDownLatch latch = new CountDownLatch(numberOfThreads); // 모든 요청이 끝날 때까지 기다리기 위한 CountDownLatch
        AtomicInteger successCount = new AtomicInteger(0); // 성공한 예약 수
        List<Exception> exceptions = new ArrayList<>(); // 예외를 담을 리스트
        List<Long> responseTimes = new ArrayList<>(); // 각 요청의 응답 시간을 담을 리스트

        queueConsumer.setLatch(latch);
        queueConsumer.setSuccessCount(successCount);

        // 테스트 시작 시간 기록
        long testStartTime = System.nanoTime();

        for (int i = 0; i < numberOfThreads; i++) {
            final String username = "user" + i; // 각 사용자마다 고유 이름 생성

            executorService.submit(() -> {
                long startTime = System.nanoTime(); // 시작 시간 기록
                try {
                    // 예약 요청을 보내고 성공하면 카운트 증가
                    queueProducer.sendReservationEvent(1L,username);
                } catch (CustomException e) {
                    synchronized (exceptions) {
                        exceptions.add(e); // 사용자 정의 예외 처리
                    }
                } catch (Exception e) {
                    synchronized (exceptions) {
                        exceptions.add(e); // 기타 예외 처리
                    }
                } finally {
                    // 응답 시간 계산
                    long endTime = System.nanoTime(); // 종료 시간 기록
                    long responseTime = endTime - startTime; // 응답 시간 계산
                    synchronized (responseTimes) {
                        responseTimes.add(responseTime); // 응답 시간 리스트에 추가
                    }
                }
            });
        }

        latch.await(); // 모든 요청이 완료될 때까지 대기
        executorService.shutdown(); // Executor 서비스 종료


        // 테스트 종료 시간 기록
        long testEndTime = System.nanoTime();

        // 전체 테스트 시간 계산 (단위: 밀리초)
        long totalTestTimeMillis = (testEndTime - testStartTime) / 1_000_000; // nano -> milliseconds


        // 예외 종류 출력
        exceptions.forEach(ex -> System.out.println("예외 종류: " + ex.getClass().getSimpleName()));

        // 테스트 결과 출력
        System.out.println("성공한 대기열 입장 수: " + successCount.get());
        System.out.println("발생한 예외 수: " + exceptions.size());


        // 응답 시간 계산
        List<Long> responseTimeList = new ArrayList<>(responseTimes);
        long fastestResponse = responseTimeList.stream().min(Long::compare).orElse(0L) / 1_000_000; // nanoseconds -> milliseconds
        long slowestResponse = responseTimeList.stream().max(Long::compare).orElse(0L) / 1_000_000; // nanoseconds -> milliseconds
        double averageResponse = responseTimeList.stream().mapToLong(Long::longValue).average().orElse(0) / 1_000_000.0; // nanoseconds -> milliseconds


        // 응답 시간 관련 정보 출력
        System.out.println("최단 응답 시간: " + fastestResponse + "ms");
        System.out.println("최장 응답 시간: " + slowestResponse + "ms");
        System.out.println("평균 응답 시간: " + averageResponse + "ms");
        // 전체 테스트 시간 출력
        System.out.println("전체 테스트 시간: " + totalTestTimeMillis + "ms");

        assertThat(waitingQueueService.getQueueSize(1L)).isEqualTo(numberOfThreads);
    }

 

 

결과

 

결과는 정상적으로 작동한다!

 

 

실제 환경에서 부하테스트를 통해 비교해보자!

우선 서론에 있던 Kafka를 구성하지 않고 Redis로만 대기열을 구성했을 시 부하테스트이다.

 

총 90000명이 Redis 대기열에 입장하는 부하테스트

Redis- 서버 부하테스트

486명이 입장하지 못하고 에러가 생겼다.

 

Redis - 서버 부하테스트

서버에 할당된 cpu의 최대치를 찍어 동작하지 못한 것이다.

 

 

총 90000명이 Kafka를 통해 Redis 대기열에 입장하는 부하테스트

kafka - 서버 부하테스트

총 90000명의 대기열 입장 요청을 Kafka로 무사히 보냈다!

kafka - 서버 모니터링

순차적으로 kafka의 메시지를 받아 대기열 입장 처리를 함으로 써 안정된 cpu 사용률을 보여준다!

 

 

마무리

대규모 트래픽 속에서 안정적인 예매 서비스를 운영하기 위해, 트래픽 분산과 서버 과부하 방지를 고려하여 Kafka, Redis를 조합해 아키텍처를 구성했다.

 

이번 프로젝트를 통해 서버의 부하 방지를 위한 여러가지 방법들을 공부할 수 있었고, 앞으로는 현재 프로젝트 구조를 msa 형태로 분리해 대기열 서비스, 예매 서비스, 영화 서비스 등으로 나눠 kafka를 서비스간 비동기 통신에 쓰이도록 바꿀 예정이다.