미소를뿌리는감자의 코딩

[Aper] 메시지 큐 도입 본문

프로젝트

[Aper] 메시지 큐 도입

미뿌감 2024. 10. 8. 22:13
728x90

1. 참고 사이트

전반적인 메시지 큐에 대한 지식을 아래 글을 통해 습득하였다.

https://f-lab.kr/insight/real-time-chat-system-20240707

 

메시지 큐와 웹소켓을 활용한 실시간 채팅 시스템 구축

이 블로그 포스트는 메시지 큐와 웹소켓을 활용하여 실시간 채팅 시스템을 구축하는 방법에 대해 설명합니다. 메시지 큐와 웹소켓의 역할과 사용법, 그리고 이 두 기술을 결합하여 실시간 채팅

f-lab.kr

또한 아래 블로그의 메시지 큐에 대한 그림을 통해 이해를 더할 수 있었다.

https://velog.io/@noteasymin/%EC%B9%B4%EC%B9%B4%EC%98%A4%ED%86%A1-%EC%8B%9C%EC%8A%A4%ED%85%9C-%EB%94%94%EC%9E%90%EC%9D%B8-WebSocket-%EB%A9%94%EC%8B%9C%EC%A7%80%ED%81%90-SQL-NoSQL

 

카카오톡 시스템 디자인 (WebSocket, 메시지큐, SQL, NoSQL)

Polling 계속해서 Backend에게 요청을 해서 응답을 받음 Long Polling 새로운 메시지가 있는지 계속 서버에게 물어보기 메시지가 있거나 / 타임아웃 할 때 까지 리퀘스트 잡고 있기 WebSocket WebSocket은 Open

velog.io

 

2. 메시지 큐 도입의 필요성 

현재 프로젝트는 WebSocket과 STOMP를 이용하여 채팅이 구현되어 있다.

websocket은 실시간 데이터 전송이 가능하도록 한다. 여기서, STOMP는 webSocket 프로토콜을 확장하여 메시지 브로커와 통신할 수 있도록 설계되었다. 이 프로토콜을 사용하면, 클라이언트가 메시지 브로커에 연결하고, 구독, 전송, 연결 끊기 등의 작업을 할 수 있다.

즉, STOMP는 client 에서 메시지 브로커에 연결하는 것을 돕는다.

 

그렇다면, 여기에서 메시지 큐를 도입하려는 이유는 무엇일까? 바로, coupling과 의존성을 줄이기 위함이다.

보낸 메시지를 큐에 저장함으로서, 서비스가 해당 response에 오랫동안 관여하지 않아도 된다.

 

SimpMessagingTemplate vs. RabbitMQ

SimpMessagingTemplate = WebSocket 기반 메시징을 위한 템플릿 클래스 

템플릿 클래스 : 시스템이나 저장소와의 상호작용을 간단히 해주는 API를 제공함. 즉, Spring Framework에서 제공하는 Utility Class

 

RabbitMQ = 메시지 브로커

말 그대로 '브로커'의 역할을 한다. 클라이언트 ---- 브로커 ---- 서버' 로 메시지가 흐를 수 있도록 돕는다. 브로커 역할을 하면서, 메시지 데이터 처리를 돕는다.

 

- SimpMessagingTemplate

클라이언트가 웹소켓을 통해 서버에 연결이 되어 있을 때만 메시지를 전달할 수 있다. 하지만 채팅의 경우, client가 사이트에 들어와 있지 않아도 메시지를 수신해야 한다. 따라서, SimpMessagingTemplate만으로의 사용은 무리가 있다고 할 수 있다.

 

-RabbitMQ

메시지를 소비하지 않으면 큐에 계속 저장이 됨. 즉, 클라이언트가 서버에 연결 되어 있지 않더라도 큐에 저장해 둠.

RabbitMq vs. Kafka

 

3. 코드

package com.sparta.aper_chat_back.chat.service;

import com.sparta.aper_chat_back.chat.dto.MessageRequestDto;
import com.sparta.aper_chat_back.chat.entity.ChatMessage;
import com.sparta.aper_chat_back.config.RabbitConfig;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class ChatService {

    private final ReactiveMongoTemplate reactiveMongoTemplate;
    private final RabbitTemplate rabbitTemplate;
    private final RabbitAdmin rabbitAdmin;
    private final RabbitConfig rabbitConfig;

    public ChatService(ReactiveMongoTemplate reactiveMongoTemplate, RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin, RabbitConfig rabbitConfig) {
        this.reactiveMongoTemplate = reactiveMongoTemplate;
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitConfig = rabbitConfig;
    }

    @Transactional
    public Mono<ChatMessage> saveMessage(MessageRequestDto requestDto) {
        String collectionName = "chat_" + requestDto.getChatRoomId();
        ChatMessage chatMessage = new ChatMessage(requestDto);
        return reactiveMongoTemplate.save(chatMessage, collectionName)
                .thenReturn(chatMessage);
    }

    @Transactional
    public Flux<ChatMessage> getChatHistory(Long roomId) {
        String collectionName = "chat_" + roomId;
        return reactiveMongoTemplate.findAll(ChatMessage.class, collectionName);
    }


    @Transactional
    public Mono<ChatMessage> saveAndBroadcastMessage(MessageRequestDto requestDto) {
        String chatRoomQueue = "chat_room_" + requestDto.getChatRoomId() + ".queue";
        String routingKey = "chat.room." + requestDto.getChatRoomId();

        Queue queue = QueueBuilder.durable(chatRoomQueue).build();
        Binding binding = BindingBuilder.bind(queue).to(new TopicExchange(RabbitConfig.CHAT_EXCHANGE)).with(routingKey);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);

        rabbitConfig.addQueue(String.valueOf(requestDto.getChatRoomId()));

        return saveMessage(requestDto)
                .doOnSuccess(savedMessage -> {
                    rabbitTemplate.convertAndSend(RabbitConfig.CHAT_EXCHANGE, routingKey, savedMessage);
                });
    }
}

 

코드 하나하나에 대해서 알아보도록 하자.

    @Transactional
    public Mono<ChatMessage> saveMessage(MessageRequestDto requestDto) {
        String collectionName = "chat_" + requestDto.getChatRoomId();
        ChatMessage chatMessage = new ChatMessage(requestDto);
        return reactiveMongoTemplate.save(chatMessage, collectionName)
                .thenReturn(chatMessage);
    }

saveMessage는 들어온 메시지에 대해서 mongosh에 저장할 수 있도록 하는 코드이다.

 

@Transactional
public Flux<ChatMessage> getChatHistory(Long roomId) {
    String collectionName = "chat_" + roomId;
    return reactiveMongoTemplate.findAll(ChatMessage.class, collectionName);
}

위 코드는 채팅방 아이디를 통해서 해당 채팅방에 기록되어 있는 채팅들을 가지고 오는 코드이다.

 

@Transactional
public Mono<ChatMessage> saveAndBroadcastMessage(MessageRequestDto requestDto) {
    String chatRoomQueue = "chat_room_" + requestDto.getChatRoomId() + ".queue";
    String routingKey = "chat.room." + requestDto.getChatRoomId();

    Queue queue = QueueBuilder.durable(chatRoomQueue).build();
    Binding binding = BindingBuilder.bind(queue).to(new TopicExchange(RabbitConfig.CHAT_EXCHANGE)).with(routingKey);
    rabbitAdmin.declareQueue(queue);
    rabbitAdmin.declareBinding(binding);

    rabbitConfig.addQueue(String.valueOf(requestDto.getChatRoomId()));

    return saveMessage(requestDto)
            .doOnSuccess(savedMessage -> {
                rabbitTemplate.convertAndSend(RabbitConfig.CHAT_EXCHANGE, routingKey, savedMessage);
            });
}

chatRoomQueue 라는 큐 아이디를 생성한다. 일반적으로 chat_room_3 과 같은 구조를 지니게 될 것이다.

routingKey 라는 큐를 인식하기 위한 용도의 키를 생성한다. chat.room.3과 같은 키의 구조를 지닌다.

 

다음으로 해당 큐 이름과 routingKey를 바탕으로 큐를 생성한다. 이미 존재하고 있는 큐가 있다면, 새로 생성하지 않고 해당 큐를 이용하게 된다.

또한 Exchange 와도 bind 해주었다. exchange의 경우엔 큐 마다 따로 가지고 있지 않아도 되므로, chat.exchange로 통일 시켜서 진행해 주었다.

 

이렇게 선언된 queue와 binding을 선언해 두고, rabbitConfig에 선언해 두었던 addQueue를 이용해 주었다.

public void addQueue(String chatRoomId) {
    String queueName = "chat_room_" + chatRoomId + ".queue";
    queueNames.add(queueName);
}

이를 통해 queueNames라는 리스트에 새로 생성된 큐의 이름을 추가시켜 주었다.

 

다음으로 routingKey를 이용해서 해당 키를 대상으로 하는 메시지 요청을 교환기에다가 보내게 된다.

 

다음으로 rabbitListner를 아래와 같이 작성해 주었다.

@RabbitListener(queues = "#{rabbitConfig.allQueueNames().toArray(new String[0])}")
public void handleMessage(ChatMessage chatMessage) {
    String destination = "/subscribe/" + chatMessage.getChatRoomId();
    messagingTemplate.convertAndSend(destination, chatMessage);
}

 

이를 통해서, 저장해 주었던 allQueueNames를 대상으로 listen 이 가능하도록 하였다.

public List<String> allQueueNames() {
    return queueNames;
}

 

 

다음으로 RabbitConfig를 알아보도록 하자.

package com.sparta.aper_chat_back.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class RabbitConfig {

    public static final String CHAT_EXCHANGE = "chat.exchange";
    private final List<String> queueNames = new ArrayList<>();

    @Bean
    public Exchange chatExchange() {
        return ExchangeBuilder.topicExchange(CHAT_EXCHANGE).durable(true).build();
    }

    public void addQueue(String chatRoomId) {
        String queueName = "chat_room_" + chatRoomId + ".queue";
        queueNames.add(queueName);
    }

    public List<String> allQueueNames() {
        return queueNames;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
        return new RabbitAdmin(rabbitTemplate);
    }

    @Bean
    public void initializeQueues(RabbitAdmin rabbitAdmin) {
        for (String queueName : queueNames) {
            Queue queue = QueueBuilder.durable(queueName).build();
            rabbitAdmin.declareQueue(queue);
        }
    }
}

 

@Bean으로 선언되어 있는 녀석들은 Spring이 시작될 때, 실행할 수 있도록 한다.

 

따라서, chatExchange로 교환기를 켜주고, rabbitAdmin으로 rabbitAdmin에 대한 것도 켜주도록 한다.

마지막으로 initializeQueues를 통해서, 이전 queueNames라는 리스트로 저장해 두었던 큐에 대해서 다시 활성화시켜 준다.

728x90