자바의 Thread

7 분 소요

1. Thread의 기본

운영체제 레벨에서 Thread는 명령어 포인터(instruction pointer)와 스택 포인터(stack pointer) 두가지를 가지고 있다.

  • 명령어 포인터 : 다음에 실행할 명령어를 가리킴
  • 스택 포인터 : Thread의 독립적인 local data를 저장하는 메모리 영역을 가리킴.

CPU는 한번에 한 명령어만 처리할 수 있다.
따라서 한 CPU를 통해 멀티쓰레드 동작을 위해서 CPU는 응용 프로그램 Thread 사이에 처리 시간을 공유해야 한다.

이러한 CPU 처리 시간 공유는 스케줄러(schedular)에 의해 처리된다.
스케줄러는 CPU가 얼마나 한 Thread를 얼마동안 처리해야 하는지를 결정하는데 이런 스케줄링 전략은 다양한 방식에 의해 구현될수 있지만 보통은 우선순위 스케줄링 전략을 기반으로 한다.

자바에서 Thread 우선 순위는 Thread.setPriority(int) 를 통해 설정 할 수 있다.
최저 1에서 최고 10까지 선택할 수 있고 명시적으로 설정하지 않을 경우 default는 5이다.

그런데 스케줄링이 우선순위를 기반으로만 동작하게 된다면 낮은 우선순위의 Thread는 작업을 수행할 충분한 시간을 얻지 못할 수 있다. 이러한 상태를 기아(starvation) 상태라 한다. 따라서 스케줄러는 이러한 경우에 대한 고려도 필요하다.

멀티 Thread 환경에서 처리할 Thread의 변경은 문맥 교환(context switch) 이라고 한다.
문맥 교환은 여러 과정으로 구성된다.

  1. 현재 실행중인 thread를 나중에 다시 시작할 수 있도록 상태 저장.
  2. 저장 이후 thread를 대기상태로 만듬.
  3. 다른 thread 처리를 위해 대기 상태인 다른 thread를 복원

2. 멀티 Thread

멀티쓰레딩을 통해 동작이 동시에 실행되도록 할 수 있다. 만약 실행하는 Thread의 수가 프로세서의 수를 초과하게 되면 완벽한 동시성이 될 수 없다. 하지만 스케줄러는 Thread 사이를 빠르게 전환하며 여러 동작이 동시에 실행 되도록 한다.

멀티 Thread는 필수적이지만 아래와 같이 복잡성 증가, 자원 소비 증가 등이 동반된다.

2.1. 자원 소비 증가

각 thread는 지역변수 및 매개변수를 저장하기 위해 전용 메모리 영역이 할당된다.
이 메모리 영역은 thread가 생성될대 할당되어 thread가 종료되면 회수된다.
프로세서 입장에서는 thread가 많으면 context switch를 해야 하기 때문에 오버헤드가 생긴다.

2.2. 복잡성 증가

멀티 thread는 실행의 불확식성을 동반한다. 이러한 불확실성으로 인해 디버깅이 어려워지고 멀티 thread 조절은 또 다른 버그를 만들 위험도 있다.

2.3. 데이터 불일치

2개 이상의 서로 다른 thread가 동일한 변수를 동시에 변경하려고 할때 이 변수는 경쟁조건(race condition)에 노출된다.
경쟁조건이란 코드 실행 순서가 일정하지 않기 때문에 발생한다.

public class RaceCondition {
    int sharedResource = 0;
	
    public void startTwoThreads() {
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                sharedResource++;
            }
        });
        t1.start();
		
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                sharedResource--;
            }
        });
        t2.start();
    }
}

위 예에서 sharedResource는 경쟁조건에 노출된다.

위의 또 다른 문제점은 sharedResource++, sharedResource-- 연산이다.
context switch는 바이트코드 연산 사이에서도 발생할 수 있는데 위의 --, ++ 연산의 경우 내부적으로 3가지 연산의 조합이기 때문에 더 예상할 수 없는 결과를 발생시킨다.

만약 위와같은 문제를 피하기 위해서는 코드가 각 thread에서 상호 배타적으로 동작해야 하고 이러한 영역을 원자 영역(atomic region) 이라 한다.

자바에서 원자 영역을 만들어주는 가장 기본적인 동기화 매커니즘으로 synchronized 키워드가 있다.

3. Thread Safe

여러 thread에서 객체에 접근할 때 객체가 항상 정확한 상태를 유지하는 것을 Thread safe 하다고 말한다.
즉, 객체의 상태가 경쟁조건에 빠지지 않는 것이다.

Thread safe는 여러방법이 있겠지만 기본적으로 객체 상태에 대한 접근을 제어할 수 있도록 객체를 동기화 함으로써 가능해진다.

참고로 이렇게 두개 이상의 쓰레드가 특정 자원을 공유하고 있을 때 한번에 하나의 쓰레드에게만 접근을 허용하고자 하는 영역을 임계 영역(critical section) 이라고 한다.

동기화는 현재 임계 영역에서 실행되는 thread가 있는지 확인하는 잠금 매커니즘(locking machanism)으로 만들어 진다.

자바는 아래와 같은 암시적 잠금과 명시적 잠금 매커니즘을 가지고 있다.

  1. 암시적 잠금
    • synchronized 키워드
  2. 명시적 잠금
    • java.util.concurrent.locks.ReentrantLock
    • java.util.concurrent.locks.ReentrantReadWriteLock

3.1. 암시적 잠금

synchronized 키워드에 의해 구현되며 자세한 내용은 (Java)synchronized.md 문서를 참고한다.

3.2. 명시적 잠금

좀 더 고급화된 잠금 전략으로 ReentrantLock 또는 ReentrantReadWriteLock 클래스를 이용할 수 있다.
(try-catch로 묶여야 해서 코드가 조금 지저분해지는 단점이 있다.)

3.2.1. ReentrantLock

ReentrantLocksynchronized는 같은 의미를 가진다.
두 기법 모두 어느 한 thread가 임계영역에 들어온 경우 다른 thread의 접근을 차단한다.

int sharedResource;
private ReentrantLock mLock = new ReentrantLock();

public void changeState() {
    mLock.lock();
    try{
        sharedResource++;
    }finally{
        mLock.unlock();
    }
}

3.2.2. ReentrantReadWriteLock

ReentrantLocksynchronized과 같은 기법은 다른 모든 thread가 공유 변수를 단순히 읽는 것조차 유해하다고 판단하는 방어적인 전략이다.

ReentrantReadWriteLock의 경우 읽으려는 thread는 동시에 실행되게 두면서, 쓰기는 차단한다.

int sharedResource;
private ReentrantReadWriteLock mLock = new ReentrantReadWriteLock();

public void changeState() {
    mLock.writeLock().lock();
    try{
        sharedResource++;
    }finally{
        mLock.writeLock().unlock();
    }
}

public int readState() {
    mLock.readLock().lock();
    try{
        return sharedResource;
    }finally{
        mLock.readLock().unlock();
    }
}

참고로 ReentrantReadWriteLock은 thread가 허용 또는 차단되야 하는지 확인하기 때문에 상대적으로 복잡하고 성능이 떨어진다.
ReentrantReadWriteLock 적용의 좋은 사례는 읽기 thread가 다수 있으면서 쓰기 thread는 적은 경우이다.

4. 소비자-생성자 패턴

멀티 thread의 흔한 사례로로 한 thread는 데이터를 생산하고, 한 thread는 데이터를 소비하는 소비자-생산자 패턴(consumer-producer pattern)이 있다.

public class ConsumerProducer {
    private LinkedList<Integer> list = new LinkedList<Integer>();
    private final int LIMIT = 10;
    private Object lock = new Object();
	
    public void produce() throws InterruptedException {
        int value = 0;
        while(true) {
            synchronized(lock) {
                while(list.size() == LIMIT){
                    // 생산 최대치면 wait를 걸어서 lock 사용 권한을 넘겨준다.
                    lock.wait();
                    // 나중에 consume()에서 lock을 풀어주면 여기 다음 구문이 실행된다.
                }
                list.add(value++);
				
                // notify는 굳이 필요없을 수도 있지만 lock을 바로 풀어줘서 좀 더 빠르게 실행되게 할 수 있는 효과가 있다.
                lock.notify();
            }
        }
    }
	
    public void consume() throws InterruptedException {
        while(true) {
            synchronized(lock) {
                while(list.size() == 0){
                    // 전부 소비하고 없으면 wait를 걸어서 lock 사용 권한을 넘겨준다.
                    lock.wait();
                    // 나중에 produce()에서 lock을 풀어주면 여기 다음 구문이 실행된다.
                }
                int value = list.removeFirst();
				
                // notify는 굳이 필요없을 수도 있지만 lock을 바로 풀어줘서 좀 더 빠르게 실행되게 할 수 있는 효과가 있다.
                lock.notify();
            }
        }
    }
}
final ConsumerProducer cp = new ConsumerProducer();

new Thread(new Runnable() {
    public void run() {
        try{
            cp.produce();
        }catch(InterruptedException e){
			
        }
    }
}).start();

new Thread(new Runnable() {
    public void run() {
        try{
            cp.consume();
        }catch(InterruptedException e){
			
        }
    }
}).start();

5. Thread 통신

5.1. 파이프 (Pipe)

java.io 패키지의 하나로 같은 프로세스 내에서 두 쓰레드 간의 단방향 통신을 제공한다.
(생산자 쓰레드는 파이프에 데이터를 기록하고, 소비자 쓰레드는 데이터를 읽는다.)

자바의 파이프는 POSIX의 pipe operator(쉘에서 | 문자)와 비슷한 기능을 하지만 프로세스 간의 통신을 하는 POSIX pipe 와는 달리 VM 위 스레드 사이에서 출력을 리다이렉팅 한다.

파이프 자체는 두 개의 연결된 쓰레드(생산자, 소비자)에서 단방향으로만 접근할 수 있는 circular buffer(원형 버퍼)이다.
그리고 앞의 소비자-생산자 패턴에서 볼 수 있듯 파이프는 Thread safe 한 방법이다.

파이프는 시간이 오래 걸리는 어떤 작업(task)에 대해 쓰레드를 나눠 분할할 수 있게 하고, 쓰레드로 나눠진 작업(task)이 있을때 하나의 task가 끝나면 다음 task로 데이터를 옮길 수 있게한다.

파이프는 binary 데이터나 character 데이터를 전송할 수 있다.

  1. PipedInputStream : binary 데이터 전송을 위한 생산자
  2. PipedOutputStream : binary 데이터 전송을 위한 소비자
  3. PipedWriter : character 전송을 위한 생산자
  4. PipedReader : character 전송을 위한 소비자

파이프의 수명은 connection이 형성될때부터 닫힐때까지 작동한다.
이 과정을 크게 세 가지로 나누면 setup, data transfer, disconnection으로 나뉜다.

POSIX : portable operating system interface(이식 가능 운영체제 인터페이스)의 약자로써 서로 다른 UNIX OS의 공통 API를 정리하여 이식성이 높은 유닉스 응용 프로그램을 개발하기 위한 목적으로 IEEE가 책정한 애플리케이션 인터페이스 규격이다. POSIX의 마지막 글자 X는 유닉스 호환 운영체제에 보통 X가 붙는 것에서 유래한다. 규격의 내용은 커널로의 C 언어 인터페이스인 시스템 콜 뿐 아니라, 프로세스 환경, 파일과 디렉터리, 시스템 데이터베이스(암호 파일 등), tar 압축 포맷 등 다양한 분야를 아우른다.

pipe 사용 예

PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream();
pipedInputStream.connect(pipedInputStream);

Thread inputThread = new Thread(new Runnable() { 
    @Override 
    public void run() { 
        try { 
            String string = "Hello Pipe!";
            pipedOutputStream.write(string.getBytes()); 
        }catch(IOException e) { 
            e.printStackTrace(); 
        } 
    } 
});

Thread outputThread = new Thread(new Runnable() { 
    @Override 
    public void run() { 
        try { 
            int data = pipedInputStream.read(); 
            for(; data != -1; data = pipedInputStream.read()) { 
                System.out.print((char)data); 
            } 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
});

inputThread.start(); 
outputThread.start(); 

통신은 blocking 매커니즘을 가진 소비자-생산자 패턴이다.
파이프가 가득 차면 write()는 차단되고, 파이프가 비면 read()는 차단된다.

통신 완료 후에는 아래 코드로 종료한다.

pipedInputStream.close(); 
pipedOutputStream.close();

둘 중 하나만 닫혀도 충분하다.
단, 쓰기를 닫으면 파이프는 분리되지만 버퍼안의 데이터는 읽을 수 있다. 읽기를 닫으면 퍼버는 즉시 지워진다.

5.2. 공유 메모리 (Shared memory)

동일한 프로세스 내의 모든 스레드는 각각의 stack 영역을 제외한 모든 영역(Code, Data, Heap)을 공유한다. 이 중 Heap 영역(공유 메모리)을 이용하여 스레드간의 통신을 할 수 있다.

아래와 같이 공유메모리(heap)에 쓴 데이터는 같은 프로세스 내의 모든 쓰레드에서 접근이 가능하다.

shared_memory

객체 인스턴스의 경우 인스턴스는 각 쓰레드의 stack에 저장되지만 객체 자체는 heap에 저장되기 때문에 참조를 전달하여 다른 쓰레드에서 접근할 수 있다.

5.2.1. 시그널링

만일 두 스레드가 순서대로 실행되야 하며, 두 스레드 간에 Shared Memory를 사용해 통신한다면 어떻게 해야 할까? 앞서 Pipe의 예시처럼 어떤 state를 polling하여 구현할 수 있다.
Shared Memory에 state를 나타내는 변수를 만들고, 무한 루프를 돌며 state 변수가 변하는 것을 체크하는 것이다.

이 방법도 물론 잘 동작하지만, 이러한 busy waiting은 성능 저하를 초래한다.

이런것보다 Java의 built-in signaling mechanism을 이용하면 더 효율적으로 작동하게 할 수 있는데, java.lang.Object에 정의되어 있는 wait(), notify(), notifyAll() 세 개의 메서드를 사용하는 것이다.

자세한 내용은 Thread 상태를 조절하는 메서드 포스팅을 참고한다.

5.3. 블로킹 큐 (Blocking Queue)

쓰레드 시그널링은 정교한 설정이 가능한 low-level mechansim으로 다양하게 적용할 수 있지만 그만큼 고려해야 할 부분이 많아 버그를 만들기 쉽다.

자바에서는 이를 위해 단방향 통신에 대해 추상화된 high-level signaling mechansim을 제공하는데 그게 java.util.concurrent.BlockingQueue 이다.

blocking queue

java.util.concurrent.BlockingQueue 인터페이스들의 구현체로는 여러 가지가 있는데, Array로 구현되어 고정 크기를 가지는 ArrayBlockingQueue, Linked List로 구현된 LinkedBlockingQueue, Priority를 가지는 PriorityBlockingQueue, insert와 remove가 동시에 이루어지는, 크기가 항상 0으로 유지되는 SynchronousQueue 등이 있다.

API 문서

Blocking queue 사용 예

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

Producer는 BlockingQueue.put()을 하고, Consumer는 BlockingQueue.take()를 하는 것만으로 Thread Safe한 통신을 구현할 수 있다. 내부적으로 atomical하게 작동하도록 lock을 컨트롤 해 주기 때문이다.

카테고리:

업데이트:

댓글남기기