RxParallelRunner 소개
소개
앱에서 대량의 데이터를 읽거나 분석하는 작업은 처리 속도가 느리면 사용자 경험에 부정적인 영향을 미칠 수 있다.
이것을 최근에 겪어서 성능 처리에 대한 고민이 깊어졌다. 우선.. RxParallelRunner 이 남용되서 사용되었던 점이 문제였다. 이번 기회에 RxParallelRunner 를 다뤄서 제대로 사용해보고자 한다!
대규모 데이터를 병렬로 처리하는 앱 개발 중, 다음과 같은 문제에 직면할 수 있다.
1. 작업 속도 문제
- 처리해야할 데이터가 커지면 작업이 느려진다.
2. 자원 관리 문제
- 작업이 많아질경우 스레드 수 증가로 GC가 과도하게 발생될 수 있다.
3. 스레드 관리 문제
- 모든 과하면 탈난다. 스레드가 과도하게 생성되면 비효율적이다.
RxParallelRunner 란?
RxParallelRunner는 RxJava를 기반으로 비동기 작업을 병렬로 처리하고 결과를 조합하는 유틸리티 클래스이다. 데이터를 병렬로 처리하는 단순한 접근 방식이다. 작업을 병렬로 실행하면 속도가 개선된다. 그러나 스레드 수 증가로 GC가 과도하게 발생하고 UI에서 성능이 저하될 수 있다.
아래와 같이 처리하면 concatMap으로 순서대로 데이터를 읽으면서 비동기로 작업을 실행할 수 있다. 각 작업은 병렬로 처리되지만 결과는 순서대로 반환된다. 비동기로 먼저 처리하고 결과를 AndroidSchedulers.mainThread() 메인 스레드에서 처리하게 할 수 있다.
Observable.fromIterable(dataList)
.concatMap(data ->
Observable.fromCallable(() -> processData(data))
.subscribeOn(Schedulers.io())
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> System.out.println("Processed: " + result),
throwable -> System.err.println("Error: " + throwable.getMessage())
);
*Schedulers.io() 사용하면 비동기로 작업을 실행한다, subscribe() 는 작업 결과를 출력하거나 에러를 처리하게 된다.
제한된 스레드 풀 활용
RxParallelRunner 활용해 스레드 풀이 제한된 작업을 수행할 수 있다. 스레드 수를 제한하면 GC 호출 빈도를 줄일 수 있다. 여기서는 최대 2개로 스레드 제한을 걸었다. flatMap 을 쓰면 순서에 보장하지 않고 비동기로 처리할 수 있게 된다.
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = dataList.stream()
.map(data -> (Callable<String>) () -> processData(data))
.collect(Collectors.toList());
Observable.fromIterable(tasks)
.flatMap(task ->
Observable.fromCallable(task)
.subscribeOn(Schedulers.from(executorService))
)
.toList()
.subscribe(
results -> System.out.println("Processed: " + results),
throwable -> System.err.println("Error: " + throwable.getMessage())
);
executorService.shutdown();
데이터 그룹화 및 최적화
작업을 미리 그룹화하면 하나의 스레드가 여러 작업을 처리하는데 도움이 될 수 있다. 데이터가 많을때 스레드 전환 오버헤드를 줄일 수 있다.
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
List<Callable<List<String>>> groupedTasks = groupedData.stream()
.map(group -> (Callable<List<String>>) () -> processGroup(group))
.collect(Collectors.toList());
Observable.fromIterable(groupedTasks)
.flatMap(task ->
Observable.fromCallable(task)
.subscribeOn(Schedulers.from(executorService))
)
.toList()
.subscribe(
results -> System.out.println("Processed: " + results),
throwable -> System.err.println("Error: " + throwable.getMessage())
);
executorService.shutdown();
completableFuture Java 기본 라이브러리로 병렬 처리를 수행할 수 있지만, RxJava 만큼 유연하지 않다. 좀 더 복잡한 워크플로우가 필요한 Akka Streams 를 사용하는듯 하다.
결론적으로..
CompletableFuture 는 수백건,
RxJava 는 실시간 처리가 필요하고 수천건 단, 백프레셔가 없으면 메모리 문제가 생길 수 있다.
Akka Streams 수백만건에 적합한듯하다.
*백프레셔 = 데이터 생성 속도와 소비 속도 사이의 불균형을 해결하기 위한 흐름 제어
'IT' 카테고리의 다른 글
Internal, external gateway 에 대해서 (1) | 2024.11.25 |
---|---|
가상 면접 사례로 배우는 대규모 시스템 설계 기초 4장 (1) | 2024.10.06 |
[VMware Tanzu] Spring Boot 밋업 with Josh Long (0) | 2024.09.29 |
Spring Batch 5 migration guide (0) | 2024.09.01 |
몽고디비 모델링, 자주사용하는 연산자 정리 (0) | 2024.08.19 |
댓글
이 글 공유하기
다른 글
-
Internal, external gateway 에 대해서
Internal, external gateway 에 대해서
2024.11.25 -
가상 면접 사례로 배우는 대규모 시스템 설계 기초 4장
가상 면접 사례로 배우는 대규모 시스템 설계 기초 4장
2024.10.06 -
[VMware Tanzu] Spring Boot 밋업 with Josh Long
[VMware Tanzu] Spring Boot 밋업 with Josh Long
2024.09.29 -
Spring Batch 5 migration guide
Spring Batch 5 migration guide
2024.09.01