미소를뿌리는감자의 코딩

Webflux + mongodb 를 통한 채팅 구현 본문

프로젝트

Webflux + mongodb 를 통한 채팅 구현

미뿌감 2024. 5. 23. 12:39
728x90

https://veluxer62.github.io/tutorials/spring-data-mongodb-reactive-tutorial/

 

Spring WebFlux - Spring Data MongoDB Reactive tutorial

Introduction

veluxer62.github.io

 

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'

이 의존성을 추가해 줌으로 인해 비동기적으로 접근하는 것을 가능하게 한다. "reactive"

따라서 비동기적 처리가 가능한 Webflux와 mongodb가 잘 맞는다. 

 

비동기적 -> 작업이 완료될 때까지 기다리지 않겠다.

논블로킹 방식 -> thread를 blocking하지 않겠다. 

 

 

[mongoTemplate vs. mongoRepository]

 

https://medium.com/@prekshaan95/difference-between-spring-boot-mongotemplate-and-mongorepository-6da468bd716a

 

Difference between spring boot mongoTemplate and mongoRepository.

Both Spring MongoTemplate and MongoRepository are part of the Spring Data MongoDB framework and provide different approaches for…

medium.com

 

mongoTemplate를 이용해서, mongoDB 작동에 전체 권한. 복잡한 쿼리. 업데이트. java object와 mongodb document와 매핑도 할 수 있음. 

 

[기존 코드]

package com.sparta.webfluxchat.repository;

import com.sparta.webfluxchat.entity.Message;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;

public interface MessageRepository extends ReactiveMongoRepository<Message, String> {
    Flux<Message> findByOrderByTimestampAsc();
}

 

reactivemongorepository를 사용해서 채팅 데이터를 저장하고 출력하는데, 사용하였다.

하지만, 다른 채팅방마다 다른 collection을 사용하고 싶었기 때문에, mongoTemplate로의 변환이 필요해졌다. 

 

package com.sparta.webfluxchat.config;

import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;

@Configuration
public class MongoConfig {
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(MongoClients.create("mongodb://localhost:27017"), "test");
    }
}

 

따라서 mongoTemplate에 대한 configuration을 작성해주었고, 이를 기반으로 채팅방마다 collection을 만들고 해당 채팅방에서 나온 채팅을 특정 collection에 저장해나갔다. 

 

[Mono, Flux]

처음에는 Mono와 Flux가 어떤 방식으로 비동기적 처리를 하는지에 대한 이해가 잘 되지 않았었다. 

https://khdscor.tistory.com/m/118

 

Spring boot With MongoDB - WebFlux를 통해 비동기적으로 MongoDB에 접근해보자.

들어가기 이전에 채팅방 채팅 내용을 저장하기 위해서 MongoDB를 사용해 본 적이 있었다. Gradle 의존성을 'spring-boot-starter-data-mongodb'로 설정하였었는데, 알고 보니 'spring-boot-starter-data-mongodb-reactive'

khdscor.tistory.com

 

이 블로그에서 설명한 비동기적 처리 과정을 보고 이해가 잘 되었다. 

 

즉, 반환 type이 Mono인 메서드에 대해서, 해당 메서드가 호출 되었을 때, 바로 Mono 객체를 반환한다. 또한, 반환 받은 controller method는 종료한다. 종료된 것과 별개로 data base에서는 요청에 대한 처리를 완료하고, 이를 Mono 객체가 결과를 수신하게 된다.

이를 통해, 비동기적 처리가 가능하게 된다. 

 

 

그렇다면, Mono와 Flux의 차이점은 어떻게 될까.

https://naveen-metta.medium.com/mastering-reactive-programming-with-project-reactors-mono-and-flux-in-java-6a1a81a01f71

 

Mastering Reactive Programming with Project Reactor’s Mono and Flux in Java

Introduction:

naveen-metta.medium.com

 

이 블로그를 참고해서 보았다.

 

정의로 살펴보자면, 둘 다 publisher type 중 하나로

Mono : 0~1개의 item을 반환하기 위해 쓰이며, 

Flux : 0~N개의 item을 반환하기 위해 쓰인다.

 

또한 사용할 수 있는 많은 메서드 중 대표적인 것들 또한 예시 코드와 함께 소개해 준다.

 

[코드]

 

우선 /send 요청의 controller 코드와 service 코드를 확인해 보도록 하자.

 

@GetMapping("/send")
public Mono<Void> sendMessage(@RequestParam String message,
                              @RequestParam Long roomnumber,
                              @AuthenticationPrincipal UserDetailsImpl userDetails) {
    return chatService.saveMessage(message, roomnumber, userDetails.getUser().getId())
            .doOnNext(msg -> sink.tryEmitNext(msg))
            .then();
}

 

return 에서, service에서 .saveMessage를 호출하게 된다. 해당 코드를 먼저 확인해보자. 

 

public Mono<Message> saveMessage(String message, Long roomnumber, Long userId) {
    Message msg = new Message(message, System.currentTimeMillis(), userId);
    String collectionName = "messages_room_" + roomnumber;
    return reactiveMongoTemplate.save(msg, collectionName);
}

 

우선 새로 들어온 message에 대한 내용을 바탕으로 Message 클래스의 새로운 인스턴스를 생성한다.

이후 채팅방 번호를 이용해서 어떤 collection에 정보를 저장해 줄건지 결정하게 된다.

 

또한 .save 메서드의 반환 type은 Mono 이다.

public <T> Mono<T> save(T objectToSave, String collectionName) {
    Assert.notNull(objectToSave, "Object to save must not be null");
    Assert.hasText(collectionName, "Collection name must not be null or empty");
    EntityOperations.AdaptibleEntity<T> source = this.operations.forEntity(objectToSave, this.mongoConverter.getConversionService());
    return source.isVersionedEntity() ? this.doSaveVersioned(source, collectionName) : this.doSave(collectionName, objectToSave, this.mongoConverter);
}

 

이는 저장된 객체를 단일 결과로 반환하는 역할을 하게 된다. 

 

다시 controller 코드로 돌아와서, 

msg -> sink.tryEmitNext(msg) 에 대해서 알아보자. 

메세지 객체에 대해서 sink를 통해 다른 구독자에게도 방출이 되도록 하는 코드이다.

 

이후 .then()을 통해 작업이 완료가 되었음을 알릴 수 있다.

 

 

그렇다면 sink 가 무엇인가?

Sinks API로 비동기 이벤트의 발행과 구독을 제어하는데 유용한 API이다.

 

 

다음으로 /chat 요청의 controller와 service 코드를 확인해 보도록 하자.

 

@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> streamMessages(@RequestParam("roomId") Long roomId) {
    return chatService.findMessagesByRoomId(roomId)
            .mergeWith(sink.asFlux().delayElements(Duration.ofMillis(200)));
}

 

MediaType.TEXT_EVENT_STREAM_VALUE 는 응답의 미디어 type이 text/event-stream 인 것을 나타내고 SSE (server sent events)를 사용하여 지속적으로 이벤트를 클라이언트에게 보낼 수 있도록 한다.

 

private final Sinks.Many<Message> sink;

 

우선 sink.asFlux()에 대해서 알아보자면, 

sink는 Sinks.Many<Message> type이다. 즉, 구독자들에게 메시지를 전달하는 역할을 하며, 

asFlux()는 sink를 flux 로 변환하여 여러 구독자들이 이를 구독할 수 있도록 한다.

 

delay ... 는 메시지 간의 방출 시간 텀을 설정하여, 너무 빠르게 방출되지 않도록 하고자 함이다.

 

mergeWith은 2개의 flux를 병합하는 역할으로 

findMessageByRoomId에서 반환된 기존에 있던 메시지와 실시간으로 sink에서 방출되는 메시지를 병합하여 하나의 Flux<Message> 로 만드는 역할을 한다. 

 

public Flux<Message> findMessagesByRoomId(Long roomId) {
    String collectionName = "messages_room_" + roomId;
    logger.info("Finding messages in collection: {}", collectionName);
    return reactiveMongoTemplate.findAll(Message.class, collectionName);
}

 

service 코드는 간단하다. reactiveMongoTemplate에서 해당되는 collection의 메시지들을 다 찾아서 반환해준다.

728x90