📚 Semaphore - Multi Threading 작업 순서 제어 & 동기화
멀티쓰레드 환경에서 여러 쓰레드가 협력하여 순차적으로 작업을 처리해야 할 때, 동기화(Synchronization)는 매우 중요합니다.
이번 글에서는 Semaphore와 Custom Barrier 클래스를 활용하여
모든 쓰레드들이 첫 번째 작업을 전부 완료할 떄 까지 Semaphore를 이용해 Blocking
시키고 이후 두 번째 작업을 수행
하도록 동기화하는 방법을 작성 하겠습니다.
예시로 구현할 로직은 아래와 같습니다.
- 20개의 Thread로 진행
- 작업 1과 2가 있고 20개의 Thread가 작업 1을 모두 완료한 상태가 아니면 작업 2로 넘어가지 않음
- 각 작업 당 Thread들의 실행 순서는 중요하지 않고
20개의 Thread 모두 작업 1이 완료
된 후에만 작업 2로 넘어가기
📚 Barrier Class (Shared Resource)
20개의 Thread들이 공통으로 사용할 공용 리소스인 Barrier 클래스입니다.
Semaphore
- 0으로 초기화를 해서 다른 Thread에서 acquire()를 호출해도 대기상태로 Blocking 시킴
- 🧙♀️ Semaphore는 ReentrantLock과 다르게 다른 Thread에서 release()를 호출하는 것이 가능하다는걸 기억하기
counter 값
- 각 Thread가 작업을 수행 후 최대 Thread 수에 도달할 때 까지 counter를 증가시킵니다.
- counter를 계속 증가 시키다가 counter의 값이 numberOfWorkers와 동일해지면 (마지막 Thread의 작업 차례가 오면) Semaphore를 19개 release 합니다.
- release된 19개의 Thread는 모두 작업1이 완료된 상태이므로 다음 작업인 작업2로 넘어갑니다.
- 마지막 Thread의 작업이 종료 된 후 작업 2의 시작을 알리기 위해
========== 작업 끝 ==========
문자열을 출력 해주었습니다. - 🧙♀️ 왜 release()를 19개만 호출하냐? => 마지막 Thread는 acquire()로 대기중인 스레드가 아니기 때문에 작업이 가능합니다.
ReentrantLock
- 20개의 Thread들이 Barrier에 들어와서 counter 값을 증가시키는 작업의 Race Condition 방지를 위해
waitForOthers()
함수의 counter 값을 증가 시키는 로직에 에 임계 영역 쳐주기
Barrier Class
public class Barrier {
private final int numberOfWorkers;
private final Semaphore semaphore = new Semaphore(0); // 처음에는 0으로 시작, 모든 쓰레드 대기
private int counter = 0; // 작업을 완료한 쓰레드 수 추적
private final Lock lock = new ReentrantLock();
public Barrier(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
}
// 첫번째 Thread가 작업 완료 후 나머지 모든 스레드들을 기다리고, 나머지 모든 스레드가 작업 1을 완료했으면 전부 깨워서 작업 2 수행
public void waitForOthers() throws InterruptedException {
lock.lock();
boolean isLastWorker = false;
try {
counter++;
if (counter == numberOfWorkers) {
isLastWorker = true; // 마지막 쓰레드인지 확인
}
} finally {
lock.unlock();
}
if (isLastWorker) {
semaphore.release(numberOfWorkers - 1); // 마지막 쓰레드는 모든 다른 쓰레드를 깨움
System.out.println("==================== 작업 끝 ====================");
} else {
semaphore.acquire(); // 다른 쓰레드는 대기
}
}
}
📚 Worker Thread
실제로 작업을 수행할 Thread를 구현합니다.
Barrier 클래스를 멤버로 가지고 있고, 다른 Thread들을 기다리기 위해 동기화된 지점을 설정하는데 이 Barrier 클래스를 이용합니다.
그리고 단순히 task()
함수에서 콘솔 출력 작업인 작업 1,2를 수행 하는데,
모든 스레드가 작업 1을 우선 완료하고 Barrier 클래스의 waitForOthers()
로 다른 Thread가 모두 작업 1을 완료할 때까지 대기 후 작업 2를 수행합니다.
Worker Thread
public class CoordinatedWorkRunner implements Runnable {
private final Barrier barrier;
public CoordinatedWorkRunner(Barrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
task();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void task() throws InterruptedException {
// 첫 번째 작업 수행
System.out.println(Thread.currentThread().getName()
+ " 작업 1 수행");
barrier.waitForOthers(); // 다른 쓰레드들이 도착할 때까지 대기
// 두 번째 작업 수행
System.out.println(Thread.currentThread().getName()
+ " 작업 2 수행");
}
}
📚 실행 해보기
이제 Thread와 Barrier를 만들었으니 실행 해보겠습니다.
Thread의 개수는 20개로 맟추고 단순히 For문을 돌며 모든 Thread를 실행 시킵니다.
아래 결과값을 보면 Semaphore를 이용해 여러 Thread들을 동기화 해서 작업 순서 제어에 성공 하였습니다.
실제 코드를 블로그에 적을 순 없으니 간단한 예시를 만들어 멀티스레딩 환경에서의 작업 순서 제어를 구현해보았습니다.
public class SemaphoreBarrier {
public static void main(String [] args) throws InterruptedException {
int numberOfThreads = 20;
List<Thread> threads = new ArrayList<>();
// 공유 자원- > 모든 스레드가 이 Barrier 객체를 가지며 내부 counter 값으로 Semaphore를 통해 Thread Blocking이 일어남
Barrier barrier = new Barrier(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
threads.add(new Thread(new CoordinatedWorkRunner(barrier)));
}
for(Thread thread: threads) {
thread.start();
}
}
}
결과값
> Task :SemaphoreBarrier.main()
Thread-4 작업 1 수행
Thread-3 작업 1 수행
Thread-5 작업 1 수행
Thread-14 작업 1 수행
Thread-6 작업 1 수행
Thread-11 작업 1 수행
Thread-1 작업 1 수행
Thread-2 작업 1 수행
Thread-10 작업 1 수행
Thread-0 작업 1 수행
Thread-17 작업 1 수행
Thread-13 작업 1 수행
Thread-19 작업 1 수행
Thread-18 작업 1 수행
Thread-16 작업 1 수행
Thread-7 작업 1 수행
Thread-15 작업 1 수행
Thread-12 작업 1 수행
Thread-9 작업 1 수행
Thread-8 작업 1 수행
==================== 작업 끝 ====================
Thread-4 작업 2 수행
Thread-2 작업 2 수행
Thread-0 작업 2 수행
Thread-13 작업 2 수행
Thread-7 작업 2 수행
Thread-3 작업 2 수행
Thread-8 작업 2 수행
Thread-18 작업 2 수행
Thread-16 작업 2 수행
Thread-6 작업 2 수행
Thread-1 작업 2 수행
Thread-11 작업 2 수행
Thread-9 작업 2 수행
Thread-15 작업 2 수행
Thread-5 작업 2 수행
Thread-19 작업 2 수행
Thread-17 작업 2 수행
Thread-10 작업 2 수행
Thread-14 작업 2 수행
Thread-12 작업 2 수행
전체 코드
public class SemaphoreBarrier {
public static void main(String [] args) throws InterruptedException {
int numberOfThreads = 20;
List<Thread> threads = new ArrayList<>();
// 공유 자원- > 모든 스레드가 이 Barrier 객체를 가지며 내부 counter 값으로 Semaphore를 통해 Thread Blocking이 일어남
Barrier barrier = new Barrier(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
threads.add(new Thread(new CoordinatedWorkRunner(barrier)));
}
for(Thread thread: threads) {
thread.start();
}
}
static class Barrier {
private final int numberOfWorkers;
private final Semaphore semaphore = new Semaphore(0); // 처음에는 0으로 시작, 모든 쓰레드 대기
private int counter = 0; // 작업을 완료한 쓰레드 수 추적
private final Lock lock = new ReentrantLock();
public Barrier(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
}
// 첫번째 Thread가 작업 완료 후 나머지 모든 스레드들을 기다리고, 나머지 모든 스레드가 작업 1을 완료 했으면 그떄 전부 꺠워서 작업 2 수행
public void waitForOthers() throws InterruptedException {
lock.lock();
boolean isLastWorker = false;
try {
counter++;
if (counter == numberOfWorkers) {
isLastWorker = true; // 마지막 쓰레드인지 확인
}
} finally {
lock.unlock();
}
if (isLastWorker) {
semaphore.release(numberOfWorkers - 1); // 마지막 쓰레드는 모든 다른 쓰레드를 깨움
System.out.println("==================== 작업 끝 ====================");
} else {
semaphore.acquire(); // 다른 쓰레드는 대기
}
}
}
static class CoordinatedWorkRunner implements Runnable {
private final Barrier barrier;
public CoordinatedWorkRunner(Barrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
task();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void task() throws InterruptedException {
// 첫 번째 작업 수행
System.out.println(Thread.currentThread().getName()
+ " 작업 1 수행");
barrier.waitForOthers(); // 다른 쓰레드들이 도착할 때까지 대기
// 두 번째 작업 수행
System.out.println(Thread.currentThread().getName()
+ " 작업 2 수행");
}
}
}
'📘 Backend > Java' 카테고리의 다른 글
Java Reflection - Dynamic Object Creation(Constructor<?>) (1) | 2024.12.30 |
---|---|
Object를 이용한 Thread 간 통신 / 동기화 (5) | 2024.10.10 |
Producer-Consumer Pattern with RabbitMQ (0) | 2024.06.25 |
Java에서 Windows Power Shell 명령어 실행 (0) | 2024.04.01 |
Coarse-Grained Lock & Fine-Grained Lock (1) | 2024.03.24 |