본문 바로가기

Java/Java 기초

ParallelStream의 결과를 List에 저장

ParallelStream은 Java에서 제공하고 있는 Stream 중 하나로 Collection 속의 데이터를 병렬 처리하기 위해서 사용된다. 최근에 팀에서  ParallelStream을 사용하던 코드에서 이슈가 있었는데,  ParallelStream으로 처리된 결과를 List에 저장하여 추가적인 작업으로 후처리하려다가 문제가 발생한 것이다. 간단한 코드로 옮겨보자면 다음과 같은 구조의 코드였다.

 

import java.util.ArrayList;
import java.util.List;

public class ListTest {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();

        for (int i = 0; i < 100000; i++) {
            numbers.add(i);
        }

        List<Integer> evenNumbers = new ArrayList<>();
        numbers.parallelStream().filter(number -> number % 2 == 0).forEach(evenNumbers::add);
        System.out.println(evenNumbers.size());
    }
}

 

1 에서 100000까지의 정수를 List에 추가한 후 이중 짝수인 수들을 새로운 리스트에 추가하여 사이즈를 출력하려고 한다. 하지만 실행된 결과를 보면 우리가 원하는 방식으로 작동하지 않는 것을 알 수 있다. 데이터는 중간중간 비어있고, List 사이즈도 기대되는대로 50000이 아니라 실행시마다 바뀌는 것을 볼 수 있다. 심지어 어떤 경우에는 ArrayIndexOutOfBoundsException이 발생하기도 한다. 이는 동시성을 제어하지 못하고 있기 때문에 발생하는 문제로 List에 여러 Thread들이 접근하게 되면서 한 Thread에서 List를 사용하면서 동시에 다른 Thread도 List를 사용하고 있기 때문에 충돌이 발생하게 되는 것이다.

 

우리의 의도대로 작동하게 만드는 방법은 두 가지 있다.

 

첫번째는 List를 synchronizedList로 만드는 것이다. synchronizedList는 동시성을 제어할 수 있는 형태의 List로 synchronized는 기본적으로 사용하고자 하는 자원을 Thread가 점유하는 시점에 Lock을 걸어 Thread Safe하게 작동하도록 한다. 하지만 Thread가 사용할 때마다 Lock을 잡았다가 Unlock하는 작업이 반복되기 때문에 성능이 떨어지게 된다. 구현은 다음과 같이 한다.

 

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ListTest {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();

        for (int i = 0; i < 100000; i++) {
            numbers.add(i);
        }

        List<Integer> evenNumbers = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream().filter(number -> number % 2 == 0).forEach(evenNumbers::add);
        System.out.println(evenNumbers.size());
    }
}

 

두번째는 ParallelStream이 끝나는 시점에 collect를 호출하여 List로 수집한 뒤, 수집된 List로 처리하는 방법이다.

 

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ListTest {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();

        for (int i = 0; i < 100000; i++) {
            numbers.add(i);
        }

        List<Integer> evenNumbers = numbers.parallelStream().filter(number -> number % 2 == 0).collect(Collectors.toList());
        System.out.println(evenNumbers.size());
    }
}

 

위 방식으로 로직을 구성하면 parallelStream이 종료되는 시점에 return되는 값들을 List에 수집하여 돌려주게 되는데, 이 collect 함수에는 병렬처리와 동시성을 확인하여 처리하는 부분이 포함되어 있기 때문에 안정적으로 목적을 달성할 수 있다. 참고로 collect 함수 내부는 다음과 같다.

 

    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

 

ParallelStream은 병렬처리를 가능하게 해주기 때문에 효과적인 작업처리가 가능하다. 하지만 동시성과 병렬처리에 대한 고민없이 구현하는 경우 리스크가 있기 때문에 충분히 고민하고 설계 후 사용하는 것을 추천한다.

'Java > Java 기초' 카테고리의 다른 글

Simple Java HttpServer and HttpURLConnection  (0) 2021.03.01
Java Custom Annotation  (0) 2020.05.13
Java Reflection 사용하기 (7)  (0) 2019.02.18
Java Reflection 사용하기 (6)  (0) 2019.02.15
Java Reflection 사용하기 (5)  (0) 2019.02.15