자바 7(포크/조인 프레임워크)이 등장하기 전에는 데이터를 서브파트로 분할하여 각각 스레드로 할당해야 된다.
- 의도치 않은 레이스 컨디션이 발생하지 않도록 적적한 동기화를 추가하여 마지막으로 부분 결과를 합쳐야 한다.
7.1 병렬 스트림
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. (parallelStream)
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //스트림을 병렬 스트림으로 변환
.reduce(0L,Long::sum);
}
- 이전 코드와 다른점은 parallel 메서드를 통해서 스트림이 여러 청크로 분활되어 있다는 것이다.
- 마지막으로 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.
@Override
@SuppressWarnings("unchecked")
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
- parallel 메서드을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정된다.
@Override
@SuppressWarnings("unchecked")
public final S sequential() {
sourceStage.parallel = false;
return (S) this;
}
- 병렬 스트림 ↔️ 순차 스트림의 변경은 flag(sourcesStage.parallel)의 설정 값이라고 생각할 수 있고, 마지막 flag값에 따라서 전체 파이프라인이 병렬인지 순차인지 결정하게 된다.
스트림 성능 측정
병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측했다.(추측 하지말고 직접 측정하자..!)
- 자바 마이크로벤치마크 하니스(Java Microbenchmark Harness) JHM 라이브러리를 이용해 벤치마크를 구현하자.
- JMH를 이용하면 간단하고, 어노테이션 기반 방식을 지원하며, 안정적으로 자바 프로그램이나 자바 가상 머신(JVM)을 대상으로 하는 다른 언어용 벤치마크를 구현할 수 있다.
병렬 스트림 사용법
total을 접근할 때마다 데이터 레이스(다수의 쓰레드에서 동시에 데이터에 접근하는) 문제가 일어난다.
- 동기화로 문제를 해결하다보면 결국 병렬화라는 특성이 없어진다.
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
- total += value는 아토믹 연산이 아니다.
- 병렬 스트림이 올바르게 동작하려면 공유된 가변 상태를 피해야 한다.
효과적으로 사용하기
확신이 서지 않으면 직접 측정하라.
- 순차 스트림에서 병렬 스트림으로 쉽게 바꿀 수 있다.
반드시 적절한 벤치마크로 직접 성능을 측정하자 박싱을 주의하라.
- 되도록이면 기본형 특화 스트림을 사용하자.
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산들을 피하라.
- limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
- findFirst 보다는 findAny(순서와 상관X)를 사용하자
- 정렬된 스트림에 순서가 상관없다면 .unordered()를 호출하면 비정렬된 스트림을 얻을 수 있다.
- 정렬된 스트림 보다 비정렬된 스트림에 순서와 의존하는 연산을 사용하는 것이 좋다.
스트림에서 수행하는 전체 파이프라인 비용을 고려하라.
- 처리해야할 요소 수가 N, 하나의 요소를 처리하는 데 드는 비용이 Q라하면 전체 스트림 파이프라인 처리비용은 N*Q로 예상할 수 있다.
- Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
- 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
스트림을 구성하는 자료구조가 적절한지 확인하라.
- ArrayList는 LinkedList보다 효율적으로 분할할 수 있다.
- LinkedList를 분활하려면 모든 요소를 탐색해야 되지만 ArrayList는 요소를 탐색하지 않고 분할할 수 있다.
- range 팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.
- 커스텀 Spliterator를 구현해서 분해 과정을 완벽하게 제어할 수 있다.
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
- map이나 SIZED 스트림은 크기를 알고 있기에 스트림 분할을 할 수 있고 병렬 처리가 수월하지만, 필터 연산같은 경우 스트림의 길이를 예측할 수 없기 때문에 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
최종 연산의 병합 과정(예: collector의 combiner 메서드) 비용을 살펴보라.
- 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.
🌴스트림 소스와 분해성
소스 | 분해성 |
ArrayList | 훌륭함 |
LinkedList | 나쁨 |
IntStream.range | 훌륭함 |
Stream.iterate | 나쁨 |
HashSet | 좋음 |
TreeSet | 좋음 |
7.2 포크/조인 프레임워크
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
- 병렬 스트림은 내부적으로 사용하는 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
RecursiveTask
protected abstract V compute();
compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.(divide-and-conquer)
if(태스크가 충분히 작거나 더 이상 분할 할 수 없을 경우) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출
모든 서브태스크의 연산이 완료될 때까지 기다린다.
각 서브태스크의 결과를 합침
}
포크/조인 프레임워크를 이용해서 병렬 합계 수행
//RecursiveTask를 상속받아 포크/조인 프레임워크에서 사용할 태스크를 생성한다.
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
private static final long THRESHOLD = 10_000;//이 값 이하의 서브태스크는 더 이상 분할할 수없다.
//메인 태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
//메인 태스크를 생성할 때 사용할 공개 생성자
public ForkJoinSumCalculator(long[] numbers){
this(numbers,0,numbers.length);
}
@Override
protected Long compute() {
int length = end - start; //더할 배열의 길이
if (length <= THRESHOLD){
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork(); //ForkJoinPool의 다른 쓰레드로 새로 생성한 태스크를 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start+length/2, end);
Long rightResult = rightTask.compute(); //두 번째 서브태스크를 동기 실행한다. 이 때 추가로 분할이 일어날 수 있다
Long leftResult = leftTask.join(); //첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 기다린다.
return leftResult + rightResult; //두 서브태스크의 결과를 조합한 결과
}
private long computeSequentially() { //더 이상 분할할 수 없을 때 서브태스크의 결과를 계산하는 알고리즘
long sum = 0;
for (int i = start; i<end; i++){
sum+= numbers[i];
}
return sum;
}
}
ForkJoinSumCalculator를 실행하면 다음과 같은 흐름으로 진행된다.
- LongStream으로 1부터 n(10_000_000)까지의 배열을 생성한다.
-
생성한 배열을 전달해 ForkJoinSumCalculator 태스크를 만든다.
-
ForkJoinSumCalculator 를 ForkJoinPool로 전달한다.
-
풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하며 작업을 수행한다.
-
compute 메서드는 병렬로 실행할 수 있을만큼 태스크가 작아졌는지 확인하며, 태스크가 아직 크다고 생각하면 숫자를 반으로 분할해 새로운 ForkJoinSumCalculator로 할당한다.
-
다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행한다. 이 과정이 재귀적으로 반복되며 주어진 조건(TRASHOLD)을 만족할 때까지 태스크 분할을 반복한다.
-
각 서브태스크는 순차적으로 처리되어 포킹 프로세스로 만들어진 이진 트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산해 반환한다.
Stream API 병렬 데이터 처리하기
목차
catsbi.oopy.io
소프트웨어의 필요한 곳에서 언제든 가져다 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.
- 일반적으로 둘 이상의 ForkJoinPool은 사용하지 않는다.
- 인수가 없는 디폴트 생성자를 이용했는데, 이는 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의미한다.
Runtime.getRuntime().availableProcessors() 는 실제 프로세서외에 하이퍼스레딩과 관련된 가상 프로세서의 개수를 포함하여 반환하는데, ForkJoinPool이 저장될 때는 이 반환값으로 풀에 사용할 스레드 수를 결정한다.
흠 이런식으로 이미 스레드 수를 결정하는데 fork랑 compute를 호출하면서 스레드 수가 늘어나는건가..? 아직 이해 못하겠는 부분
분산된 작업의 크기가 너무 작아지면 병렬 수행의 속도는 순차 수행의 속도보다 느려진다. (병렬 수행의 효과가 상쇄)
-일반적으로 프로세싱 코어의 개수를 초과하는 병렬 작업은 효율적이지 않다.-
포크/조인 프레임워크 제대로 사용하는 방법
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
- 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
RecursiveTask 내에서는 ForkJoinPool의 메서드 대신에 compute나 fork 메서드를 직접 호출하자.
- invoke 메서드는 순차 코드에서 병렬 계산을 시작할 때만 사용하자
서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
- 왼,오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러워 보이나, 한쪽 작업에서는 compute를 호출하는 것이 효율적이다.
- 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
- fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각은 버려야 한다.
- 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 성능 개선할 수 있다.
- 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다.
작업 훔치기
포크/조인 프레임워크에서는 밑에 상황으로 발생하는 문제를 작업 훔치기(work stealing)라는 기법으로 해결한다.
- 이상적: 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이다.
- 현실적: 분할 기법이 효율적이지 않았기 때문일 수도 있고 예기치 않게 디스크 접근 속도가 저하되었거나 외부 서비스와 협력하는 과정에서 지연이 생길 수 있기 때문이다.
작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다.
각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 head에서 다른 태스크를 가져와서 작업을 처리한다.
- 다른 쓰레드에 비해서 빠르게 일을 처리한 쓰레드는 유휴 상태로 바뀌는 것이 아니라 다른 쓰레드 큐의 tail에서 작업을 훔쳐온다.
- 모든 큐가 빌 때까지 이과정이 반복되기 때문에 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업 부하를 비슷한 수준으로 유지할 수 있다.
![](https://blog.kakaocdn.net/dn/bqSU8P/btsoSme0MhV/3BkoUFidWb6iAnVcfpdQFk/img.png)
7.3 Spliterator 인터페이스
- Iterator처럼 소스의 요소 탐색 기능을 제공한다는 점은 같지만 병렬 작업에 특화되어 있다.
public interface Spliterator<T> { //T는 Spliterator에서 탐색하는 요소의 형식을 가리킨다.
boolean tryAdvanace(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
boolean tryAdvanace(Consumer<? super T> action);
- Spliterator의 요소를 하나씩 순차적으로 소비하며 탐색해야 하는 요소가 남아 있다면 참을 반환한다.
Spliterator<T> trySplit();
- Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메서드다.
long estimateSize();
- 탐색해야 할 요소 수 정보를 제공할 수 있다.
int characteristics();
- Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
분할 과정
이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
- Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성된다.
- 1번으로 인해서 2개의 Spliterator에 trySplit를 다시 호출하면 4 개의 Spliterator가 생성된다.
- 늘어난 Spliterator 만큼 trySplit을 호출하며 결과가 null이 될 때까지 이 과정을 반복한다.
- Spliterator에 호출한 모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.
Spliterator 특성
특성 | 의미 |
ORDERED | 리스트처럼 요소에 정해진 순서가 있으므로 요소 탐색 및 분할시 이 순서에 주의해야 한다. |
DISTINCT | x,y 두 요소를 방문했을 때 x.equals(y)는 false여야 한다. |
SORTED | 탐색된 요소는 미리 정의된 순서를 따른다. |
SIZED | 크기가 알려진 소스(예: Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환한다. |
NON-NULL | 탐색하는 요소는 Null이 아니다. |
IMMUTABLE | 이 Spliterator의 소스는 불변으로 요소를 탐색하는 동안 CUD 동작을 할 수 없다. |
CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED특성을 갖는다. |
커스텀 Spliterator 구현하기
문자열의 단어 수를 계산하는 단순한 메서드를 구현해보고 Spliterator를 구현해서 적용하자
public static int countWordsIteratively(String s){ //문자열의 모든 문자를 하나씩 탐색
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)){
lastSpace = true;
} else {
if (lastSpace) counter++; //공백 문자를 만나면 true이기 때문에 공백 다음 문자가 오면 ++
lastSpace = false;
}
}
return counter;
}
함수형으로 단어 수를 세는 메서드 재구현하기
String은 기본형 스트림이 없어 Stream<Charater>를 사용해야 한다.
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c){
if (Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter,true);
} else {
return lastSpace ? new WordCounter(counter+1,false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace); //counter 값만 더할 것이므로 마지막 공백은 신경쓰지 않는다.
}
public int getCounter() {
return counter;
}
}
WordCounter는 불변 클래스이므로 새로운 WordCounter클래스를 어떤 상태로 생성할 것인지 정의한다.
public static int countWords(Stream<Character> stream){
WordCounter wordCounter = stream.reduce(new WordCounter(0,true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
Stream<Character> stream = IntStream.range(0, inputStr.length())
.mapToObj(inputStr::charAt);
countWords(stream); //동일한 값
지금은 스트림에서 리듀싱 연산을 수행할 때 지금까지 발견한 단어 수를 누적할 변수와 마지막 문자의 공백 여부를 기억하는 논리값 두 가지 공유 변수를 이용했다.
WordCounter 병렬로 수행하기
위에 작성한 countWords()에 stream.parallel()으로 인자를 전달하면 잘못된 값이 나오는 것을 알 수 있다.
- 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나올 수 있다.
- 단어 끝에서 문자열을 분할하는 문자 Splliterator를 사용해 임의의 위치에서 분할하지 말고 단어가 끝나는 위치에서만 분할해야 한다.
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++)); //현재 문자를 소비한다.
return currentChar < string.length(); //소비할 문자가 남아있으면 True
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null; // 파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null을 반환한다.
}
for (int splitPos = currentSize /2 + currentChar; //파싱할 문자열의 중간을 분할 위치로 설정
splitPos < string.length(); splitPos++){
if (Character.isWhitespace(string.charAt(splitPos))) { //다음 공백이 나올 때까지 분할 위치를 뒤로 이동 시킨다.
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar, splitPos)); //문자열+" "일 때 공백 잘라서 들어감
currentChar = splitPos; //위 spliterator의 시작 위치를 분할 위치로 설정한다.
return spliterator; //공백을 찾았고 문자열을 분리했으므로 루프 종료
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
public boolean tryAdvance(Consumer<? super Character> action)
: 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공하고 인덱스를 증가시킨다. 인수로 전달된 Consumer는 스트림을 탐색하며 적용해야 하는 함수 집합이 작업을 처리할 수 있게 소비한 문자를 전달하는 자바 내부 클래스이다.
public WordCounter accumulate(Character c){
if (Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter,true);
} else {
return lastSpace ? new WordCounter(counter+1,false) : this;
}
}
- WordCounter의 accumulate 메서드만 적용한다.
- 예제에서는 단어 중간을 분할하지 않도록 빈 문자가 나올때까지 분할 위치를 이동시켰다.
- 지금은 한계값이 10개로 설정했지만 너무 많은 태스크를 만들지 않도록 더 높은 한계값을 설정해야 한다.
- ORDERED: 문자열의 문자 등장 순서가 유의미함
- SIZED: estimatedSize 메서드의 반환값이 정확함
- SUBSIZED: typSplit으로 생성된 Spliterator도 정확한 크기를 가진다.
- NONNULL: 문자열은 NULL이 아니다.
- IMMUTABLE: 문자열 자체가 불변 클래스라서 문자열을 파싱하면 속성이 추가되지 않는 정보를 알려준다.
WordCounterSpliterator 활용
StreamSupport.stream 팩토리 메서드는 두 번째 파라미터로 병렬 스트림 생성 여부를 지시한다.
Spliterator<Character> wordCounterSpliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(wordCounterSpliterator, true);
System.out.println("countWord = " + countWords(stream));
'책 > 모던 자바 인 액션' 카테고리의 다른 글
9. 리팩터링, 테스팅, 디버깅 (0) | 2023.08.02 |
---|---|
8. 컬렉션 API 개선 (0) | 2023.07.28 |
6. 스트림으로 데이터 수집 (0) | 2023.07.21 |
5. 스트림 활용 (0) | 2023.07.17 |
4. 스트림 소개 (0) | 2023.07.14 |