병렬 스트림은 Fork/Join 공용 풀을 사용해서 병렬 연산을 수행한다.
직접 스레드를 만들 필요 없이 스트림에 parallel() 메서드만 호출하면, 스트림이 자동으로 병렬 처리된다
스트림에서 parallel() 를 선언하면 스트림은 공용 ForkJoinPool 을 사용하고, 내부적으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하면서, Spliterator 를 통해 데이터를 자동으로 분할한다
공용 풀(Common Pool)은 Fork/Join 프레임워크의 편리한 기능으로, 별도의 풀 생성 없이도 효율적인 병렬 처리를가능하게 한다.
하지만 블로킹 작업이나 특수한 설정이 필요한 경우에는 커스텀 풀을 고려해야 한다.
public class ParallelMain4 {
public static void main(String[] args) {
int processorCount = Runtime.getRuntime().availableProcessors();
ForkJoinPool commonPool = ForkJoinPool.commonPool();
log("processorCount = " + processorCount + ", commonPool = " + commonPool.getParallelism());
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 8)
.parallel() // 추가
.map(HeavyJob::heavyTask)
.reduce(0, (a, b) -> a + b); // sum();
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
}
}
===
// 안티패턴 -> 커스텀 풀 사용 권장
public static void main(String[] args) throws InterruptedException {
// 병렬 수준 3으로 제한
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");
// 요청 풀 추가
ExecutorService requestPool = Executors.newFixedThreadPool(100);
int nThreads = 20; // 1, 2, 3, 10, 20
for (int i = 1; i <= nThreads; i++) {
String requestName = "request" + i;
requestPool.submit(() -> logic(requestName));
Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
}
requestPool.close();
}
private static void logic(String requestName) {
log("[" + requestName + " ] START");
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 4)
.parallel()
.map(i -> HeavyJob.heavyTask(i, requestName))
.reduce(0, (a, b) -> a + b);
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
주의! 실무에서 공용 풀은 절대! I/O 바운드 작업을 하면 안된다!
병렬 스트림은 반드시 CPU 바운드 작업에만 사용하자!
그렇다면 여러 작업을 병렬로 처리해야 하는데, I/O 바운드 작업이 많을 때는 어떻게 하면 좋을까? 이때는 스레드를 직접 사용하거나, ExecutorService 등을 통해 별도의 스레드 풀을 사용해야 한다.
// 권장 예시 - 커스텀 스레드 풀 사용
// 최대 400개의 스레드를 가진 별도의 풀을 생성해서 병렬 작업 처리에 사용한다.
병렬 스트림( parallel() )을 사용하지 않고, 직접 전용 스레드 풀에 작업을 제출한다
public static void main(String[] args) throws InterruptedException {
ExecutorService requestPool = Executors.newFixedThreadPool(100);
// logic 처리 전용 스레드 풀 추가
ExecutorService logicPool = Executors.newFixedThreadPool(400);
int nThreads = 3; // 1, 2, 3, 10, 20
for (int i = 1; i <= nThreads; i++) {
String requestName = "request" + i;
requestPool.submit(() -> logic(requestName, logicPool));
Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
}
requestPool.close();
logicPool.close();
}
private static void logic(String requestName, ExecutorService es) {
log("[" + requestName + " ] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출
Future<Integer> f1 = es.submit(() -> HeavyJob.heavyTask(1, requestName));
Future<Integer> f2 = es.submit(() -> HeavyJob.heavyTask(2, requestName));
Future<Integer> f3 = es.submit(() -> HeavyJob.heavyTask(3, requestName));
Future<Integer> f4 = es.submit(() -> HeavyJob.heavyTask(4, requestName));
int sum;
try {
Integer v1 = f1.get();
Integer v2 = f2.get();
Integer v3 = f3.get();
Integer v4 = f4.get();
sum = v1 + v2 + v3 + v4;
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
===
// 해당 코드는 위의 코드를 간단히
public static void main(String[] args) throws InterruptedException {
ExecutorService requestPool = Executors.newFixedThreadPool(100);
// logic 처리 전용 스레드 풀 추가
ExecutorService logicPool = Executors.newFixedThreadPool(400);
int nThreads = 3; // 1, 2, 3, 10, 20
for (int i = 1; i <= nThreads; i++) {
String requestName = "request" + i;
requestPool.submit(() -> logic(requestName, logicPool));
Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
}
requestPool.close();
logicPool.close();
}
private static void logic(String requestName, ExecutorService es) {
log("[" + requestName + " ] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출
List<Future<Integer>> futures = IntStream.range(1, 4)
.mapToObj(i -> es.submit(() -> HeavyJob.heavyTask(i, requestName)))
.toList();
int sum = futures.stream()
.mapToInt(f -> {
try {
return f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).sum();
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}