병렬 스트림은 Fork/Join 공용 풀을 사용해서 병렬 연산을 수행한다.

직접 스레드를 만들 필요 없이 스트림에 parallel() 메서드만 호출하면, 스트림이 자동으로 병렬 처리된다

스트림에서 parallel() 를 선언하면 스트림은 공용 ForkJoinPool 을 사용하고, 내부적으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하면서, Spliterator 를 통해 데이터를 자동으로 분할한다

공용 풀(Common Pool)은 Fork/Join 프레임워크의 편리한 기능으로, 별도의 풀 생성 없이도 효율적인 병렬 처리를가능하게 한다.

하지만 블로킹 작업이나 특수한 설정이 필요한 경우에는 커스텀 풀을 고려해야 한다.

public class ParallelMain4 {

    public static void main(String[] args) {
        int processorCount = Runtime.getRuntime().availableProcessors();
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        log("processorCount = " + processorCount + ", commonPool = " + commonPool.getParallelism());

        long startTime = System.currentTimeMillis();

        int sum = IntStream.rangeClosed(1, 8)
                .parallel() // 추가
                .map(HeavyJob::heavyTask)
                .reduce(0, (a, b) -> a + b); // sum();

        long endTime = System.currentTimeMillis();
        log("time: " + (endTime - startTime) + "ms, sum: " + sum);
    }
}

===
// 안티패턴 -> 커스텀 풀 사용 권장 
public static void main(String[] args) throws InterruptedException {
        // 병렬 수준 3으로 제한
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");

        // 요청 풀 추가
        ExecutorService requestPool = Executors.newFixedThreadPool(100);
        int nThreads = 20; // 1, 2, 3, 10, 20
        for (int i = 1; i <= nThreads; i++) {
            String requestName = "request" + i;
            requestPool.submit(() -> logic(requestName));
            Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
        }
        requestPool.close();
    }

    private static void logic(String requestName) {
        log("[" + requestName + " ] START");
        long startTime = System.currentTimeMillis();

        int sum = IntStream.rangeClosed(1, 4)
                .parallel()
                .map(i -> HeavyJob.heavyTask(i, requestName))
                .reduce(0, (a, b) -> a + b);

        long endTime = System.currentTimeMillis();
        log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
    }

주의 사항

주의! 실무에서 공용 풀은 절대! I/O 바운드 작업을 하면 안된다!

병렬 스트림은 반드시 CPU 바운드 작업에만 사용하자!

그렇다면 여러 작업을 병렬로 처리해야 하는데, I/O 바운드 작업이 많을 때는 어떻게 하면 좋을까? 이때는 스레드를 직접 사용하거나, ExecutorService 등을 통해 별도의 스레드 풀을 사용해야 한다.

// 권장 예시 - 커스텀 스레드 풀 사용
// 최대 400개의 스레드를 가진 별도의 풀을 생성해서 병렬 작업 처리에 사용한다.
병렬 스트림( parallel() )을 사용하지 않고, 직접 전용 스레드 풀에 작업을 제출한다
public static void main(String[] args) throws InterruptedException {
    ExecutorService requestPool = Executors.newFixedThreadPool(100);

    // logic 처리 전용 스레드 풀 추가
    ExecutorService logicPool = Executors.newFixedThreadPool(400);

    int nThreads = 3; // 1, 2, 3, 10, 20
    for (int i = 1; i <= nThreads; i++) {
        String requestName = "request" + i;
        requestPool.submit(() -> logic(requestName, logicPool));
        Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
    }
    requestPool.close();
    logicPool.close();
}

private static void logic(String requestName, ExecutorService es) {
    log("[" + requestName + " ] START");
    long startTime = System.currentTimeMillis();

    // 1부터 4까지의 작업을 각각 스레드 풀에 제출
    Future<Integer> f1 = es.submit(() -> HeavyJob.heavyTask(1, requestName));
    Future<Integer> f2 = es.submit(() -> HeavyJob.heavyTask(2, requestName));
    Future<Integer> f3 = es.submit(() -> HeavyJob.heavyTask(3, requestName));
    Future<Integer> f4 = es.submit(() -> HeavyJob.heavyTask(4, requestName));

    int sum;
    try {
        Integer v1 = f1.get();
        Integer v2 = f2.get();
        Integer v3 = f3.get();
        Integer v4 = f4.get();
        sum = v1 + v2 + v3 + v4;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    long endTime = System.currentTimeMillis();
    log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}

===

// 해당 코드는 위의 코드를 간단히 

public static void main(String[] args) throws InterruptedException {
    ExecutorService requestPool = Executors.newFixedThreadPool(100);

    // logic 처리 전용 스레드 풀 추가
    ExecutorService logicPool = Executors.newFixedThreadPool(400);

    int nThreads = 3; // 1, 2, 3, 10, 20
    for (int i = 1; i <= nThreads; i++) {
        String requestName = "request" + i;
        requestPool.submit(() -> logic(requestName, logicPool));
        Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
    }
    requestPool.close();
    logicPool.close();
}

private static void logic(String requestName, ExecutorService es) {
    log("[" + requestName + " ] START");
    long startTime = System.currentTimeMillis();

    // 1부터 4까지의 작업을 각각 스레드 풀에 제출
    List<Future<Integer>> futures = IntStream.range(1, 4)
            .mapToObj(i -> es.submit(() -> HeavyJob.heavyTask(i, requestName)))
            .toList();

    int sum = futures.stream()
            .mapToInt(f -> {
                try {
                    return f.get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).sum();

    long endTime = System.currentTimeMillis();
    log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}