현재 서비스는 Spring Batch 를 활용하여 다음과 같은 기능을 구현하고 있다.

고유 번호, 기업 주요 재무 지표 수집/적재 - #1

진행과정

Dart Api 를 활용하서 고유번호를 파일로 가져와 압축을 해제후 xml 형태의 데이터를 json 형태로 변환 후 rdb 에 저장하는 로직을 구현하였고 10만개가 넘는 기업 정보를 spring batch를 활용하여 자동화 하고자 하였다

이 코드는 Spring Batch를 사용하여 DART API에서 기업 정보를 주기적으로 가져와 처리하는 작업을 정의한 배치 프로세스입니다. 이 작업은 일정한 시간에 맞춰 실행되며, 데이터를 가져와 저장하는 과정에서 Chunk-Oriented Processing이 아닌 Tasklet 기반의 간단한 프로세스를 사용합니다. 각 클래스의 역할과 데이터 처리 과정을 설명하면 다음과 같습니다:

  1. BatchConfig

@Configuration
@RequiredArgsConstructor
@Slf4j
public class DartCorporateApiBatchConfig extends DefaultBatchConfiguration {

    /*
        Chunk-Oriented Processing은 대용량 데이터를 단계별로 처리할 때 유리하며, 대규모 배치 작업에 적합합니다.
        Tasklet은 작업이 단순하고 데이터의 양이 많지 않거나, 단일 작업(예: API 호출, 파일 처리)에서 효율적입니다.
     */

    private final RepeatScheduleTasklet repeatScheduleTasklet;

    @Bean
    public Job dartApiJob(JobRepository jobRepository, Step dartApiStep) {
        return new JobBuilder("dartApiJob", jobRepository)
            .start(dartApiStep)
            .build();
    }

    @Bean
    public Step dartApiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("dartApiStep", jobRepository)
            .tasklet(repeatScheduleTasklet, transactionManager)
            .build();
    }

}

  1. RepeatScheduleTasklet

@Component
@RequiredArgsConstructor
@Slf4j
public class RepeatScheduleTasklet implements Tasklet {

    private final CorporateService corporateService;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
        throws Exception {
        // CorporateService의 메서드를 호출하여 DART API 정보를 처리
        List<CorporateCodesResponseDto> result = corporateService.fetchAndSaveCorpCodeInfo();
        log.info("DART API 에서 반환된 데이터 수: {}", result.size());
        return RepeatStatus.FINISHED;  // Tasklet 작업이 완료되면 반환
    }
}

  1. DartCorporateApiJobScheduler
package com.tradingtrends.batch.infrastructure.trigger;

import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.util.SoftHashMap;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class DartCorporateApiJobScheduler {

    private final Job job;
    private final JobLauncher jobLauncher;

    @Scheduled(cron = "0 2 16 * * *", zone = "Asia/Seoul")
    public void scheduleDartApiJob() {
        log.info("Scheduled Dart API Job started at: {}", System.currentTimeMillis());
        this.schedulerRun(this.job);
    }

    private void schedulerRun(final Job job) {
        final Map<String, JobParameter<?>> jobParametersMap = new SoftHashMap<>();
        jobParametersMap.put("time", new JobParameter<>(System.currentTimeMillis(), Long.class));
        final JobParameters jobParameters = new JobParameters(jobParametersMap);
        try {
            log.info("Running the Dart API job with parameters: {}", jobParameters);
            this.jobLauncher.run(job, jobParameters);
            log.info("Job executed successfully.");
        } catch (JobExecutionAlreadyRunningException e) {
            log.error("Job execution already running: {}", e.getMessage());
        } catch (JobInstanceAlreadyCompleteException e) {
            log.error("Job instance already complete: {}", e.getMessage());
        } catch (JobParametersInvalidException e) {
            log.error("Invalid job parameters: {}", e.getMessage());
        } catch (JobRestartException e) {
            log.error("Job restart exception: {}", e.getMessage());
        }
    }
}

리펙토링

현재 10만개이상의 데이터를 가져오기 위해서는 많은 시간이 필요하고 Tasklet 기반으로 데이터를 처리하고 있는데 중간에 서버가 다운되면 각각의 스텝은 별도의 트랜잭션안에서 실행되기 때문에 해당 트랜잭션은 롤백되고 db에는 어떠한 데이터도 담지 않게 되는 문제점이 존재한다. 부분적인 트랜잭션 처리가 필요하다.

때문에 spring batch 의 step 작업을 청크 기반 처리를 하여 청크 크기를 알맞게 설정하여 청크 처리 도중 오류가 발생하면 해당 청크는 롤백되고, 처리된 청크 크기만큼만 db에 저장되도록 수정중에 있다.

결론

청크 기반의 Spring Batch는 기본적으로 실패 후 재실행을 지원하도록 설계되어 있습니다.

하나의 압출 파일의 10만 개 이상의 데이터가 들어가 있기 때문에 청크 기반의 Batch Config 작성은 의미가 없다고 판단 Tasklet 기반으로 진행