📘 ReentrantLock을 이용한 동시성 제어 (Mutex)
Spring Boot에서 동시성을 제어하는 방법은 여러가지가 있습니다.
JPA에서 사용할 수 있는 DB 레벨의 Lock 메커니즘은 비관적 락(Pessimistic Lock), 낙관적 락(Optimistic Lock) 이 있고,
Redis의 Lettuce (Spin Lock)방식, Redisson의 pub/sub 방식 등이 있습니다.
📘 동시성 제어에 Mutex를 활용하여 얻을수 있는 장점
저는 ReentLock을 이용한 Mutex를 구현하여 동시성을 제어하였습니다.
그 이유는 DB 레벨에서의 동시성 제어는 필요하지 않은 상황이고, 코드 레벨에서의 동시성 제어가 필요한 상황이며,
직접 Lock을 제어할 수 있어 더 세밀한 제어가 가능하기 떄문입니다.
오버헤드 감소
뮤텍스는 공유 자원에 대한 접근 권한을 요청할 때 필요한 시스템 리소스와 처리시간의 오버헤드가 적습니다.
반면, 세마포어는 내부적으로 여러 상태와 조건을 검사해야 하므로 더 많은 오버헤드가 발생할 수 있습니다.
우선순위 역전 문제 방지
뮤텍스는 우선순위 역전 문제를 해결하기 위한 메커니즘을 제공합니다.
낮은 우선순위의 스레드가 자원을 점유하고, 높은 우선순위의 스레드가 Block 되는 문제를 방지해 줍니다.
Dead Lock 방지
또한, ReentrantLock
은 재진입 가능한 구조로 설계되어 있어, 락을 보유하고 있는 스레드가 같은 락을 다시 획득할 수 있습니다.
이 특성은 스레드가 자신이 이미 보유하고 있는 락에 대해 DeadLock에 빠지는 것을 방지합니다.
세마포어는 이런 상황을 직접 컨트롤해야 하며, 복잡성이 증가하고 데드락 발생 가능성이 높아집니다.
그래서 오버헤드가 적고 세마포어보다 성능이 좋으므로 뮤텍스를 채용하였습니다.
📘 구현
단 하나의 스레드만이 공유 자원에 접근 가능하게 할 상황이 필요해 Mutex를 구현하는 방법으로 LeentrantLock을 사용하였습니다.
LockManager
- 특정 인스턴스 이름에 대한 Lock을 개별 관리하며 Thread-Safe한 ConcurrentHashMap을 사용합니다.
getLock()
: 주어진 인스턴스 이름에 대응하는 ReentrantLock을 반환하며, 해당 이름에 대한 락이 없다면 새로운 ReentrantLock을 생성해 Locks에 추가 후 반환합니다.
@Component
@Slf4j
public class LockManager {
private final Map<String, Lock> locks = new ConcurrentHashMap<>();
public Lock getLock(String instanceName) {
return locks.computeIfAbsent(instanceName, id -> new ReentrantLock());
}
}
EventDeliveryCallBack
- Redis 통계 데이터를 쌓는 클래스이며, 여러 스레드로 분산된 Message Queue Channel들의 Event Data를 Receive 합니다.
- Event Data를 받고, 데이터를 가공하기 전 Event의 Instance 이름에 대응하는 Lock을 거는데, 3초안에 Lock을 획득하지 못하면 false를 반환하게 설정 하였습니다.
- Lock이 Thread에서 모든 데이터 가공이 완료되고,. 마지막에
lock.unLock()
을 호출해 공유 자원 접근 권한을 반환합니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class EventDeliveryCallBack implements DeliverCallback {
private final JsonParser jsonParser;
private final LockManager lockManager;
private final RedisService redisService;
private final Props props;
@Override
public void handle(String s, Delivery message) throws IOException {
String routingKey = message.getEnvelope().getRoutingKey();
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
String currentAlarmLevel = "";
String newAlarmLevel = ApplicationConstants.ALARM_LEVEL_1;
int alarmLevel = 1;
String now = DateUtil.getTime();
List<EventDto> eventDtoList = this.mapToEventDto(msg, routingKey);
if (eventDtoList == null || eventDtoList.isEmpty()) {
log.warn("Basic Consume - Invalid Event : {}", msg);
return;
} else if (eventDtoList.size() == 1 && eventDtoList.get(0).getEventType() == EventType.UNKNOWN) {
log.warn("Basic Consume - Invalid Event : {}, {}", EventType.UNKNOWN.toStr(), msg);
return;
}
for (int i = 0; i < eventDtoList.size(); i++) {
EventDto eventDto = eventDtoList.get(i);
String eventTimeStr = String.valueOf(eventDto.getEventTime()).substring(0, 10);
eventDto.setEventTime(Long.parseLong(DateUtil.timestampToDate(Long.parseLong(eventTimeStr), ApplicationConstants.SEOUL_TIMEZONE)));
String eventTime = String.valueOf(eventDto.getEventTime());
if (!StringUtils.hasText(eventDto.getInstanceExtName())) {
log.warn("Basic Consume - Unknown Instance : {}", msg);
}
Lock lock = lockManager.getLock(eventDto.getInstanceName());
try {
if (lock.tryLock(3, TimeUnit.SECONDS)) {
// TODO 1: 현재 알람 레벨을 Redis에서 조회
currentAlarmLevel = redisService.getCurrentAlarmLevel(RedisConstants.INSTANCE_CURRENT_ALARM_LEVEL + eventDto.getInstanceName());
// TODO 2: Redis에 현재 알람 레벨이 없으면 MariaDB에서 조회
if (!StringUtils.hasLength(currentAlarmLevel)) {
redisService.setCurrentAlarmLevel(RedisConstants.INSTANCE_CURRENT_ALARM_LEVEL + eventDto.getInstanceName(), ApplicationConstants.ALARM_LEVEL_1);
}
redisService.setInstanceEventTime(RedisConstants.INSTANCE + eventDto.getInstanceName(), eventTime);
/* ===== 통계 생성용 데이터 Redis Hash에 저장 ===== */
if (eventDto.getPeopleCount() > 0) {
redisService.setPeopleCount(RedisConstants.INSTANCE_COUNT + eventDto.getInstanceName(), eventTime, eventDto.getPeopleCount());
}
// 현재 알람 레벨과 새로운 알람 레빌이 다를때
if (!newAlarmLevel.equals(currentAlarmLevel)) {
Set keys = redisService.getHashKeys(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName());
// 새로운 알람레벨 이외의 다른 Hash 필드 삭제
if (keys != null && keys.size() > 0) {
for (Object key : keys) {
String hashKey = (String) key;
if (!hashKey.equals(newAlarmLevel)) {
redisService.deleteHashKey(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName(), hashKey);
}
}
// 새로운 알람 레벨의 이벤트를 수신한 시간 조회
String alarmLatestTimeStamp = (String) redisService.getHashValue(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName(), newAlarmLevel);
// 이미 알람 레벨 변경 처리를 한 경우 continue
if (StringUtils.hasText(alarmLatestTimeStamp)) {
if (!alarmLatestTimeStamp.equals("-1")) {
continue;
}
// 새로운 알람 레벨 이벤트를 수신한 시간이 현재 시간과 ApplicationConstants.CHECK_ALARM_INTERVAL의 시간만큼 차이가 난다면 새로운 알림으로 변경
// 알람 변경 처리를 완료했다면 Hash 필드의 값을 -1로 설정
if (DateUtil.getSecondsDifference(alarmLatestTimeStamp, now) > ApplicationConstants.CHECK_ALARM_INTERVAL) {
redisService.setCurrentAlarmLevel(RedisConstants.INSTANCE_CURRENT_ALARM_LEVEL + eventDto.getInstanceName(), newAlarmLevel);
redisService.setHashValue(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName(), newAlarmLevel, "-1");
// MariaDB도 업데이트
// TODO ...
// 이미지 저장
if (StringUtils.hasLength(eventDto.getImage())) {
this.saveImageFile(eventDto.getImage(), routingKey);
}
}
} else {
// 알람 레벨 시간이 없을 경우 최초 알람 레벨 시간 저장 + Update MariaDB Alarm Level redisService.setHashValue(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName(), newAlarmLevel, now);
}
} else {
// Redis Hash Key가 존재 하지 않을 경우 최초 알람 레벨 시간 저장 + Update MariaDB Alarm Level redisService.setHashValue(RedisConstants.INSTANCE_LATEST_ALARM_LEVEL + eventDto.getInstanceName(), newAlarmLevel, now);
}
} else {
// Update MariaDB Alarm Level
}
} else {
// Try Lock, 현재 알람 레벨과 수신한 알람 레벨이 같으면 Update MariaDB Alarm Level }
} catch (InterruptedException e) {
log.warn("BasicConsume - Interrupted Exception : {}", e.getMessage());
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
}
저는 따로 테스트 코드는 작성하지 않고 직접 Message Queue에 데이터를 Publish해서 확인 하였지만,
테스트 코드를 작성할 거라면 좀 더 간단한 예제의 코드를 사용해서,
CountDownLatch를 활용해 동시 요청을 보내 동시성을 검증하는 방법으로 진행하면 될 것 같습니다.
'📘 Backend > Java' 카테고리의 다른 글
Coarse-Grained Lock & Fine-Grained Lock (1) | 2024.03.24 |
---|---|
Java - Recursive를 이용한 File & Directory 제거 (0) | 2024.03.21 |
Java Serializable & Parsing (2) | 2023.10.02 |
Java Thread (0) | 2023.04.02 |
Java GC & Equality & Override/Overload (0) | 2023.04.02 |
열심히 살고 싶은 사람의 메모장
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!