📦 Database/NoSQL

Redis Publish/Subscribe Pattern (Spring Data Redis)

신건우 2023. 9. 13. 15:07

구현 요구사항

  • 딥러닝 엔진에서 MQTT 데이터 전송
  • 클러스터링/데이터 미러링된 RabbitMQ 클러스터 5대 노드에서 각 라우팅 키에 맞게 Exchange -> Routing Key -> Quorum Queue에 메시지들이 쌓이고 Slave Queue에 메시지가 미러링 됩니다.
  • Redis 설치
  • Backend(Spring Data Redis)에서 미러링 된 Slave Queue에서 데이터를 가져와서 Redis Pub/Sub 채널을 열어줍니다.
  • Spring Redis(Backend)에서 소켓을 열어줍니다.
  • 소켓의 URL은 WebSocketConfig 클래스에 나온것처럼 ws://localhost:18080/ws입니다.
  • 프론트엔드(React/TypeScript 환경)에서 백엔드의 소켓에 접속해 데이터를 받아옵니다.

Spring Data Redis Pub-Sub 공식 문서

My Github Repository

RabbitMQ 등 다른 메시지 큐도 많은데 Redis의 Pub/Sub 기능을 사용하는 이유는,

성능Push Based Subscription 방식 때문입니다.

이벤트를 저장하지 않기 때문에 속도가 매우 뺴르며 Publish한 데이터를 저장할 필요가 없습니다.

그리고 Pub/Sub뿐 아니라 다양한 Json을 주고 받을 떄 저장이 필요한데 이 경우도 RedisJson을 활용하는 등 활용할 기능이 많습니다.


두 번째로는 Push Based Subscription입니다.

Kafka의 경우 Subscriber가 Pull 요청을 보내야 Message를 받아오는데,

Redis는 Publisher가 Publish하면 자동으로 모든 Subscriber에게 Message를 BroadCasting 합니다.


📕 Redis vs Kafka

Redis

  • 모든 Subscriber에게 BroadCasting을 원하고, 데이터를 저장하지 않을 때
  • 속도가 중요하고, 데이터 손실을 감수할 수 있는 경우
  • 시스템에서 전송된 메시지를 보관하는 것을 원하지 않을 경우 (성능)
  • 처리할 데이터의 양이 크지 않을 때

Kafka

  • 신뢰성 있는 메시지의 전달 보장을 위할 때
  • 메시지 소비 후에도 큐의 메시지를 복제 & 보관하기 원할 때
  • 속도가 큰 문제가 아닌 경우
  • 데이터의 크기가 클 경우

Redis 설치

도커 컨테이너 사용 X


📕 설치

Debian 기반

apt-get -y update
apt-get -y upgrade
apt-get -y install redis-server firewalld
systemctl start redis-server && systemctl enable redis-server

firewall-cmd --permanent --add-port=6379/tcp && firewall-cmd --permanent --add-service=redis && firewall-cmd --reload

RPM 기반

dnf -y update
dnf -y upgrade
dnf -y install redis-server firewalld
systemctl start redis-server && systemctl enable redis-server

firewall-cmd --permanent --add-port=6379/tcp && firewall-cmd --permanent --add-service=redis && firewall-cmd --reload

📕 각종 설정값 변경 => /etc/redis/redis.conf

# 최대 메모리 사양 
# -> 최대 사용 메모리 사양을 256mb로 설정한다. 단위는 mb나 g 등 필요에 맞춰 적어주면된다. 
maxmemory 256mb 

# maxmemory 에 설정된 용량을 초과했을때 삭제할 데이터 선정 방식
# - noeviction : 쓰기 동작에 대해 error 반환 (Default)
# - volatile-lru : expire 가 설정된 key 들중에서 LRU algorithm 에 의해서 선택된 key 제거
# - allkeys-lru : 모든 key 들 중 LRU algorithm에 의해서 선택된 key 제거
# - volatile-random : expire 가 설정된 key 들 중 임의의 key 제거
# - allkeys-random : 모든 key 들 중 임의의 key 제거
# - volatile-ttl : expire time(TTL)이 가장 적게 남은 key 제거 (minor TTL)
maxmemory-policy volatile-ttl

# 프로세스 포트 
# -> port 부분은 초기에 주석처리가 되어 있는데, 디폴트 값으로 6379 포트에서 동작한다. 
# -> 만약, 6379가 아닌 다른 포트를 설정하고 싶다면 주석을 해제하고 포트번호를 입력하면된다. 
port 1234 

# 외부접속 허용 
# -> 기본 실행 환경은 localhost(127.0.0.1)로 되어있다. 
# -> 만약, 모든 외부접속에 대한 허용을 하고 싶다면, 0.0.0.0 으로 변경하면 된다. 
bind 0.0.0.0 

# 비밀번호 설정 
# -> 서버 접속에 비밀번호를 적용시키고 싶다면 아래와 같이 수정하자. 
requirepass [접속 패스워드 입력] 

# DB 데이터를 주기적으로 파일로 백업하기 위한 설정입니다.
# Redis 가 재시작되면 이 백업을 통해 DB 를 복구합니다.
#save 900 1      # 15분 안에 최소 1개 이상의 key 가 변경 되었을 때
#save 300 10     # 5분 안에 최소 10개 이상의 key 가 변경 되었을 때
#save 60 10000   # 60초 안에 최소 10000 개 이상의 key 가 변경 되었을 때

비밀번호를 암호화해서 넣고 싶다면 아래 명령어로 암호화 해서 지정 가능합니다.

# 암호화된 비밀번호가 필요하다면, 터미널에 다음 명령어로 생성 가능하다. 
echo "MyPassword" | sha256sum

Spring Data Redis (BackEnd)

📕 build.gradle Dependency 추가

// Redis
implementation'org.springframework.boot:spring-boot-starter-data-redis'

// WebSocket
implementation 'org.springframework.boot:spring-boot-starter-websocket'

// Project Reactor  
implementation 'io.projectreactor.netty:reactor-netty-http'

📕 WebSocketConfig

WebSocketMessageBrokerConfigurer : 인터페이스를 구현해 STOMP로 메시지 처리 구성합니다.

configureMessageBroker() 함수 : 메시지를 중간에서 라우팅할 때 사용하는 메시지 브로커를 구성하는 함수입니다.

  • 보통 /topic, /queue를 사용합니다.
  • /topic은 한명이 Message를 발행했을 때 해당 토픽을 구독하고 있는 N명에게 메시지를 브로드캐스팅 할 때 사용합니다.
  • /queue는 한명이 Message를 발행했을 때 발행한 1명에게 다시 정보를 보내는 경우에 사용합니다.
    enableSimpleBrokerRelay
  • 해당 주소를 구독하는 클라이언트에게 메시지를 보냅니다.
  • 즉, 인자에는 구독 요청의 prefix를 넣고, 클라이언트에서 1번 채널을 구독하고자 할 때는 /sub/1 형식과 같은 규칙을 따라야 합니다.
    setApplicationDestinationPrefixes
  • 메시지 발행 요청의 prefix를 넣습니다.
  • /로 시작하는 메시지만 해당 Broker에서 받아서 처리하고, 클라이언트에서 WebSocket에 접속할 수 있는 endpoint를 지정합니다.
  • 만약 /app으로 설정한다면, 실제 구독 신청 URL은 /app/topic처럼 시작 URL을 지정합니다.

registerStompEndpoints() 함수 : Socket Endpoint를 등록하는 함수입니다.

  • ws라는 Endpoint에 Interceptor를 추가해 Socket을 등록합니다.
@Configuration  
@RequiredArgsConstructor  
@EnableWebSocketMessageBroker  
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {  

    @Override  
    public void configureMessageBroker(MessageBrokerRegistry config) {  
        // 이 토픽을 구독하면 Subscriber 들에게 메시지를 브로드캐스팅 함  
        config.enableStompBrokerRelay("/topic");  

        // 메시지 발행 요청할 때 사용  
        config.setApplicationDestinationPrefixes("/");  
    }  

    @Override  
    public void registerStompEndpoints(StompEndpointRegistry registry) {  
        // WebSocket 연결 엔드포인트 설정, ex) ws://localhost:18080/ws  
        registry  
                .addEndpoint("/ws")  
                .setAllowedOriginPatterns("http://localhost:3000")  
                .addInterceptors(new HttpSessionHandshakeInterceptor());  
    }  
}

📕 HttpHandshakeInterceptor

  • 웹소켓은 처음 Connect 시점에 Handshake라는 작업이 수행됩니다.
  • Handshake 과정은 HTTP 통신 기반으로 이루어지며 GET 방식으로 통신을 하게 됩니다.
  • 이때, HTTTP Request Header의 Connection 속성은 Upgrade로 되어야 합니다.
  • HTTP에 존재하는 Session을 WebSocket Session으로 등록합니다, SESSION 변수는 static 변수로 String 타입입니다.
public class HttpHandshakeInterceptor implements HandshakeInterceptor {  
    @Override  
    public boolean beforeHandshake(ServerHttpRequest request,   
                                   ServerHttpResponse response,   
                                   WebSocketHandler wsHandler,   
                                   Map<String, Object> attributes) throws Exception {  
        if (request instanceof ServletServerHttpRequest servletRequest) {  
            HttpSession session = servletRequest.getServletRequest().getSession();  
            attributes.put(SESSION, session);  
        }  
        return true;  
    }  

    @Override  
    public void afterHandshake(ServerHttpRequest request,   
                               ServerHttpResponse response,   
                               WebSocketHandler wsHandler,   
                               Exception exception) {}  
}

📕 RedisController

  • @MessageMapping 발행 경로를. @SendTo or @SendToUser를 사용하면 구독 경로를 지정합니다.
  • @MessageMapping URL은 WebSocketConfig에서 설정한 DestinationPrefixes를 뺸 URL을 입력해주면 됩니다.
  • 만약 DestinationPrefixes를 /app이라고 가정하고 나머지 URL은 /topic/message라고 가정하면,
  • ex) @MessageMapping("topic/message)" 처럼 /app을 뺸 나머지 URL을 입력하면 됩니다.

  • 즉, @MessageMapping에 설정한 URL로 클라이언트로부터 요청 메시지를 받으면, @SendTo로 설정한 URL을 구독한 클라이언트들에게 메시지를 보냅니다.
  • @SendTo는 1:N으로 메시지를 보낼때 사용하는 구조이며, 보통 경로가 /topic으로 시작합니다.
  • @SendToUser 는 보통 1:1로 메시지를 보낼때 사용하는 구조이며, 보통 경로가 /queue로 시작합니다.
@Controller  
@RequiredArgsConstructor  
public class RedisController {  
    private final RedisTemplate<String, Object> template;  

    @MessageMapping("/topic/message")  
    @SendTo("/topic/message")  
    public String getData() {  
        return Objects.requireNonNull(template.opsForValue().get("데이터")).toString();  
    }  
}

📕 RedisConfig

RedisConnectionFactory

  • application.yml에 지정된 Redis의 접속정보로 Connection을 합니다.

MessageListenerAdapter

  • Topic을 Subscribe하는 Listener들을 정의합니다.
  • 이 Listener는 밑에서 설명하겠습니다.

RedisMessageListenerContainer

  • Redis Channel(Topic)로 부터 메시지를 받고, 주입된 리스너들에게 비동기로 Dispatch 하는 역할을 수행하는 컨테이너입니다.
  • 즉, 발행된 메시지 처리를 위한 리스너들을 설정할 수 있습니다.

MessageListenerAdaper\

  • RedisMessageListenerContainer로부터 메시지를 Dispatch 받고, 실제 메시지를 처리하는 비즈니스 로직이 담긴 Subscriber Bean을 추가 해 줍니다.

RedisTemplate

  • Redis서버와 상호작용하기 위한 RedisTemplate 관련 설정을 해 줍니다.
  • Redis 서버에는 bytes 코드만이 저장되므로 key와 value에 Serializer를 설정해 줍니다.
  • Json 포맷 형식으로 메시지를 교환하기 위해 ValueSerializer에 Jackson2JsonRedisSerializer로 설정해 줍니다.

Topic

  • 공유를 위해 Channel Topic을 빈으로 등록해 단일화 시켜줍니다.
@Configuration  
public class RedisConfig {  

    @Value("${spring.data.redis.host}")  
    private String host;  

    @Value("${spring.data.redis.port}")  
    private int port;  

    // Redis 연결 설정  
    @Bean  
    public RedisConnectionFactory factory() {  
        return new LettuceConnectionFactory(host, port);  
    }  

    @Bean  
    public MessageListenerAdapter listener(RedisSubscriber subscriber) {  
        return new MessageListenerAdapter(subscriber, "onMessage");  
    }  

    // Redis Channel(Topic)로 부터 메시지를 받고, 주입된 리스너들에게 비동기로 Dispatch 하는 역할  
    // Pub & Sub을 처리하는 Listener    @Bean  
    public RedisMessageListenerContainer listenerContainer() {  
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();  
        container.setConnectionFactory(factory());  
        return container;  
    }  

    // 어플리케이션에서 사용할 Redis Template    @Bean  
    public RedisTemplate<String, Object> template() {  
        RedisTemplate<String, Object> template = new RedisTemplate<>();  
        template.setConnectionFactory(factory());  
        template.setKeySerializer(new StringRedisSerializer());  
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));  
        return template;  
    }  

    @Bean  
    ChannelTopic topic() {  
        return new ChannelTopic("message");  
    }  
}

📕 RedisMessageReceiver

위 Redis Config의 MessageListenerAdapter에 추가되는 Listener 클래스 입니다.

receive() 함수는 메시지를 발행하는 역할입니다.

Redis Template를 이용해 들어온 메시지를 변환하여 수신합니다.

@Slf4j  
@Service  
@RequiredArgsConstructor  
public class RedisMessageReceiver {  

    private final RedisTemplate<String, Object> template;  

    public void receive(String message) {  
        template.convertAndSend("channel", message);  
    }  
}

📕 RedisSubscriber

  • Redis로부터 온 메시지를 역직렬화하여 메시지를 Topic 명과 함께 전달합니다.
  • onMessage() 함수는 메시지를 구독 및 전달하는 함수입니다.
@Service  
@RequiredArgsConstructor  
public class RedisSubscriber implements MessageListener {  

    private final RedisTemplate<String, Object> template;  

    // Redis로부터 온 메시지를 역직렬화 하여 메시지 전달  
    @Override  
    public void onMessage(Message message) {  
        String publishMessage = template.getStringSerializer().deserialize(message.getBody());  
        assert publishMessage != null;  
        template.convertAndSend("/topic/message", publishMessage);  
    }  
}

📕 실행 결과

😯 RabbitMQ 서버

딥러닝 엔진과 백엔드 서버를 실행시키고 채널이 오픈 되며 잠잠하던 그래프에 변동이 생겼습니다.

Exchange를 거쳐 맞는 Routing Key를 가진 Quorum Queue에 MQTT 데이터가 쌓입니다.



😯 Spring Log

아래 사진은 백엔드 서버를 키고 딥러닝 엔진 돌려서 RabbitMQ에 있는 데이터를 Spring Redis가 가져와서 스프링 로그를 찍은 사진입니다.

  • RabbitMQ 서버의 amqp 포트인 5672와 guest 계정으로 잘 데이터를 받아왔으며,
  • 프론트엔드와 소켓이 Connect된 것을 확인할 수 있습니다.

이제 프론트엔드에서 Redis Channel을 Subscribe 해새 데이터를 소켓을 이용해 넘겨보겠습니다.


React (FrontEnd)

RabbitMQ Quorum Queue에 쌓인 데이터를 백엔드의 Redis에서 받아서 Pub/Sub으 구조로

Redis Channel을 Subscribe하고 백엔드 <-> 프론트엔드 소켓을 열어 데이터를 받아옵니다.


Broker URL을 위에 나온 것처럼 ws://localhost:18080/ws로 설정해주었고,

Subscribe Topic은 /topic/message를 주었습니다.

import React, { useEffect, useState } from 'react';  
import {Client} from '@stomp/stompjs';  

interface RedisState {  
    messages: string[];  
    subscribed: boolean;  
    client: Client;  
}  

const RedisSocketSubscriber: React.FC<RedisState> = () => {  
    const [messages, setMessages] = useState<string[]>([]);  
    const [subscribed, setSubscribed] = useState(false);  
    const [client, setClient] = useState<Client>();  

    // Life Cycle Hooks  
    useEffect(() => {  
        subscribeToRedis();  
        return () => {  
            unSubscribeFromRedis();  
        };  
    }, []);  

    // 구독 함수  
    const subscribeToRedis = () => {  
        const client = new Client({  
            brokerURL: 'ws://localhost:18080/ws',  

            debug: (str: string) => {  
                console.log(str);  
            },  
        });  

            console.log('Socket Connected');  

            // 1번째 파라미터로 Redis Subscribe명, 2번째는 콜백 함수  
            client.subscribe('/topic/message', (frame) => {  
                    const newMessage = `Test - Redis: ${frame.body}`;  
                    setMessages((prevMessages) => [...prevMessages, newMessage]);  
                },  
                {  
                    id: 'Test-Subscribe',  
                });  
            setSubscribed(true);  
        };  

        // 오류 메시지의 세부 정보 출력  
        client.onStompError = (frame) => {  
            console.error('STOMP error', frame.headers['message']);  
            console.log('Error Details:', frame.body);  
        };  

        setClient(client);  
        client.activate();  
    };  

    // 구독 해제 함수, 버튼을 클릭하면 구독을 해제함  
    const unSubscribeFromRedis = () => {  
        if (client) {  
            client.unsubscribe('Test-Subscribe');  
            setClient(null);  
            setSubscribed(false);  
        }  
    };  

    return (  
        <div>  
            <h2>Redis Listener</h2>  
            <ul>  
                {messages.map((message, index) => (  
                    <li key={index}>  
                        <p>{message}</p>  
                    </li>  
                ))}  
            </ul>  
            {!subscribed ? (  
                <button onClick={subscribeToRedis}>Subscribe</button>  
            ) : (  
                // 구독 중일 때 해지 버튼  
                <button onClick={unSubscribeFromRedis}>Unsubscribe</button>  
            )}  
        </div>  
    );  
};  

export default RedisSocketSubscriber;

Stomp Client Header에 x-queue-type, x-message-ttl, autoConfirm 옵션을 준 이유는,

RabbitMQ의 쿼럼 큐는 메시지를 받고 ACK를 보내야 하는데 임시로 전부 ACK를 날리게 헤더에 설정했고,

Message-TTL과 Message-Type은 쿼럼 큐의 Arguments와 맟춰준 것이며, 안맟춰주면 소켓이 안열리게 됩니다.

현재 Header 옵션을 주면 에러가 나긴 하지만 헤더를 비우면 일단 잘 데이터를 잘 받아오긴 합니다.

헤더 로직 수정 후 내용 수정하겠습니다.


STOMP Header 옵션을 설정 안해줘도 Quorum Queue의 헤더 옵션이 잘 들어가는 것을 확인했습니다.

위 코드 그대로 사용하면 됩니다.



📕 실행 결과

😯 Web Socket Connected / Topic Subscription 성공

아래 사진을 보시면 소켓이 연결되고 구독 URL인 /topic/message를 구독해서 Message Sucscribe가 잘 된걸 볼 수 있습니다.


😯

Rabbit -> Redis -> Spring Data Redis(Socket) -> Frontend(Socket) 으로 Redis Pub/Sub을 이용해 데이터 전달에 성공했습니다!!

이제 중간에 Spring Data Redis에서 받은 데이터 중 원하는 데이터 필드만 뽑아 엔티티화 해서 MariaDB 저장 후 프론트로 넘기면 되는데 이건 생략 하겠습니다.