본문 바로가기

Programming/Java

[Java] 병렬 데이터 처리(병렬 스트림, 포크/조인 프레임워크)

병렬 처리(Parallel Operation)란 멀티 코어 환경에서 하나의 작업을 분할해 각각의 코어가 병렬적으로 처리하는 것이다.

 

자바7 이전에는 데이터 컬렉션을 병렬 처리하기 위해서는 데이터를 분할하고 각각의 스레드로 할당 해야한다. 하지만 스레드는 경쟁 상태(race condition)가 발생할 수 있어 동기화가 필요하고, 마지막에는 각 스레드에서 발생한 부분 결과를 하나로 합치는 과정이 필요하다.

하지만 병렬 스트림과 포크/조인 프레임워크를 사용하면 쉽게 병렬 처리가 가능하다

 

동시성과 병렬성

둘 다 멀티 스레드의 동작 방식이라는 점은 동일하지만 목적이 다르다.

 

동시성(Concurrency)

멀티 작업을 위해 멀티 스레드가 번갈아가며 실행하는 성질이다. 싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것처럼 보이지만 실제로는 동시성 작업이다.

 

병렬성(Parallelism)

병렬성은 멀티 작업을 위해 멀티 코어를 이용해 동시에 실행하는 성질이다.

 

데이터 병렬성(Data Parallelism)

  • 전체 데이터를 나누어 서브 데이터를 만들고 서브 데이터를 병렬 처리해 작업을 빠르게 종료하는 것
  • 병렬 스트림은 데이터 병렬성을 구현
  • 멀티 코어의 수만큼 큰 요소를 서브 요소로 나누고, 서브 요소를 분리된 스레드에서 병렬로 처리

 

작업 병렬성(Task Parallelism)

  • 서로 다른 작업을 병렬 처리하는 것
  • 예로는 웹서버에서는 각각의 요청을 개별 스레드에서 병렬로 처리

 

병렬 스트림

병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 때문에 병렬 스트림을 이용하면 멀티 코어 프로세서가 각각의 청크를 처리하도록 할당이 가능해진다.

parallelStream()을 호출하면 컬렉션에서 바로 병렬 스트림을 리턴한다.

 

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
list.parallelStream().reduce(0, Integer::sum);

 

병렬 스트림은 내부적으로 ForkJoinPool을 사용하는데, ForkJoinPool은 프로세서 수(Runtion.getRuntime().availableProcessors())의 스레드를 가진다.

이를 변경할 수는 있지만 모든 병렬 스트림 연산에 영향을 주기 때문에 기본값을 사용하는 것을 권장한다.

 

순차 스트림 ↔ 병렬 스트림

순차 스트림에 parallel()메소드를 호출하면 기존 스트림을 병렬로 변경이 가능하고, 병렬 스트림에 sequential()메소드를 호출하면 순차 스트림으로 변경할 수 있다.

이 메소드를 호출하면 내부적으로 병렬로 수행해야 함을 의미하는 불리언(boolean) 플래그가 설정되게 된다.

 

메소드를 통해 연산 마다 병렬 실행과 순차 실행을 수행할 지 제어할 수 있게 된다.

stream.parallel()
    .map() // 병렬 실행
    .sequential()
    .reduce(); // 순차 실행

 

 

성능

병렬 스트림이 항상 순차 스트림보다 빠른 것은 아니다.

병렬 스트림은 병렬화하기 위해 스트림을 재귀적으로 분할하고, 스레드를 할당하고, 최종적으로 부분 결과를 하나로 합치는 과정이 필요하기 때문에 오히려 속도가 느릴 수 있다.

 

  1. 요소의 수가 많고 요소당 처리시간이 긴 경우
    • 병렬 처리는 스레드 풀을 생성하고 스레드를 생성하는 추가적인 비용이 발생한다. 때문에 요소의 수가 적다면 오히려 순차 처리가 빠를 수 있다.
    • 멀티 코어에서 데이터의 이동은 오버헤드가 크기때문에 데이터 전송 시간보다 오래 걸리는 작업만 병렬로 처리하는 것이 좋다.
  2. 스트림 소스의 종류
    • ArrayList나 배열은 인덱스로 요소를 관리해 분리가 쉽지만 LinkedList는 분할을 위해서는 모두 탐색을 해야 하기 때문에 느릴 수 있다.
  3. 코어(core)의 수
    • 만약 실행하는 프로세서가 싱글 코어라면 스레드의 수만 증가하고 동시성 작업으로 진행되기 때문에 오히려 성능이 하락한다.
  4. 병렬로 수행하기 어려운 스트림 모델
    • iterate()의 경우, 이전 연산의 결과가 스트림의 입력에 영향을 미친다. 때문에 이전 연산이 완료되어야 다음 스트림으로 넘어갈 수 있기 때문에 분할하기 어려워 성능이 오히려 하락한다.
  5. 박싱의 최소화
    • 박싱과 언박싱은 성능을 크게 하락시키기 때문에 기본형 스트림(IntStream, LongStream, DoubleStream)을 우선 사용해야한다.
  6. 순서에 의존하는 연산
    • 순서에 의존하는 연산은 스트림에서 수행하게 되면 많은 비용이 발생한다.
    • 순서가 중요하지 않다면 findFirst보다 findAny가 좋고, 단순 limit보다 unordered().limit이 더 효율적이다.

 

 

포크/조인 프레임워크

병렬화할 수 있는 작업을 재귀적으로 서브 데이터로 분할한 다음 각각의 결과를 합쳐 전체 결과를 만들도록 설계되었다.

병렬스트림 내부에서 포크/조인 프레임워크를 활용하고 있기 때문에 정확한 동작방식을 이해하면 병렬스트림을 쉽게 이해할 수 있다.

Recursive Task

스레드 풀을 사용하기 위해서는 RecursiveTask<R>의 서브 클래스를 구현하면 된다. 여기서 R은 병렬화를 통해 연산된 결과이다.

 

RecursiveTask를 구현하려면 compute() 추상메소드를 구현하면 된다. 이 메소드는 테스크를 서브 테스크로 분할하는 로직, 더이상 분할이 불가능할 때 서브 테스크의 결과를 생산할 알고리즘을 정의한다.

if(Task is small) { // 테스크가 작아 분할이 불가능
    Execute the task // 순차적으로 테스크 계산
} else {
    // 테스크를 두 서브 테스크로 분할
    ForkJoinTask first = getFirstHalfTask();
    first.fork();
    ForkJoinTask second = getSecondHalfTask();
    second.compute();
    // 모든 서브 테스크의 연산이 완료될 때까지 기다리고, 결과를 합친다.
    first.join();
}

 

_images/forkjoin_2.png

각각의 서브 테스크의 크기가 작아질 때까지 재귀적으로 테스크를 분할한다. 더이상 분할이 불가능하면 서브 테스크를 병렬로 수행하고 나온 부분 결과를 조합해 최종 결과를 만든다.

 

예제

import java.util.concurrent.RecursiveTask;

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 1000;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    // 서브 테스크를 재귀적으로 만들기 위해 사용할 비공개 생성자
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int len = end - start; // 서브 테스크의 배열의 길이

        // 정해진 기준값 이하로 배열의 길이가 줄어들 경우 결과를 반환한다.
        if (len <= THRESHOLD) {
            return computeSequentially();
        }

        // 왼쪽 절반으로 분할
        ForkJoinSumCalculator left = new ForkJoinSumCalculator(numbers, start, start + len / 2);
        left.fork(); // ForkJoinFool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행

        // 나머지 절반을 분할
        ForkJoinSumCalculator right = new ForkJoinSumCalculator(numbers, start + len / 2, end);
        Long rightRet = right.compute(); // 두 번째 서브 테스크를 동기로 실행한다.
        Long leftRet = left.join(); // 첫 번째 서브 테스크의 결과를 읽거나, 아직 결과가 나오지 않았다면 대기

        return leftRet + rightRet; // 두 서브 테스크의 결과를 합해 반환환
    }
    
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
long ret = new ForkJoinPool().invoke(task); // 결과 반환

 

 

주의점

  • join()메소드는 결과를 반환할 때까지 블록시키기 때문에 항상 두 서브 테스크를 시작한 뒤 호출해야 한다. 그렇지 않으면, 기존의 순차 스트림보다 느리게 된다.
  • RecursiveTask 내에서는 ForkJoinPoolinvoke()를 사용하면 안되고 compute()fork()메소드를 호출해야 한다.
  • 분리된 서브테스크 중 한 작업에만 compute()작업을 호출해야 한다. 한 테스크는 같은 스레드를 재사용할 수 있어 오베헤드가 감소한다.
  • 병렬 계산은 디버깅이 어렵다. 다른 스레드에서 compute()를 호출하기 때문에 stack trace가 도움이 되지 않는다.

 

작업 훔치기

만약 서브 테스크를 1000개 이상으로 분리하면 각각의 테스크에 CPU을 할당할 수 없어 낭비같아 보일 수 있다. 하지만 실제로는 가능한 많이 분할하는 것이 좋다.

 

 

작업 훔치기(work stealing) 기법은 모든 스레드를 거의 공정하게 분할하게 된다. 각각의 스레드는 자신에게 할당된 테스크를 포함하는 이중 연결리스트를 참조하고, 작업이 끝날 때마다 큐의 헤드에서 다른 테스크를 가져와 작업을 처리한다.

한 스레드는 다른 스레드보다 빠르게 작업을 처리하게 되면 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐온다. 모든 큐가 빌 때까지 과정을 반복하게 되는데 이 때문에 테스크의 크기를 작게 나누어야 스레드 간의 작업 부하를 비슷하게 유지할 수 있게 된다.

 

 

Spliterator

'분할할 수 있는 반복자'라는 뜻으로 Iterator처럼 요소 탐색 기능을 제공한다는 점은 동일하지만 병렬 작업에 특화되어 있다. 탐색하는 데이터를 어떻게 병렬화할지 정의한다.

컬렉션 프레임워크는 디폴트 Spliterator 구현을 제공하고 컬렉션은 Spliterator 인터페이스를 구현한다.

 

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action); // 탐색할 요소가 남았는지 여부
    Spliterator<T> trySplit(); // 일부 요소를 분할해 두 번째 Spliterator를 생성
    long estimateSize(); // 탐색해야 할 요소 수 정보
    int characteristics(); // 특성 집합 정보
}

 

  • Spliteraotr 특성
    • ORDERED : 요소에 정해진 순서가 있어 순서에 유의
    • DISTINCT : 두 요소를 방문했을 때 항상 다름(x.equals(y) == false)
    • SORTED : 탐색된 요소는 정해진 순서를 따름
    • SIZED : 크기가 알려진 소스로 Spliterator를 생성하면 estimateSize()는 정확한 크기를 반환
    • NON-NULL : 모든 요소는 NULL이 아님
    • IMMUTABLE : 소스가 변하지 않는다. 요소의 추가, 삭제, 변경이 불가능
    • CONCURRENT : 동기화 없이 여러 스레드에서 소스의 변경이 가능
    • SUBSIZED : 분할되는 모든 Spliterator는 SIZED 특성을 가진다.

 

분할 과정

  1. Spliterator에 trySplit()을 호출하면 두 번째 Splitarator가 생성
  2. 다시 trySplit()을 호출하면 4개, 결과가 null이 될 때까지 반복한다
  3. null이 반환되면 더 이상 분할이 불가능하다는 의미기 때문에 종료된다.

 

 


📚 Reference

신용권, 『이것이 자바다』, 한빛미디어(2015)
라울-게이브리얼 우르마, 『모던 자바 인 액션』, 우정은, 한빛미디어(2018)
https://java-8-tips.readthedocs.io/en/stable/forkjoin.html

 

'Programming > Java' 카테고리의 다른 글

[Java] String.replaceAll() 활용하기  (0) 2021.05.19
[Java] 스트림(Stream) 활용  (0) 2021.05.13
[Java] 스트림(Stream)  (0) 2021.05.11
[Java] 람다(lambda)  (0) 2021.05.06
[Java] Boxing, Unboxing  (0) 2021.05.05