미소를뿌리는감자의 코딩

Webflux+ WebSocket + mongosh + netty 을 이용한 채팅 기능 구현 본문

프로젝트

Webflux+ WebSocket + mongosh + netty 을 이용한 채팅 기능 구현

미뿌감 2024. 7. 18. 19:56
728x90

1. 개요

비동기 처리를 목적으로한 reactive programming을 구현하려고 한다.

 

실시간 채팅을 함에 있어서, 비동기 처리가 가능하게 하여, 서버가 DB로 부터 응답을 기다리는 동안 다른 작업을 하여 효율성 있는 서버를 구축하기 위함이다.

 

SSE(Server Sent Event) 연결이 아닌 WebSocket을 선택한 이유는 서버에서 클라이언트로 응답을 보내는 단방향 요청이 아닌, 실시간 채팅인 양방향을 목적으로 하고 있기 때문이다.

 

또한 webflux를 사용함으로 reactive library 또한 사용하여 준다. [ reactor 구현체의 객체로 mono, flux 사용 ]

 

서버 프레임워크로 비동기 처리 및 논블록킹 I/O 모델이 가능한 Netty를 선택하여 주었다.

일반적으로 Spring에 많이 사용되는 Tomcat을 사용하지 않은 이유는 전통적으로 블로킹 I/O 모델을 이용하기 때문이다. 또한 비동기 처리 성능이 Netty에 미치지 못하기 때문이다.

 

마지막으로 비관계형 데이터베이스인 mongosh를 사용하여, 관계형 데이터베이스보다 빠른 처리가 가능하도록 하였다.

 

 

2. 구현

먼저 ReactiveMongoTemplate를 이용하기 위해 이에 대한 Config를 작성해 준다.

@Configuration
public class MongoConfig {
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(MongoClients.create("mongodb://localhost:27017"), "test");
    }
}

 

또한 WebSocket Config도 작성해 준다.

configureMessageBroker은 메시지를 어떻게 받고 보낼 것인지 간단히 적어준 것이며, 

registerStompEndPoints를 어떤 url로 연결을 시킬 것인지 작성하여 준 것이다.

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/greeting-websocket").setAllowedOrigins("*");
    }

}

 

 

다음으로 controller 부분이다.

    @GetMapping(  "/history")
    public Flux<Message> getChatHistory(@RequestParam Long roomId) {
        log.info("getChatHistory-Controller called!!!!");
        return chatService.getChatHistory(roomId).mergeWith(sink.asFlux().delayElements(Duration.ofMillis(200)));
    }

 

해당 API의 경우엔 채팅방 입장 시, 이전 채팅들을 불러오는 목적을 가진 API다.

이는 실시간 연결이 아닌, 서버에서 클라이언트쪽으로만 보내는 단방향 요청이기에, websocket 연결 전에 호출하여 주도록 한다.

[ 실시간 연결 후 사용한다면, thread의 낭비가 아닐까 싶다.. + 굳이 Flux를 사용해야할까? 생각해보아야할 부분이다.]

 

    @MessageMapping("/hello")
    public Mono<Message> greeting(HelloMessage message) {

        Long roomId = message.getRoomId();
        Long userId = message.getUserId();

        log.info("greeting called for message: {}", message);

        Mono<Message> savedMessage = chatService.saveSocketMessage(message.getMessage(), roomId, userId).cache();

        savedMessage.subscribe(msg -> {
            log.info("Sending message: {}", msg);
            messagingTemplate.convertAndSend("/topic/greetings/" + roomId, msg);
        });
        return savedMessage;
    }

 

다음으로는 메시지를 받고 보내는 것이다.

api/hello로 메시지가 전달되게 되면, 해당 채팅방의 이름으로 생성된 mongoDB에 채팅과 작성자에 대한 정보를 저장한다.

이때, 작성자에 대한 정보는 프론트에서 같이 전달시켜 주어야 한다.

handshaking 이후에는, 기존의 http 요청이 아니라 webSocket 프로토콜을 이용하기에 @AuthenticationPrincipal annotation을 사용할 수 없다.

 

이후 전달받은 메시지를 /topic/greetings/+roomId를 구독하고 있는 client에게 전파하여 준다.

 

다음으로는 Service 부분이다.

@Transactional
public Flux<Message> getChatHistory(Long roomId) {
    String collectionName = "messages_room_" + roomId;
    logger.info("Finding messages in collection: {}", collectionName);

    return reactiveMongoTemplate.findAll(Message.class, collectionName);
}

 

mongosh에서 채팅방에 대한 내용을 모두 찾아서 반환해주는 것이다.

 

@Transactional
public Mono<Message> saveSocketMessage(String message, Long roomnumber, Long id) {

    log.info("saveSocktMessage called with message: {}, roomnumber : {}, id: {}", message, roomnumber, id);
    User user = userRepository.findById(id).orElseThrow();

    Message msg = new Message(message, System.currentTimeMillis(), id, user.getUsername());
    String collectionName = "messages_room_" + roomnumber;
    return reactiveMongoTemplate.save(msg, collectionName);
}

 

다음으로는 reactiveMongoTemplate에 새로운 메시지를 저장해주는 service 코드이다.

.save로 반환되는 Mono<Message>를 다시 프론트에 전달하여 준다.

 

다음으로는 html 코드 부분이다.

-> 백엔드를 주로 공부하고 있어서, 프론트 코드는 조금 엉망이다.....ㅜㅜ...

const stompClient = new StompJs.Client({
    brokerURL: 'ws://localhost:8080/greeting-websocket'
});

우선적으로 WebSocket 연결을 하기 위한 url을 작성해 준다.

 

우선 기본적인 동작 원리는 원하는 websocket api를 구독하고, 해당 event에 메시지가 오면 출력하도록 하는 것이다. 

    const eventSource = new EventSource('/history?roomId=110');
    eventSource.onmessage = function(event) {
        const message = JSON.parse(event.data);
        $("#greetings").append("<li>" + message.username + ": " + message.content + "</li>");
    }

 

위 코드는 이전 채팅 내역을 불러오도록 하는 api 이다. [ 이를 Flux 반환이 아닌 그냥 List 반환을 해야하나 고민 중이다.]

 

stompClient.onConnect = (frame) => {

    setConnected(true);
    console.log('Connected: ' + frame);
    stompClient.subscribe('/topic/greetings/' + roomNumber, (greeting) => {
        const message = JSON.parse(greeting.body);
        showGreeting(message);
    });

};

 

이후 연결이 되면 실시간 채팅을 하기 위한 /topic/greetings/+roomNumber를 구독해준다.

해당 구독에서 메시지가 반환되게 되면, 

function showGreeting(message) {
    $("#greetings").append("<tr><td>" + message.username + "  " + message.content + "</td></tr>");
}

 

이렇게 메시지를 출력되도록 해준다.

 

메시지를 보내는 것은 /app/hello에다가 보내준다. 

function sendName() {
    var nameValue = $("#name").text();
    var message = $("#message").val();
    var userId = $("#userId").text();

    document.getElementById('message').value = '';

    stompClient.publish({
        destination: "/app/hello",
        body: JSON.stringify({'name': nameValue, 'message': message, 'roomId' : roomNumber, 'userId' : userId})
    });
}

이렇게 보내는 이의 이름, 아이디, roomId, 메시지를 포함해서 요청을 보낸다.

 

참고로 connect와 disconnect의 코드는 아래와 같다.

function connect() {
    stompClient.activate();
    console.log("Activated");
}

function disconnect() {
    stompClient.deactivate();
    setConnected(false);
    console.log("Disconnected");
}

 

이렇게 실제 독서 프로젝트에 적용시키기 전에 임시로 코드를 작성하고 테스트 해보았다.

Netty 서버가 아닌 Tomcat 서버로 적용시킨 것이어서, 이 부분은 실제 독서 프로젝트를 적용시키면서 다시 알아보아야할 것 같다.

728x90