BackEnd/spring

대용량 데이터 수집을 병렬 처리로 최적화하기

연향동큰손 2025. 9. 15. 16:23

프로젝트를 진행하면서 고용24에서 제공하는 "내일배움카드 교육" 정보를 OPEN API로 호출하는 방법 대신 DB에 저장하여 조회하는 방식으로 변경하기 위해 모든 데이터를 저장을 해봤다.

 

국민내일배움카드 훈련과정 API는 총 3가지로 분류된다.

  1. 훈련과정 목록
  2. 훈련과정,기관 정보
  3. 훈련일정

훈련과정의 총 개수는 88366개이다.

 

모든 훈련과정을 저장하기 위해 훈련과정 목록을 저장했고 모든 지역의 훈련과정을 순차적으로 검색했을때 총 1시간 37분이 걸렸다.

 

이렇게 저장된 훈련과정 ID를 사용하여 훈련과정,기관정보, 훈련일정을 검색하기 위해서는 하나의 훈련 ID당 두번의 API호출이 필요했다.

즉, 상세정보를 저장하기 위한 총 조회 횟수는 16만번 이상이 된다.

 

88366개의 데이터를 순차적으로 돌면서 상세정보를 수집하니 총 5시간 45분이 걸렸다.

 

데이터 수집 과정에서 너무 많은 시간이 소요되는 문제점을 확인하게 되었고, 최적화 필요성을 느끼게 되었다.


스레드 풀 기반 병렬 처리

 

기존에는 한 작업이 완료되어야만 다음 작업이 시작되는 순차적 방식을 사용했기 때문에 전체 요청 시간이 길어졌다.


그러나 스레드 풀 기반의 병렬 처리를 도입하면서 여러 작업을 동시에 실행할 수 있었고, 그 결과 데이터 수집에 소요되는 시간을 크게 단축할 수 있었다.

 

Thread Pool(스레드 풀)

스레드 풀이란 스레드를 미리 생성하고, 작업 요청이 발생할 때 마다 미리 생성된 스레드로 해당 작업을 처리하는 방식을 의미한다.

요청이 들어오게 되면 작업큐에 넣고 스레드풀은 작업큐에 들어온 Task일감을 미리 생성해놓은 Tread들에게 일감을 할당하게 된다.

이러한 방식을 사용하게 되면 OPEN API호출과 저장 과정을 병렬처리 하기 때문에 총 소요 시간을 크게 줄일 수 있다.


구현방식

 

우선 병렬처리용 스레드 풀 설정을 위해 ThreadPoolTaskExecutor를 빈으로 등록해준다.

ThreadPoolTaskExecutor은 스프링에서 제공하는 스레드 풀 구현체이다.

여기서 스레드 풀의 기본 동작 방식을 정의할 수 있다. 

@Configuration
public class HrdExecutorConfig {

    @Bean(name = "hrdExecutor")
    public ThreadPoolTaskExecutor hrdExecutor() {
        var ex = new ThreadPoolTaskExecutor();
        ex.setCorePoolSize(8); //스레드 8개 상시 대기      
        ex.setMaxPoolSize(16); //최대 16개까지 확장 가능
        ex.setQueueCapacity(2000); //작업 대기열 크기(TaskQueue에 최대 2000개의 작업 대기)
        ex.setThreadNamePrefix("hrd-"); //스레드 이름
        ex.initialize(); //스레드 풀 초기화
        return ex;
    }
}
  • setCorePoolSize : 풀에서 항상 유지되는 최소 스레드 수(0으로 설정하면 작업이 없을 때는 스레드 유지 X)
  • setMaxPoolSize : 작업이 몰려서 큐가 꽉차게 되면 최대 스레드 개수로 늘림
  • setQueueCapacity : 작업 대기열 크기
  • setThreadNamePrefix : 풀에서 생성된 스레드 이름
  • initialize : 스레드 풀 초기화

 

이제 이 스레드 풀을 저장로직이 있는 Service 클래스에서 사용하여 병렬처리를 적용시키면 된다.

 public int harvestFullMonthsAheadParallel(int months, String area1, String ncs1,
                                              int pageSize, int maxItems, int concurrencyIgnored) {
        LocalDate today = LocalDate.now(KST);
        LocalDate end   = today.plusMonths(months);

        Specification<HrdCourseCatalog> spec = Specification.allOf(
                betweenDates(today, end),
                (area1 == null || area1.isBlank()) ? null : eqArea(area1),
                (ncs1  == null || ncs1.isBlank())  ? null : startsWithNcs(ncs1)
        );

        int processed = 0;
        int page = 0;

        while (true) {
            Pageable pageable = PageRequest.of(page, Math.max(pageSize,1), Sort.by("traStartDate").ascending());
            Page<HrdCourseCatalog> slice = repo.findAll(spec, pageable);
            if (slice.isEmpty()) break;

            // 이 페이지의 작업들을 모두 비동기 제출
            List<CompletableFuture<Void>> futures = new ArrayList<>();

            for (HrdCourseCatalog c : slice) {
                String torgId = ensureTorgId(c); // torgId 확보
                if (isBlank(torgId)) continue;

                // runAsync에 hrdExecutor를 넘겨서 '내가 구성한 스레드 풀'에서 실행되도록 함.
                // → corePoolSize(8) 만큼 동시에 실행, 나머지는 풀 큐에 대기.
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        // 각 태스크는 풀의 워커 스레드에서 실행됨.
                        // getCourseFull() 내부 @Transactional 덕분에 호출마다 독립 트랜잭션이 열리고, 작업이 끝나면 커밋/종료된다.
                        hrdSearchService.getCourseFull(c.getTrprId(), c.getTrprDegr(), torgId);
                    } catch (Exception ex) {
                        // 개별 작업 실패는 전체 진행을 멈추지 않도록 로그만 남김
                        log.warn("harvest-full fail trprId={}, degr={}, torgId={}, msg={}",
                                c.getTrprId(), c.getTrprDegr(), torgId, ex.getMessage());
                    }
                }, hrdExecutor));

                processed++;
                if (maxItems > 0 && processed >= maxItems) break; // 전체 최대 건수 제한
            }

            // 현재 페이지에서 제출한 모든 비동기 작업이 끝날 때까지 대기
            // 페이지 단위로 동기화하여 메모리 사용/백프레셔를 유지 (무한정 큐 적재 방지)
            futures.forEach(CompletableFuture::join);

            if (maxItems > 0 && processed >= maxItems) break;
            if (!slice.hasNext()) break; // 다음 페이지 없으면 종료
            page++;
        }

        log.info("harvest-full-parallel done: processed={}", processed);
        return processed;
    }

 

CompletableFuture.runAsync 비동기 작업 실행을 지원하는 유틸리티 메서드인데, 여기서는 미리 설정해둔 전용 스레드 (hrdExecutor) 함께 사용하여 내일배움카드 훈련과정 데이터 저장 로직을 여러 스레드에서 동시에 실행하도록 구성하였다.


처리시간 비교

순차적 처리 : 5시간 45분 49초

 

병렬처리 적용 : 1시간 5분 27초

 

비교 결과 병렬처리를 적용했을때 순차 처리 대비 5.28배 정도 수집 속도가 향상된 것을 수치상으로 확인 가능했다.