Java

[Java] Stream 병렬 데이터 처리와 성능

snail voyager 2023. 8. 15. 19:01
728x90
반응형

병렬 스트림

  • 컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성됨
  • 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
  • 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당

순차 스트림을 병렬 스트림으로 변환하기

  • 순차 스트림에 parallel 메서드를 호출하면 함수형 리듀싱 연산이 병렬로 처리
public long parallelSum(long n) {
	return Stream.iterate(1L, i -> i+1)
                .limit(n)
                .parallel()
                .reduce(0L, Long::sum);
}
  • 반대로 sequential() 로 병렬 스트림을 순차 스트림으로 바꿀 수 있음
  • parallel 과 sequential 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미침
stream.parallel()
    	.filter(...)
        .sequential()
        .map(...)
        .parallel()	//파이프라인은 전체적으로 병렬로 실행
        .reduce();

스트림 성능 측정

  • Java Microbenchmark Harness (JMH) 라이브러리로 측정
public long sequentialSum() {
	return Stream.interate(1L, i -> i+1)
                    .limit(N)
                    .reduce(0L, Long::sum);
}
  • 전통적인 for 루프 반복문이 저수준으로 동작하고 박싱 처리가 없기 때문에 더 빠름
  • 병렬처리 버전 parallelSum()는 반복 결과로 박싱 처리, 스레드를 할당하는 오버헤드 증가로 더 느림
  • LongStream.rangeClosed 는 기본현을 직접 사용하므로 박싱 오버헤드가 없음
  • LongStream.rangeClosed 는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산
public long rangedSum() {
	return LongStream.rangeClosed(1, N)
                	.parallel()		//병렬 리듀싱 연산 수행
                	.reduce(0L, Long::sum);
}
  • 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직

병렬 스트림의 올바른 사용법

  • 공유된 상태를 바꾸는 알고리즘을 사용할 때 문제 발생
  • 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야함

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않으면 직접 측정하라
  • 박싱을 주의하라. 되도록이면 기본형 특화 스트림을 사용
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다
    • limit, findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라
  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다
  • 스트림을 구성하는 자료구조가 적절한지 확인하라
    • ArrayList 가 LinkedList 보다 효율적으로 분할 가능
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다
    • 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 병렬 처리할 수 있을지 알 수 없음
  • 최종 연산의 병합과정 비용을 살펴보라
  • 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는 데 오랜 시간이 걸릴 때 성능을 높일 수 있음
소스 분해성
ArrayList 훌륭함
LinkedList 나쁨
IntStream.range 훌륭함
Stream.iterate 나쁨
HashSet 좋음
TreeSet 좋음

포크/조인 프레임워크

  • 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에
  • 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계
  • 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스 구현

RecursiveTask 활용

  • 스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야함
  • 추상메서드 compute()는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
	순차적으로 태스크 계산
} else {
	태스크를 두 서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출
    모든 서브태스크의 연산이 완료될 때까지 기다림
    각 서브태스크의 결과를 합침
}

포크/조인 프레임워크를 제대로 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록
    따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야함
  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다
  • 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다
  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다
  • 순차 처리보다 무조건 빠를 거라는 생각은 버려야 한다

Spliterator 인터페이스

  • 분할할 수 있는 반복자라는 의미
  • 병렬 작업에 특화
  • 요소의 시퀀스를 반복하며 요소를 분할하고 병렬 처리를 위한 분할 작업을 수행할 수 있는 메서드를 제공
  • 컬렉션 프레임워크에 포함된 모든 자료구조에 디폴트 Spliterator 구현을 제공
public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);	//요소를 하나씩 순차적으로 소비하면서 탐색할 요소가 남았으면 참 반환
    Spliterator<T> trySplit();	//요소를 분할해서 두번째 Spliterator를 생성하는 메서드
    long estimateSize();	//탐색해야할 요소 수
    int characteristics();	//Spliterator의 특성 집합을 포함하는 int를 반환
}
public static void main(String[] args) {
    List<Integer> dataList = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        dataList.add(i);
    }

    Spliterator<Integer> spliterator = dataList.spliterator();
    Stream<Integer> parallelStream = StreamSupport.stream(spliterator, true);

    parallelStream
        .map(num -> {
            System.out.println("Mapping: " + num + " (Thread: " + Thread.currentThread().getName() + ")");
            return num * 2;
        })
        .forEach(result -> {
            System.out.println("Result: " + result + " (Thread: " + Thread.currentThread().getName() + ")");
        });
}

Mapping: 3 (Thread: main)
Mapping: 1 (Thread: ForkJoinPool.commonPool-worker-1)
Mapping: 2 (Thread: ForkJoinPool.commonPool-worker-2)
Mapping: 4 (Thread: ForkJoinPool.commonPool-worker-3)
Mapping: 5 (Thread: main)
Mapping: 6 (Thread: ForkJoinPool.commonPool-worker-2)
Mapping: 7 (Thread: ForkJoinPool.commonPool-worker-1)
Mapping: 8 (Thread: ForkJoinPool.commonPool-worker-3)
Mapping: 9 (Thread: main)
Mapping: 10 (Thread: ForkJoinPool.commonPool-worker-1)
Result: 2 (Thread: main)
Result: 6 (Thread: ForkJoinPool.commonPool-worker-1)
Result: 4 (Thread: ForkJoinPool.commonPool-worker-2)
Result: 8 (Thread: ForkJoinPool.commonPool-worker-3)
Result: 10 (Thread: main)
Result: 12 (Thread: ForkJoinPool.commonPool-worker-2)
Result: 14 (Thread: ForkJoinPool.commonPool-worker-1)
Result: 16 (Thread: ForkJoinPool.commonPool-worker-3)
Result: 18 (Thread: main)
Result: 20 (Thread: ForkJoinPool.commonPool-worker-1)

StreamSupport.stream()

java.util.stream.Stream을 생성하기 위해 java.lang.Iterable 또는 java.util.Spliterator를 사용하는 메서드

List<String> dataList = Arrays.asList("one", "two", "three", "four", "five");

// Iterable을 기반으로 스트림 생성
Stream<String> iterableStream = StreamSupport.stream(dataList.spliterator(), false);
iterableStream.forEach(System.out::println);
728x90
반응형