소개

앱에서 대량의 데이터를 읽거나 분석하는 작업은 처리 속도가 느리면 사용자 경험에 부정적인 영향을 미칠 수 있다. 
이것을 최근에 겪어서 성능 처리에 대한 고민이 깊어졌다. 우선.. RxParallelRunner 이 남용되서 사용되었던 점이 문제였다. 이번 기회에 RxParallelRunner 를 다뤄서 제대로 사용해보고자 한다! 

어떻게 쓰는건데..


대규모 데이터를 병렬로 처리하는 앱 개발 중, 다음과 같은 문제에 직면할 수 있다.
1. 작업 속도 문제
- 처리해야할 데이터가 커지면 작업이 느려진다. 
2. 자원 관리 문제
- 작업이 많아질경우 스레드 수 증가로 GC가 과도하게 발생될 수 있다.
3. 스레드 관리 문제
- 모든 과하면 탈난다. 스레드가 과도하게 생성되면 비효율적이다. 

 

RxParallelRunner 란?

RxParallelRunner는 RxJava를 기반으로 비동기 작업을 병렬로 처리하고 결과를 조합하는 유틸리티 클래스이다. 데이터를 병렬로 처리하는 단순한 접근 방식이다. 작업을 병렬로 실행하면 속도가 개선된다. 그러나 스레드 수 증가로 GC가 과도하게 발생하고 UI에서 성능이 저하될 수 있다. 

아래와 같이 처리하면 concatMap으로 순서대로 데이터를 읽으면서 비동기로 작업을 실행할 수 있다. 각 작업은 병렬로 처리되지만 결과는 순서대로 반환된다. 비동기로 먼저 처리하고 결과를 AndroidSchedulers.mainThread() 메인 스레드에서 처리하게 할 수 있다. 

Observable.fromIterable(dataList)
    .concatMap(data ->
        Observable.fromCallable(() -> processData(data))
            .subscribeOn(Schedulers.io()) 
    )
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(
        result -> System.out.println("Processed: " + result),
        throwable -> System.err.println("Error: " + throwable.getMessage())
    );

*Schedulers.io() 사용하면 비동기로 작업을 실행한다, subscribe() 는 작업 결과를 출력하거나 에러를 처리하게 된다. 

제한된 스레드 풀 활용

RxParallelRunner 활용해 스레드 풀이 제한된 작업을 수행할 수 있다. 스레드 수를 제한하면 GC 호출 빈도를 줄일 수 있다. 여기서는 최대 2개로 스레드 제한을 걸었다. flatMap 을 쓰면 순서에 보장하지 않고 비동기로 처리할 수 있게 된다. 

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        List<Callable<String>> tasks = dataList.stream()
                .map(data -> (Callable<String>) () -> processData(data))
                .collect(Collectors.toList());

        Observable.fromIterable(tasks)
                .flatMap(task ->
                        Observable.fromCallable(task)
                                .subscribeOn(Schedulers.from(executorService))
                )
                .toList()
                .subscribe(
                        results -> System.out.println("Processed: " + results),
                        throwable -> System.err.println("Error: " + throwable.getMessage())
                );

        executorService.shutdown();

데이터 그룹화 및 최적화

작업을 미리 그룹화하면 하나의 스레드가 여러 작업을 처리하는데 도움이 될 수 있다. 데이터가 많을때 스레드 전환 오버헤드를 줄일 수 있다. 

       ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

        List<Callable<List<String>>> groupedTasks = groupedData.stream()
            .map(group -> (Callable<List<String>>) () -> processGroup(group))
            .collect(Collectors.toList());

        Observable.fromIterable(groupedTasks)
            .flatMap(task ->
                Observable.fromCallable(task)
                    .subscribeOn(Schedulers.from(executorService)) 
            )
            .toList()
            .subscribe(
                results -> System.out.println("Processed: " + results),
                throwable -> System.err.println("Error: " + throwable.getMessage())
            );

        executorService.shutdown();

completableFuture Java 기본 라이브러리로 병렬 처리를 수행할 수 있지만, RxJava 만큼 유연하지 않다. 좀 더 복잡한 워크플로우가 필요한 Akka Streams 를 사용하는듯 하다. 

조금 정리해보면..

결론적으로..

CompletableFuture 는 수백건,
RxJava 는 실시간 처리가 필요하고 수천건 단, 백프레셔가 없으면 메모리 문제가 생길 수 있다.
Akka Streams 수백만건에 적합한듯하다.  
*백프레셔 = 데이터 생성 속도와 소비 속도 사이의 불균형을 해결하기 위한 흐름 제어