RabbitMQ - Channel Basic Consume

2024. 2. 2. 05:08·📘 Backend/기능 개발

📘 RabbitMQ - Channel Basic Consume

이전에 RabbitMQ를 이용한 데이터 파이프라인을 개발 했었는데,

단일 서버의 RabbitMQ Connection을 연결할 때 RabbitConfig를 이용해 Quorum Queue의 데이터를 Subscribe 했었습니다.


이전 포스팅 글

  • RabbitMQ - Publish & Subscribe MQTT Data
  • RabbitMQ - Publish & Subscribe on Linux

이번 요구사항은 여러 서버에 분산된 RabbitMQ들을 클러스터링 하지않고,

Spring Boot 서버에서 멀티 스레딩과 Thread Safe한 방식으로,

각각의 RabbitMQ Connection Factory에 연결 후 각 스레드에서 Channel을 Basic Consume 하는 방식으로 개발합니다.


그 이유는 Quorum Queue에 들어오는 데이터의 양이 많아(약 10분에 1~20000개),

채널의 수를 늘려서 분산 시킴으로써 메시지 처리 성능을 효율적으로 증가시킬 수 있습니다.


이떄 중요한 점은 각 Connection Factory는 각각의 Thread에서 동작하며 ConcurrentHashMap을 사용하여

동시성을 보장할 수 있지만, 동기화 문제가 발생할 수 있어 다른 스레드에서 다른 Connection Factory로

접근하는 일이 없게 잘 설계해야 합니다.


Channel Basic Consume 방식

이 방식은 Qurum Queue에서 데이터를 Direct로 Receive 하지 않고

RabbitMQ Channel을 이용해 DeliveryCallBack, CancelCallBack 인터페이스를 구현해

Basic Consume 한 데이터의 전처리, 후처리 작업을 합니다.

Spring Security의 API Filter와 비슷한 역할이라고 보시면 됩니다.

  • 각 서버당 RabbitMQ Connection Factory를 Thread Safe한 ConcurrentHashMap에 넣어 각각의 스레드로 연결
  • 연결된 RabbitMQ의 Queue 개수에 맞는 RabbitMQ Channel을 생성해 실행하여 데이터를 Consume 합니다.

📘 Channel Consume

📕 RabbitMQ Channel의 basicConsume()을 이용해 데이터를 받는 이유

RabbitMQ Channel을 Consume하려면 DeliveryCallBack / CancelCallBack 인터페이스를 구현한 클래스를 basicConsume()의 파라미터로 주어야 합니다.

이유는, DeliveryCallBack에서 Channel의 Queue에서 받아온 메시지의 가공을 처리하며,

Consume이 실패한 메시지에 대한 처리는 CancelCallBack 에서 처리하기 때문입니다.


📕 ConcurrentHashMap 사용 이유

그리고 각 ConnectionFactory, Connection, Channel들은 Thread Safe한 ConcurrentHashMap을 사용합니다.

ConcurrentHashMap을 사용한 이유는 전체 Map에 대한 Lock을 사용하는 것이 아닌,

세그먼트 별로 Lock을 거는 구조이기 때문에 동기화에 드는 부담을 줄일 수 있습니다.


즉, 스레드간 경합을 최소화하고 동시성을 높이며 각 세그먼트 간 충돌을 최소화 시키는게 주 이유입니다.


📕 ApplicationConfig

Spring WebClient의 Connection TimeOut을 설정해주고,

어플리케이션에서 동시에 실행시킬 CorePoolSize, MaxPoolSize, QueueCapacity를 어플리케이션의 부하량을 측정하게 적절하게 넣어줍니다.


TaskExecutor를 HealthCheck 로직과 Counting 로직에서 따로 쓸 수 있게 executor를 2개로 분리하였습니다.

/**  
 * @author 신건우  
 * Spring WebClient / Task Exucutor Bean 등록  
 */  
@EnableAsync  
@Configuration  
@RequiredArgsConstructor  
public class ApplicationConfig {  
    private final Props props;  

    @Bean  
    public WebClient webClient() {  
        HttpClient httpClient = HttpClient.create()  
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)  
                .responseTimeout(Duration.ofMillis(5000))  
                .doOnConnected(i ->  
                        i.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))  
                        .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS)));  

        return WebClient.builder()  
                .clientConnector(new ReactorClientHttpConnector(httpClient))  
                .build();  
    }  

    @Bean  
    public TaskExecutor executor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(props.getCorePoolSize());  
        executor.setMaxPoolSize(props.getMaxPoolSize());  
        executor.setQueueCapacity(props.getQueueCapacity());  
        executor.setThreadNamePrefix(props.getNamePrefix());  
        executor.initialize();  

        return executor;  
    }  

    @Bean  
    public TaskExecutor healthCheckExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(2);  
        executor.setMaxPoolSize(3);  
        executor.setQueueCapacity(4);  
        executor.setThreadNamePrefix("Health-Check-");  
        executor.initialize();  

        return executor;  
    }  
}

📕 RabbitService

아래 코드에서는 1개의 Connection Factory에 대한 코드만 작성하였지만,

실제론 각 RabbitMQ가 존재하는 Server의 ID를 Entity화해서 Loop를 돌며 서버당 10개의 Channel을 생성하고 있음

/**  
 * @author 신건우  
 * RabbitMQ Connection 생성과 Queue당 1개의 Channel 생성 후 Channel 당 1개의 스레드 할당  
 */  
@Slf4j  
@Service  
@RequiredArgsConstructor  
public class RabbitService {  
    private final TaskExecutor executor;  
    private final Map<Integer, ConnectionFactory> connectionFactoryMap = new ConcurrentHashMap<>();  
    private final Map<Integer, Connection> connectionMap = new ConcurrentHashMap<>();  
    private final Map<Integer, List<Channel>> channelMap = new ConcurrentHashMap<>();  
    private final Map<Integer, String> queueNameMap = new ConcurrentHashMap<>();  
    private final EventDeliveryCallBack eventDeliveryCallBack;  
    private final EventCancelCallBack eventCancelCallBack;  
    private final Props props;  

    @PostConstruct  
    public void init() {  
        log.info("==================== RabbitMQ Connection 초기화 시작 ====================");  
        this.connectRabbitMQ();  
        this.listenEvent();  
        this.checkConnectionAndChannel();  
        log.info("==================== RabbitMQ Connection 초기화 완료 ====================");  
    }  

    // Message Listener  
    public void listen(final Channel channelParam, String queueName) {  
            try {  
                channelParam.basicConsume(queueName, true, eventDeliveryCallBack, eventCancelCallBack);  
            } catch (Exception e) {  
                log.error("[Consume Queue] Consume Failed - Exception : {}, Cause : {}", e.getMessage(), e.getCause());  
            }  
    }  

    /* Listen Thread 생성 */    
    private void listenEvent() {  
        List<Channel> channelList = channelMap.get(1);  

        for (int i = 0; i < props.getQueues().size(); i++) {  
            EventThread thread = new EventThread(this, channelList.get(i), props.getQueues().get(i));  
            executor.execute(thread);  
        }  
    }  

    /* RabbitMQ Connection & Channel 생성 */    
    private void connectRabbitMQ() {  
        // TODO 1: Queue Name을 Map에 넣기  
        for (int i = 0; i < props.getQueues().size(); i++) {  
            queueNameMap.put(i + 1, props.getQueues().get(i));  
            log.info("RabbitMQ Queue 등록 - Queue Name : {}", props.getQueues().get(i));  
        }  

        // TODO 2: Connection Factory 생성 (1개만 필요)  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost(props.getHost());  
        factory.setPort(props.getPort());  
        factory.setUsername(props.getUsername());  
        factory.setPassword(props.getPassword());  
        connectionFactoryMap.put(1, factory);  
        log.info("RabbitMQ Connection Factory Created - Host : {}, Port : {}", props.getHost(), props.getPort());  

        // TODO 3: Connection Factory에서 Connection을 1개만 만들기  
        connectionFactoryMap.forEach((key, connectionFactory) -> {  
            Connection connection = null;  
            try {  
                connection = factory.newConnection();  
                connectionMap.put(1, connection);  
                log.info("RabbitMQ Connection Created");  
            } catch (Exception e) {  
                log.error("RabbitMQ Connection 생성 실패 - {}", e.getMessage());  
            }  

            // TODO 3-1: 이미 채널이 오픈되어 있다면 채널 종료  
            try {  
                List<Channel> channels = channelMap.get(1);  

                if (channels != null && channels.size() > 0) {  
                    channels.stream().forEach(channel -> {  
                        if (channel != null && channel.isOpen()) {  
                            try {  
                                channel.close();  
                            } catch (Exception e) {  
                                log.warn("Create RabbitMQ Connect & Channel Close Error - {}", e.getMessage());  
                            }  
                        }  
                    });  
                    channelMap.remove(1);  
                }  

                // TODO 3-2: 1개의 Connection에 QueueNameMap의 숫자만큼 채널 생성  
                List<Channel> channelList = new ArrayList<>();  

                for (int i = 1; i <= props.getQueues().size(); i++) {  
                    Channel channel = connection.createChannel();  
                    channelList.add(channel);  
                    log.info("RabbitMQ Channel {} Created", i);  
                }  

                channelMap.put(1, channelList);  


            } catch (Exception e) {  
                log.error("Rabbit Connection Failed : {}", e.getMessage());  
                e.printStackTrace();  
            }  
        });  
    }  

    /**  
     * RabbitMQ 서버의 Connection 을 주기적으로 체크하여 연결이 끊겼을 경우 재연결 처리  
     */  
    private void checkConnectionAndChannel() {  
        for (Map.Entry<Integer, Connection> entry : connectionMap.entrySet()) {  
            executor.execute(() -> {  
                while (true) {  
                    this.connectRabbitMQ();  

                    CmnCode cmnCode = cmnCodeService.findCmnCode(ApplicationConstants.CVEDIA_RABBITMQ_CONNECTION_MONITORING_INTERVAL);  

                    int sleep = 10000;  

                    if (cmnCode != null) {  
                        sleep = Integer.valueOf(cmnCode.getCodeValue());  
                    }  

                    try {  
                        Thread.sleep(sleep);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
    }  
}

📕 EventDeliveryCallBack

실질적으로 데이터를 받고 가공하는 로직을 여기에 작성하였습니다.

다른 로직은 볼 필요 없으며, DeliverCallBack 인터페이스를 구현하여 handle() 함수에 로직을 작성합니다.

/**  
 * @author 신건우  
 * RabbitMQ Channel에서 받은 Event를 변환 후 21번 Tomcat 서버로 HTTP API 요청  
 * 이벤트 Label을 차량 (V), 사람 (P), 자전거 (B), 사람/자전거 (A)로 분류  
 * <p>  
 * RabbitMQ Queue Name = Instance Name과 동일함 -> Cam01  
 * Table Column Name = "B", "V", "P" 를 이름 뒤에 붙임  
 * Instance Wire Name = ex) Cam01-P01 , Cam01-B01, Cam01-V01  
 */
@Slf4j  
@Service  
@RequiredArgsConstructor  
public class EventDeliveryCallBack implements DeliverCallback {  
    private final RestApiService restApiService;  
    private final JsonParser jsonParser;  
    private final CountService countService;  
    private static final String IN = "IN";  

    @Override  
    public void handle(String consumerTag, Delivery message) throws IOException {  
        String routingKey = message.getEnvelope().getRoutingKey(); // RabbitMQ Topic과 동일함, Cvedia Instance 이름과 동일하게 설정  
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);  
        Object dto = this.mapToDto(msg);  

        /**  
         * TODO 1: Event Data 변환 & API 전송 & H2 DB 저장  
         *     TODO 1-1: Event의 TimeStamp를 Asia/Seoul이 아닌 UTC로 변환  
         *     TODO 1-2: WireClass의 종류, Wire 이름 구하기  
         *     TODO 1-3: wireName에 맞는 Count 객체를 가져와 Count값을 증가시키기 위함  
         *     TODO 1-4: API를 요청할 때 RoutingKey 뒤에 각각 다른 문자 할당 + Count 수치 증가  
         * TODO 2: 받은 Event를 용도에 맞는 DTO로 매핑  
         * TODO 3: 시간을 iso8601 형식의 UTC로 변환 - 반환값 형식 : yyyy-mm-ddTHH:mm:ssZ  
         * TODO 4: WireClass에 따라 Routing Key에 다른 이니셜 붙임  
         * TODO 5: Cvedia에서 나온 수치들을 Request API를 위한 메세지에 매핑  
         * TODO 6: 이벤트 메시지 변환이 끝나고 마지막 API 요청으로 보낼 Body  
         */  
        if (dto instanceof TripwireDto event) {  
            String inOut = "Counter_01";  

            // TODO 1-1: Event의 TimeStamp를 Asia/Seoul이 아닌 UTC로 변환  
            String eventTime = convertEventTime(event.getSystem_timestamp()); // EventTime -> UTC Time  

            // TODO 1-2: WireClass의 종류, Wire 이름 구하기  
            String objectClass = event.getEvents().get(0).getExtra().getWireClass(); // Person, Vehicle(Bike, Car)  
            String lineName = event.getEvents().get(0).getExtra().getTripwire().getName(); // LineName = Cam01-A01, Cam01-A02  |  Cam02-V01, Cam02-V02  
            String lineLabel = lineName.substring(6, 7); // P or V or B or A  
            String newCameraNameForSaveH2DB = ""; // H2 DB & 뷰어에 저장될 새로운 카메라 이름  

            // TODO 1-3: wireName에 맞는 Count 객체를 가져와 Count값을 증가시키기 위함  
            Count count = null;  
            int i = Integer.parseInt(lineName.substring(7));  
            String num = "";  

            // TODO 1-4: API를 요청할 때 RoutingKey 뒤에 각각 다른 문자 할당 + Count 수치 증가  
            //  1번 조건문 : Bike + Person (A) Line에 "Person"이 카운팅 됐을 경우  
            //  2번 조건문 : Bike + Person (A) Line에 "Bike"가 카운팅 됐을 경우  
            //  3번 조건문 : Car Line에 "Car"가 카운팅 됐을 경우  
            //  4번 조건문 : "Person"만 카운팅하는 카메라의 경우  
            // ex) name|Counter_01/count|0/event|2023-11-30T11:11:11Z/CAMERA ID|Cam01-P01  
            if (lineLabel.equals("A") && objectClass.equals("Person")) {  
                if (i < 10) {  
                    num = "0" + i;  
                    newCameraNameForSaveH2DB = "NW01C0" + num;  
                } else {  
                    newCameraNameForSaveH2DB = "NW01C0" + i;  
                }  
                count = countService.getOne(newCameraNameForSaveH2DB);  
                count.setCount(count.getCount() + 1);  

            // ex) name|Counter_01/count|0/event|2023-11-30T11:11:11Z/CAMERA ID|Cam01-B01 //  
            } else if (lineLabel.equals("A") && objectClass.equals("Vehicle")) {  
                if (i < 10) {  
                    num = "0" + (i+1);  
                    newCameraNameForSaveH2DB = "NW01C0" + num;  
                } else {  
                    newCameraNameForSaveH2DB = "NW01C0" + (i + 1);  
                }  
                count = countService.getOne(newCameraNameForSaveH2DB);  
                count.setCount(count.getCount() + 1);  
            }  

            // ex) name|Counter_01/count|1/event|2023-11-30T11:11:11Z/CAMERA ID|Cam01-V01  
            else if (lineLabel.equals("V") && objectClass.equals("Vehicle")) {  
                if (i < 10) {  
                    num = "0" + i;  
                    newCameraNameForSaveH2DB = "NW01C0" + num;  
                } else {  
                    newCameraNameForSaveH2DB = "NW01C0" + i;  
                }  
                count = countService.getOne(newCameraNameForSaveH2DB);  
                count.setCount(count.getCount() + 1);  
            }  

            // ex) name|Counter_01/count|0/event|2023-11-30T11:11:11Z/CAMERA ID|Cam01-P01  
            else if (lineLabel.equals("P") && objectClass.equals("Person")) {  
                if (i < 10) {  
                    num = "0" + i;  
                    newCameraNameForSaveH2DB = "NW01C0" + num;  
                } else {  
                    newCameraNameForSaveH2DB = "NW01C0" + i;  
                }  
                count = countService.getOne(newCameraNameForSaveH2DB);  
                count.setCount(count.getCount() + 1);  
            }  

            countService.updateCount(count);  
            requestResultToApi(inOut, count.getCount(), eventTime, newCameraNameForSaveH2DB);  
            log.info("\uD83D\uDE2F\uD83D\uDE2F\uD83D\uDE2F\uD83D\uDE2F\uD83D\uDE2F : {}", newCameraNameForSaveH2DB);  
        }  
    }  

    /* -------------------- Util -------------------- */  
    // TODO 2: 받은 Event를 용도에 맞는 DTO로 매핑  
    private Object mapToDto(final String msg) {  
        Object msgObject = null;  

        try {  
            msgObject = jsonParser.mapJson(msg);  
        } catch (Exception e) {  
            log.error("[RabbitMQ Delivery] DTO Mapping 실패 - {}", e.getMessage());  
        }  

        return msgObject;  
    }  

    // TODO 3: 시간을 iso8601 형식의 UTC로 변환 - 반환값 형식 : yyyy-mm-ddTHH:mm:ssZ    
    private String convertEventTime(long time) {  
        return Instant.ofEpochSecond(time).atZone(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"));  
    }  

    // TODO 4: WireClass에 따라 Routing Key에 다른 이니셜 붙임  
    private void requestResultToApi(String inOut, int count, String eventTime, String cameraName) {  
        String convertedMessage = convertResult(inOut, count, eventTime, cameraName);  
        String result = getBodyResult(convertedMessage);  

        restApiService.request(result);  
        log.info("[{}] 데이터 전송 - 방항: {}, 시간: {}", cameraName, IN, eventTime);  
    }  

    // TODO 5: Cvedia에서 나온 수치들을 Request API를 위한 메세지에 매핑  
    //  ex) name|Counter_01/count|0/event|2023-11-30T11:11:11Z/CAMERA ID|Cam01-B  
    private String convertResult(String inOut,  
                                 int count,  
                                 String eventTime,  
                                 String cameraName) {  
        return "name|" + inOut + "/" + "count|" + count + "/event|" + eventTime + "/" + "CAMERA ID|" + cameraName;  
    }  

    // TODO 6: 이벤트 메시지 변환이 끝나고 마지막 API 요청으로 보낼 Body    
    private String getBodyResult(String convertedMessage) {  
        return "--------------------------fc94942040fa9be1\n" +  
                "Content-Disposition: form-data; name=\"eventinfo\"\n" +  
                "Content-Type: text/plain\n\n" +  
                convertedMessage + "\n" +  
                "--------------------------fc94942040fa9be1--";  
    }  
}

📕 EventCancelCallBack

RabbitMQ의 데이터 Consume에 실패했을시 호출되는 콜백입니다.

/**  
 * @author 신건우  
 * RabbitMQ Channel Consume이 취소 됬을때 호출되는 콜백  
 */  
@Slf4j  
@Service  
public class EventCancelCallBack implements CancelCallback {  
    @Override  
    public void handle(String consumerTag) throws IOException {  
        log.warn("RabbitMQ Consumer Canceled - {}", consumerTag);  
    }  
}

📕 EventThread

RabbitMQ 채널을 별도의 스레드로 실행시키기 위한 EventThread 입니다.

/**  
 * @author 신건우  
 * Consume RabbitMQ Channel  
 */
 @Slf4j  
@AllArgsConstructor  
public class EventThread extends Thread {  
    private RabbitService rabbitService;  
    private Channel channel;  
    private String queueName;  

    @Override  
    public void run() {  
        rabbitService.listen(channel, queueName);  
        log.info("RabbitMQ Channel {} Thread Start", channel.getChannelNumber());  
    }  
}

📘 Util

📕 RestApiService

Spring WebClient를 이용해 Request를 Block 하지 않고 Subscribe하여 Async한 Request를 보내는 Service입니다.

/**  
 * @author 신건우  
 * Spring WebClient를 이용한 Async Rest API Service  
 */
@Slf4j  
@Service  
@RequiredArgsConstructor  
public class RestApiService {  
    private final WebClient webClient;  
    private final Props props;  

    // TODO 1: 7번서버 Tomcat - CameraVcasysDataInpController.wd로 요청  
    public void request(String data) {  
//        CountDownLatch latch = new CountDownLatch(1);  

        Flux.just(0)  
                .flatMap(i -> {  
                    return webClient.post()  
                            .uri(props.getRequestUrl())  
                            .contentType(MediaType.TEXT_PLAIN)  
                            .body(BodyInserters.fromValue(data))  
                            .retrieve()  
                            .bodyToMono(Void.class)  
                            .onErrorResume(e -> {  
                                log.error("Request API Failed : {}", e.getMessage());  
                                return Mono.error(e);  
                            });  
                }).subscribe(response -> {  
                    log.info("API Request - Success");  
                });  
    }  

    // TODO 2: Cvedia Instance Health Check API  
    public Mono<String> getRequest(final String uri) {  
        return webClient.get().uri(uri).retrieve().bodyToMono(String.class);  
    }  

    public Mono<String> postRequest(final String uri, final Object data) {  
        return webClient.post().uri(uri).bodyValue(data).retrieve().bodyToMono(String.class);  
    }  
}

📕 JsonParser

RabbitMQ의 Queue에서 나온Json 규격에 맞는 DTO를 파싱하는 서비스입니다.

JsonNode의 Depth에 있는 필드에 따라 다른 DTO로 변환합니다.

/**  
 * @author 신건우  
 * Json Parsing Service  
 */
@Slf4j  
@Service  
@RequiredArgsConstructor  
public class JsonParser {  
    private final ObjectMapper mapper;  

    public Object mapJson(String data) throws Exception {  
        if (data == null || data.isEmpty()) {  
            throw new IllegalAccessException("Json is Empty");  
        }  

        JsonNode rootNode = mapper.readTree(data);  
        JsonNode classNode = rootNode.path("events");  

        // Event 노드가 존재하면 SecuRT Event        
        if (!classNode.isMissingNode()) {  
            JsonNode imgNode = classNode.get(0).path("image");  
            JsonNode subClassNode = classNode.get(0).path("extra").path("class");  

            // Event 노드 중 class가 있으면 SecuRT - Tripwire Event            
            if (!subClassNode.isMissingNode()) {  
                return this.parseTripwireEvent(data);  
            }  
            // Event 노드 중 class가 없으면 SecuRT - Area Crowd Event            
            else {  
                // class가 없는 데이터 중 image 필드가 존재하면 SecuRT - Area Crowd (Image) Event      
                if (!imgNode.isMissingNode()) {  
                    return this.parseCrowdImageEvent(data);  
                } else {  
                    return this.parseCrowdEvent(data);  
                }  
            }  
        }  
        // Event 노드가 없으면 Estimation Event        
        else {  
            return this.parseEstimationEvent(data);  
        }  
    }  

    /* ------------------------------ 각 Event별 Parsing ------------------------------ */    
    private TripwireDto parseTripwireEvent(final String data) {  
        TripwireDto tripwireDto = null;  

        try {  
            tripwireDto = mapper.readValue(data, TripwireDto.class);  
        } catch (JsonMappingException e) {  
            log.error("[Json] Tripwire 파싱 실패 with JsonMappingException - {}", e.getMessage());  
        } catch (JsonProcessingException e) {  
            log.error("[Json] Tripwire 파싱 실패 with JsonProcessingException - {}", e.getMessage());  
        }  

        return tripwireDto;  
    }  

    private AreaCrowdDto parseCrowdEvent(final String data) {  
        AreaCrowdDto areaCrowdDto = null;  

        try {  
            areaCrowdDto = mapper.readValue(data, AreaCrowdDto.class);  
        } catch (JsonMappingException e) {  
            log.error("[Json] AreaCrowd 파싱 실패 with JsonMappingException - {}", e.getMessage());  
        } catch (JsonProcessingException e) {  
            log.error("[Json] AreaCrowd 파싱 실패 with JsonProcessingException - {}", e.getMessage());  
        }  

        return areaCrowdDto;  
    }  

    private AreaCrowdImageDto parseCrowdImageEvent(final String data) {  
        AreaCrowdImageDto areaCrowdImageDto = null;  

        try {  
            areaCrowdImageDto = mapper.readValue(data, AreaCrowdImageDto.class);  
        } catch (JsonMappingException e) {  
            log.error("[Json] AreaCrowdImage 파싱 실패 with JsonMappingException - {}", e.getMessage());  
        } catch (JsonProcessingException e) {  
            log.error("[Json] AreaCrowdImage 파싱 실패 with JsonProcessingException - {}", e.getMessage());  
        }  

        return areaCrowdImageDto;  
    }  

    private EstimationDto parseEstimationEvent(final String data) {  
        EstimationDto estimationDto = null;  

        try {  
            estimationDto = mapper.readValue(data, EstimationDto.class);  
        } catch (JsonMappingException e) {  
            log.error("[Json] Estimation 파싱 실패 with JsonMappingException - {}", e.getMessage());  
        } catch (JsonProcessingException e) {  
            log.error("[Json] Estimation 파싱 실패 with JsonProcessingException - {}", e.getMessage());  
        }  

        return estimationDto;  
    }  
}

📘 HealthCheck Thread

AI Engine의 분석 Instance가 죽으면 특정 sec마다 자동으로 재실행 시키는 API 요청을 보내는 데몬 스레드입니다.

/**  
 * @author 신건우  
 * Cvedia Instance Health Check Thread  
 * Status 4 = 실행중, 나머지는 실행중이 아님  
 */  
@Slf4j  
@Service  
@RequiredArgsConstructor  
public class InstanceHealthCheck extends Thread {  
    private final TaskExecutor healthCheckExecutor;  
    private final ObjectMapper mapper;  
    private final RestApiService restApiService;  
    private final Props props;  

    @PostConstruct  
    public void init() {  
        this.monitoringInstanceConnection();  
    }  

    @Override  
    public void run() {  
        while (true) {  
            props.getInstances().stream().forEach(name -> {  
                try {  
                    String uri = props.getCvediaUrl() + "/api/instance/get?instance_name=" + name;  
                    String instanceStatement = restApiService.getRequest(uri).block();  
                    InstanceDto[] instances = mapper.readValue(instanceStatement, InstanceDto[].class);  

                    if (instances != null && instances.length > 0) {  
                        Arrays.stream(instances).forEach(instance -> {  

                            if (instance.getState() == 4) {  
                                log.info("Instance 상태 : 실행 중");  
                            }  

                            if (instance.getState() == 0 || instance.getState() == 1 || instance.getState() == 3 || instance.getState() == 5) {  
                                String startUri = props.getCvediaUrl() + "/api/instance/start";  

                                InstanceDto requestBody = new InstanceDto();  
                                requestBody.setInstanceName(instance.getInstanceName());  
                                requestBody.setSolution(instance.getSolution());  

                                try {  
                                    String requestBodyStr = mapper.writeValueAsString(requestBody);  
                                    restApiService.postRequest(startUri, requestBodyStr).block();  

                                    log.info("Instance 시작 - 인스턴스 이름 : {}", requestBody.getInstanceName());  
                                } catch (Exception e) {  
                                    log.error("Instance 시작 실패 with Exception : {}", e.getMessage());  
                                }  
                            }  
                        });  
                    }  
                } catch (Exception e) {  
                    log.warn("Instance Monitoring Failed with an Exception : {}", e.getMessage());  
                }  
            });  

            try {  
                Thread.sleep(10000);  
            } catch (InterruptedException e) {  
                log.info("===== Instance Health Check Thread 종료 =====");  
            }  
        }  
    }  

    private void monitoringInstanceConnection() {  
        this.setUncaughtExceptionHandler((t, e) -> {  
            log.error("Instance Health Check Thread - 치명적인 오류 발생 : {}", e.getMessage());  
        });  
        this.setDaemon(true);  
        healthCheckExecutor.execute(this);  
    }  
}
저작자표시 (새창열림)

'📘 Backend > 기능 개발' 카테고리의 다른 글

Recording RTSP Stream  (0) 2024.07.29
Nginx Static Contents Server  (4) 2024.07.23
Access Control System - 전체 코드 리팩터링 & 기능 테스트 / 추가  (9) 2023.10.13
부하테스트 - Event 기반 Data Simulator 개발  (2) 2023.09.26
'📘 Backend/기능 개발' 카테고리의 다른 글
  • Recording RTSP Stream
  • Nginx Static Contents Server
  • Access Control System - 전체 코드 리팩터링 & 기능 테스트 / 추가
  • 부하테스트 - Event 기반 Data Simulator 개발
신건우
신건우
조용한 개발자
  • 신건우
    우주먼지
    신건우
  • 전체
    오늘
    어제
    • 분류 전체보기 (422)
      • 📘 Frontend (71)
        • Markup (1)
        • Style Sheet (2)
        • Dart (8)
        • Javascript (12)
        • TypeScript (1)
        • Vue (36)
        • React (2)
        • Flutter (9)
      • 📘 Backend (143)
        • Java (34)
        • Concurrency (19)
        • Reflection (1)
        • Kotlin (29)
        • Python (1)
        • Spring (42)
        • Spring Cloud (5)
        • Message Broker (5)
        • Streaming (2)
        • 기능 개발 (5)
      • 💻 Server (6)
        • Linux (6)
      • ❌ Error Handling (11)
      • 📦 Database (62)
        • SQL (31)
        • NoSQL (2)
        • JPQL (9)
        • QueryDSL (12)
        • Basic (4)
        • Firebase (4)
      • ⚙️ Ops (57)
        • CS (6)
        • AWS (9)
        • Docker (8)
        • Kubernetes (13)
        • MSA (1)
        • CI & CD (20)
      • 📚 Data Architect (48)
        • Data Structure (10)
        • Algorithm (8)
        • Programmers (17)
        • BaekJoon (5)
        • CodeUp (4)
        • Design Pattern (4)
        • AI (0)
      • ⚒️ Management & Tool (8)
        • Git (7)
        • IntelliJ (1)
      • 📄 Document (10)
        • Project 설계 (6)
        • Server Migration (3)
      • 📄 책읽기 (2)
        • 시작하세요! 도커 & 쿠버네티스 (2)
      • 🎮 Game (4)
        • Stardew Vally (1)
        • Path of Exile (3)
  • 블로그 메뉴

    • 링크

      • Github
    • 공지사항

    • 인기 글

    • 태그

      Lock #Thread #Concurrency
      React #Markdown
      GStreamer #Pipeline
    • 최근 댓글

    • 최근 글

    • hELLO· Designed By정상우.v4.10.0
    신건우
    RabbitMQ - Channel Basic Consume
    상단으로

    티스토리툴바