프로젝트

[Aper] 메시지 큐 - RabbitMQ 도입

미뿌감 2024. 10. 16. 17:34
728x90

1. 개요

이번에 채팅 기능을 확장하게 되면서, 메시지 큐 도입을 통한 서버 부하 감소를 하고자 하였다.

그 중, rabbitmq를 선택해서 도입하게 되었다.

kafka 보다 rabbitmq를 선택한 이유는 메시지 전송 패턴에 더 적합하다고 판단을 하였기 때문이다. 반면 kafka는 스트리밍에 더 적합한 서비스라고 판단하였다.

 

메시지 큐를 이용을 통해 서버의 부하를 줄일 수 있다는 점이 큰 이점으로 다가왔다.

 

producer가 message와 routing.key를 포함해서 websocket을 이용하여 서버에 요청을 보내게 되면, 서버는 rabbitmq에 이를 전해주게 된다. rabbitmq는 exchange에서 routing.key를 통해서 알맞은 queue로 메시지를 전송하게 되고, 또한 메시지는 해당 큐를 구독하고 있는 client에게로 전달되게 된다.

 

또한 rabbitmq는 4가지 exchange 유형이 있다.

  • /exchange
  • /queue
  • /amq/queue
  • /topic

 

/exchange

destination: "/exchange/chatExchange(교환기)/room.1(라우팅 키)"

이와 같이 rabbitmq 요청 시, /exchange로 시작하는 api와 교환기, 라우팅 키 작성을 통해서 사용할 수 있다.

메시지를 특정 exchange에 전송하며 해당 exchange에서 routing key를 사용하여 어떤 큐로 전달할 지 결정한다.

 

이를 통해 메시지를 여러 큐에 분배하거나, 조건에 맞는 큐에만 메시지를 전송할 수 있다.

 

/queue

destination: "/queue/myQueue"

메시지를 특정 큐에 직접 전송하는 방식이다. 따라서 중간 과정인 exchange에 전달하는 과정을 생략하게 된다.

단순하게 적용할 수 있다는 점에서 이점이 있지만, 다수의 큐에 메시지를 분배해야 할 경우엔 제한적이다.

 

STOMP 서버에서 생성된 큐를 구독하여 해당 큐에 들어온 메시지를 받을 수 있다.

반면 /queue의 default type은 direct로 하나의 큐에게로만 메시지를 넣을 수 있다.

따라서, 다수의 큐에 메시지를 보내기에 한계가 있다.

 

/amq/queue

destination: "/amq/queue/externalQueue"

임시 큐를 의미한다. 이는 클라이언트가 연결될 때 임시로 생성되며, 클라이언트가 연결을 끊으면 자동으로 큐가 삭제된다.

따라서, 임시 데이터 저장 혹은 일회성 메시징에 적합하다.

 

/topic

destination = "/topic/{exchange_name}"

 

일시적이거나 지속적인 topic을 구독하는 데 사용됨.

특정 라우팅 키에 따라 메시지를 "여러 큐"로 라우팅 됨.

 

따라서 이번 프로젝트에는 /topic을 이용해서 라우팅 키에 따라 메시지를 알맞은 queue에 전송할 수 있도록 할 것이다.

 

2. 코드

전체적인 코드 흐름은 아래 블로그를 보면서 공부하였다.

https://dev-gorany.tistory.com/325

 

[Spring Boot] WebSocket과 채팅 (4) - RabbitMQ

들어가기 앞서 STOMP + RabbitMQ Dependency Configuration DTO Controller RabbitMQ HTML Javascript Destination 마치며 [Spring Boot] WebSocket과 채팅 (3) - STOMP [Spring Boot] WebSocket과 채팅 (2) - SockJS [Spring Boot] WebSocket과 채팅 (1)

dev-gorany.tistory.com

 

 

[WebSocketConfig]

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${local.rabbitmq.username}")
    private String rabbitMqUsername;

    @Value("${local.rabbitmq.password}")
    private String rabbitMqPassword;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.setApplicationDestinationPrefixes("/pub");

        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
                .setRelayHost("localhost")
                .setRelayPort(61613)
                .setSystemLogin(rabbitMqUsername)
                .setSystemPasscode(rabbitMqPassword)
                .setClientLogin(rabbitMqUsername)
                .setClientPasscode(rabbitMqPassword);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/aper-chat")
                .setAllowedOriginPatterns("*") // have to change allowed origins
                .withSockJS();
    }

//    @Override
//    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
//        messageConverters.add(jacksonMessageConverter());
//        return false;
//    }
//
//    @Bean
//    public MappingJackson2MessageConverter jacksonMessageConverter() {
//        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
//        ObjectMapper objectMapper = new ObjectMapper();
//        objectMapper.registerModule(new JavaTimeModule());
//        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
//        converter.setObjectMapper(objectMapper);
//        return converter;
//    }
}
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${local.rabbitmq.username}")
    private String rabbitMqUsername;

    @Value("${local.rabbitmq.password}")
    private String rabbitMqPassword;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.setApplicationDestinationPrefixes("/pub");

        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
                .setRelayHost("localhost")
                .setRelayPort(61613)
                .setSystemLogin(rabbitMqUsername)
                .setSystemPasscode(rabbitMqPassword)
                .setClientLogin(rabbitMqUsername)
                .setClientPasscode(rabbitMqPassword);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/aper-chat")
                .setAllowedOriginPatterns("*") // have to change allowed origins
                .withSockJS();
    }

}

 

 

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.setPathMatcher(new AntPathMatcher("."));
    registry.setApplicationDestinationPrefixes("/pub");

    registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setSystemLogin(rabbitMqUsername)
            .setSystemPasscode(rabbitMqPassword)
            .setClientLogin(rabbitMqUsername)
            .setClientPasscode(rabbitMqPassword);
}

우선 MessageBroker를 작성해 주었다. 

enableStompBrokerRelay로 /queue /topic /exchange /amq/queue를 허용해줘서, rabbitmq의 4가지 메시지 처리 방식이 가능하도록 하였다. 또한 Login과 Passcode 설정을 통해서, websocket에서 rabbitmq에 접근을 허용할 수 있도록 하였다.

 

[RabbitConfig]

@Configuration
@EnableRabbit
public class RabbitConfig {

    @Value("${local.rabbitmq.username}")
    private String rabbitMqUsername;

    @Value("${local.rabbitmq.password}")
    private String rabbitMqPassword;
    private static final String CHAT_QUEUE_NAME = "chat.queue";
    private static final String CHAT_EXCHANGE_NAME = "chat.exchange";
    private static final String ROUTING_KEY = "room.*";

    @Bean
    public Queue queue() {
        return new Queue(CHAT_QUEUE_NAME, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(CHAT_EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }


    @Bean
    public MessageListenerAdapter listenerAdapter(StompRabbitController stompRabbitController) {
        return new MessageListenerAdapter(stompRabbitController, "receive");
    }


    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
        objectMapper.registerModule(dateTimeModule());

        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public JavaTimeModule dateTimeModule() {
        return new JavaTimeModule();
    }
}

 

코드를 메서드 하나씩 확인하면서 이해해보자.

@Bean
public Queue queue() {
    return new Queue(CHAT_QUEUE_NAME, true);
}

지속되는 큐를 하나 만들어주었다. durable을 true로 설정하여 서버가 꺼지더라도 유지될 수 있도록 하였다.

이 큐는 현재는 사용되고 있지 않지만, 추후에 client가 websocket에 연결되어 있지 않을 때, 메시지를 임시로 저장하는 과정 중에 이용할 계획을 가지고 있다.

 

@Bean
public TopicExchange exchange() {
    return new TopicExchange(CHAT_EXCHANGE_NAME);
}

교환기 또한 설정해 주었다.

 

@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}

또한 binding을 설정해 주어 추후에 새로 생성되는 queue에 대해서 exchange를 binding 할 수 있도록 하였으며, routing key를 이용해서 메시지를 맞는 queue에 전송할 수 있도록 구성하였다.

 

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
    objectMapper.registerModule(dateTimeModule());

    return new Jackson2JsonMessageConverter(objectMapper);
}

@Bean
public JavaTimeModule dateTimeModule() {
    return new JavaTimeModule();
}

 

jsonMessageConverter 작성을 통해, 메시지를 송수신할 때, 메시지를 Json으로 또, Java 객체로 변환하는 것을 도와준다.

즉 직렬화/역직렬화를 도와준다.

 

만약에 jsonMessageConverter를 설정해 주지 않으면 아래와 같은 에러를 뱉게 된다.

2024-10-16T16:56:34.572+09:00 ERROR 4476 --- [aper_chat_back] [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Unhandled exception from message handler method

java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.sparta.aper_chat_back.chat.dto.MessageDto
	at org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage(SimpleMessageConverter.java:143) ~[spring-amqp-3.1.6.jar:3.1.6]
	at org.springframework.amqp.support.converter.AbstractMessageConverter.createMessage(AbstractMessageConverter.java:88) ~[spring-amqp-3.1.6.jar:3.1.6]
	at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:70) ~[spring-amqp-3.1.6.jar:3.1.6]
	at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:58) ~[spring-amqp-3.1.6.jar:3.1.6]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.convertMessageIfNecessary(RabbitTemplate.java:1892) ~[spring-rabbit-3.1.6.jar:3.1.6]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1188) ~[spring-rabbit-3.1.6.jar:3.1.6]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1181) ~[spring-rabbit-3.1.6.jar:3.1.6]
	at com.sparta.aper_chat_back.chat.controller.StompRabbitController.enter(StompRabbitController.java:34) ~[main/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:567) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:529) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:93) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:522) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:457) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:152) ~[spring-messaging-6.1.11.jar:6.1.11]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]

에러 코드를 조금 분석해 보면, SimpleMessageConverter가 처리할 수 있는 데이터 타입은 String, Serializable 인터페이스를 구현한 객체들만 지원한다는 의미이다. 

즉, 직렬화를 해서 rabbitMq로 보내주어야지, rabbitMq가 이를 처리할 수 있다.

 

    @Bean
    public MessageListenerAdapter listenerAdapter(StompRabbitController stompRabbitController) {
        return new MessageListenerAdapter(stompRabbitController, "receive");
    }

listenerAdapter은 rabbitMQ 큐에서 수신된 메시지를 자동으로 지정된 메서드로 전달해주는 역할을 한다.

따라서, annotation @RabbitListener를 설정해 두면, 해당 메서드는 rabbitMQ에서 수신된 메시지를 받을 수 있게 된다.

아래는 그 메서드 이다.

@RabbitListener(queues = CHAT_QUEUE_NAME)
public void receive(MessageDto msg) {
    System.out.println("received : " + msg.getMessage());
}

이를 통해서 console에 로그를 찍어볼 수 있다.

 

[StompRabbitController]

@Controller
public class StompRabbitController {

    private final RabbitTemplate template;

    private final static String CHAT_EXCHANGE_NAME = "chat.exchange"; // will be used when the other not joined
    private final static String CHAT_QUEUE_NAME = "chat.queue";
    private final ChatService chatService;


    public StompRabbitController(RabbitTemplate template, ChatService chatService) {
        this.template = template;
        this.chatService = chatService;
    }

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(MessageDto chat, @DestinationVariable Long chatRoomId) {
        String sysMsg = "입장하셨습니다.";
        chat.setMessage(sysMsg);
        chat.setRegDate(LocalDateTime.now());
        template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(MessageDto chat, @DestinationVariable Long chatRoomId) {
        chat.setRegDate(LocalDateTime.now());
        chatService.saveMessage(chat)
                .doOnSuccess(savedMessage -> {
                    template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
                })
                .subscribe();
    }

    @RabbitListener(queues = CHAT_QUEUE_NAME)
    public void receive(MessageDto msg) {
        System.out.println("received : " + msg.getMessage());
    }
}

 

 

프론트에서 

stomp.subscribe(`/topic/room.${chatRoomId}`, function (content) {
    // 메시지 처리 로직
});

이렇게 topic/room.1 에 대한 구독을 할 수 있다. 

 

    @MessageMapping("chat.enter.{chatRoomId}")
    public void enter(MessageDto chat, @DestinationVariable Long chatRoomId) {
        String sysMsg = "입장하셨습니다.";
        chat.setMessage(sysMsg);
        chat.setRegDate(LocalDateTime.now());
        template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
    }

    @MessageMapping("chat.message.{chatRoomId}")
    public void send(MessageDto chat, @DestinationVariable Long chatRoomId) {
        chat.setRegDate(LocalDateTime.now());
        chatService.saveMessage(chat)
                .doOnSuccess(savedMessage -> {
                    template.convertAndSend("amq.topic", "room." + chatRoomId, chat);
                })
                .subscribe();
    }

 

백 엔드에서는 template.convertAndSend를 이용해서 해당 topic을 구독하고 있는 client에게 메시지를 보낼 수 있다.

 

send에서는 기존에 생성해 두었던, chatService.saveMessage를 이용해서

    public Mono<ChatMessage> saveMessage(MessageDto messageDto) {
        String collectionName = "chat_" + messageDto.getChatRoomId();
        ChatMessage chatMessage = new ChatMessage(messageDto);
        return reactiveMongoTemplate.save(chatMessage, collectionName);
    }

메시지를 mongosh에 저장할 수 있도록 하였다.

 

[MessageDto]

@Getter
public class MessageDto {
    private Long id;
    @JsonProperty("chatRoomId")
    private Long chatRoomId;

    @JsonProperty("senderId")
    private Long memberId;

    @JsonProperty("message")
    private String message;

    @JsonProperty("content")
    private String content;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    private LocalDateTime regDate;


    @JsonCreator
    public MessageDto(
            @JsonProperty("chatRoomId") Long chatRoomId,
            @JsonProperty("senderId") Long memberId,
            @JsonProperty("message") String message,
            @JsonProperty("regDate") LocalDateTime regDate) {
        this.id = 1L;
        this.chatRoomId = chatRoomId;
        this.memberId = memberId;
        this.message = message;
        this.regDate = regDate;
    }


    public void setRegDate(LocalDateTime now) {
        this.regDate = LocalDateTime.now();
    }

    public void setMessage(String sysMsg) {
        this.message = sysMsg;
    }

}

 

또한 이용하는 Dto에 대해서 , @Json annotation을 적용해 주었다.

 

 

3. 결과 확인

서로 다른 유저에 대해서,

 

 

이런식으로 채팅이 전달되는 것을 확인할 수 있다.

728x90