ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 15. ItemReader (DB)
    BackEnd/Spring Batch 2021. 12. 31. 00:25
    반응형

      배치 애플리케이션은 실시간적 처리가 어려운 대용량 데이터를 다루며 이 때 DB I/O의 성능문제와 메모리 자원의 효율성 문제를 해결할 수 있어야 합니다. 스프링 배치에서는 대용량 데이터 처리를 위해 Cusor Based / Paging Based 두 가지 처리 방식을 제시하고 있습니다.

     

    Cursor Based 처리

    • JDBC ResultSet의 기본 메커니즘 사용
    • 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어지는 Streaming 방식의 I/O
    • ResultSet이 open 될 때마다 next() 메소드가 호출되어 Database의 데이터가 반환되고 객체와 매핑이 이루어짐
    • DB Connection이 연결되면 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분히 큰 값으로 설정 필요
    • 모든 결과를 메모리에 할당하기 때문에 메모리 사용량이 많아지는 단점 존재
    • Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량의 데이터 처리에 적합할 수 있음 (fetchSize 조절)

     

    Paging Based 처리

    • 페이징 단위로 데이터를 조회하는 방식으로 Page Size만큼 한번에 메모리로 가지고 온 다음 한 개씩 read
    • 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 일어나지 않음
    • 시작 행 번호를 지정하고 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용: Offset, Limit
    • 페이징 단위의 결과만 메모리에 할당하기 때문에 메모리 사용량이 적어지는 장점 존재
    • Connection 연결 유지 시간이 길지 않고 메모리 공간을 효율적으로 사용해야 하는 데이터 처리에 적합할 수 있음

    Cusor Based vs Paging Based

     

    JdbcCursorItemReader

      Cursor 기반의 JDBC 구현체로서 ResultSet과 함께 사용되며 Datasource에서 Connection을 얻어와서 SQL을 실행합니다. Thread 안정성을 보장하지 않기 때문에 멀티 스레드 환경에서 사용할 경우 동시성 이슈가 발생하지 않도록 별도 동기화 처리가 필요합니다. 

     

    API

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.batch.item.json.JacksonJsonObjectReader;
    import org.springframework.batch.item.json.JsonItemReader;
    import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
    import org.springframework.batch.item.xml.StaxEventItemReader;
    import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.oxm.xstream.XStreamMarshaller;
    
    import javax.sql.DataSource;
    import java.sql.Types;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RequiredArgsConstructor
    @Configuration
    public class JdbcCursorConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Customer, Customer>chunk(10)
                    .reader(customItemReader())
                    .writer(customItemWriter())
                    .build();
        }
    
        @Bean
        public JdbcCursorItemReader<Customer> customItemReader() {
            return new JdbcCursorItemReaderBuilder()
                    .name("jdbcCursorItemReader")
                    .fetchSize(10) // chunk size와 동일하게 해준다.
                    .sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
                    .beanRowMapper(Customer.class)
                    .queryArguments("A%")
                    .maxItemCount(3)
                    .currentItemCount(2)
                    .maxRows(100)
                    .dataSource(dataSource)
                    .build();
        }
    
        @Bean
        public ItemWriter<Customer> customItemWriter() {
            return items -> {
                for (Customer item : items) {
                    System.out.println(item.toString());
                }
            };
        }
    }

     

    JpaCursorItemReader

      Cursor 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용합니다. (Spring Batch 4.3버전부터 지원)

     

    API

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.batch.item.database.JpaCursorItemReader;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
    import org.springframework.batch.item.json.JacksonJsonObjectReader;
    import org.springframework.batch.item.json.JsonItemReader;
    import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
    import org.springframework.batch.item.xml.StaxEventItemReader;
    import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.oxm.xstream.XStreamMarshaller;
    
    import javax.persistence.EntityManagerFactory;
    import javax.sql.DataSource;
    import java.sql.Types;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RequiredArgsConstructor
    @Configuration
    public class JpaCursorConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final EntityManagerFactory entityManagerFactory;
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Customer, Customer>chunk(2)
                    .reader(customItemReader())
                    .writer(customItemWriter())
                    .build();
        }
    
        @Bean
        public JpaCursorItemReader<Customer> customItemReader() {
    
            HashMap<String, Object> parameters = new HashMap<>();
            parameters.put("firstname", "A%");
    
            return new JpaCursorItemReaderBuilder()
                    .name("jpaCursorItemReader")
                    .queryString("select c from Customer c where firstname like :firstname")
                    .entityManagerFactory(entityManagerFactory)
                    .parameterValues(parameters)
    //                .maxItemCount(10)
    //                .currentItemCount(2)
                    .build();
        }
    
        @Bean
        public ItemWriter<Customer> customItemWriter() {
            return items -> {
                for (Customer item : items) {
                    System.out.println(item.toString());
                }
            };
        }
    }

     

    JdbcPagingItemReader

      Paging 기반의 JDBC 구현체로서 쿼리에 시작 행 번호(offset)와 페이지에서 반환할 행 수(limit)를 지정해서 SQL을 실행합니다. 스프링 배치에서 offset과 limit을 PageSize에 맞게 자동으로 생성해 주며 페이징 단위로 데이터를 조회할 때 마다 새로운 쿼리가 실행됩니다. 페이지마다 새로운 쿼리를 실행하기 때문에 페이징 시 결과 데이터의 순서가 보장될 수 있도록 order by 구문이 작성되어야 합니다. 멀티 스레드 환경에서 Thread 안정성을 보장하기 때문에 별도의 동기화를 할 필요가 없습니다.

     

    API

     

    PagingQueryProvider

      쿼리 실행에 필요한 쿼리문을 ItemReader에게 제공하는 클래스로 데이터베이스마다 페이징 전략이 다르기 때문에 각 데이터베이스 유형마다 다른 PagingQueryProvider를 사용합니다. 아래 API가 해당되며 select, from, sortKey 절은 필수로 설정해야 하며 where, group by 절은 필수가 아닙니다.

    • .selectClause
    • .fromClause
    • .whereClause
    • .groupClause
    • .sortKeys
    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.*;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
    import org.springframework.batch.item.json.JacksonJsonObjectReader;
    import org.springframework.batch.item.json.JsonItemReader;
    import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
    import org.springframework.batch.item.xml.StaxEventItemReader;
    import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;
    import org.springframework.oxm.xstream.XStreamMarshaller;
    
    import javax.persistence.EntityManagerFactory;
    import javax.sql.DataSource;
    import java.sql.Types;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RequiredArgsConstructor
    @Configuration
    public class JdbcPagingConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        public final DataSource dataSource;
    
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .<Customer, Customer>chunk(10)
                    .reader(customItemReader())
                    .writer(customItemWriter())
                    .build();
        }
    
        @Bean
        public JdbcPagingItemReader<Customer> customItemReader() throws Exception {
    
            HashMap<String, Object> parameters = new HashMap<>();
            parameters.put("firstname", "A%");
    
            return new JdbcPagingItemReaderBuilder<Customer>()
                    .name("jdbcPagingItemReader")
                    .pageSize(10)
                    .fetchSize(10)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                    .queryProvider(createQueryProvider())
    				.parameterValues(parameters)
                    .build();
        }
    
        @Bean
        public PagingQueryProvider createQueryProvider() throws Exception {
            SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
            queryProvider.setDataSource(dataSource);
            queryProvider.setSelectClause("id,firstName,lastName,birthdate");
            queryProvider.setFromClause("from customer");
            queryProvider.setWhereClause("where firstName like :firstname");
    
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.ASCENDING);
    
            queryProvider.setSortKeys(sortKeys);
    
            return queryProvider.getObject();
        }
    
       /* @Bean
        public JdbcPagingItemReader<Customer> customItemReader() {
            JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
            reader.setDataSource(this.dataSource);
            reader.setFetchSize(5);
            reader.setPageSize(5);
            reader.setSaveState(true);
            reader.setRowMapper(new CustomerRowMapper());
            HashMap<String, Object> parameters = new HashMap<>();
            parameters.put("firstname", "A%");
            reader.setParameterValues(parameters);
            MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
            queryProvider.setSelectClause("id, firstName, lastName, birthdate");
            queryProvider.setFromClause("from customer");
            queryProvider.setWhereClause("where firstname like :firstname");
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.ASCENDING);
            queryProvider.setSortKeys(sortKeys);
            reader.setQueryProvider(queryProvider);
            return reader;
        }*/
    
        @Bean
        public ItemWriter<Customer> customItemWriter() {
            return items -> {
                for (Customer item : items) {
                    System.out.println(item.toString());
                }
            };
        }
    }

     

    JpaPagingItemReader

      Paging 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용합니다.

     

    API

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.*;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
    import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
    import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
    import org.springframework.batch.item.json.JacksonJsonObjectReader;
    import org.springframework.batch.item.json.JsonItemReader;
    import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
    import org.springframework.batch.item.xml.StaxEventItemReader;
    import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;
    import org.springframework.oxm.xstream.XStreamMarshaller;
    
    import javax.persistence.EntityManagerFactory;
    import javax.sql.DataSource;
    import java.sql.Types;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RequiredArgsConstructor
    @Configuration
    public class JpaPagingConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final EntityManagerFactory entityManagerFactory;
    
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .<Customer, Customer>chunk(10)
                    .reader(customItemReader())
                    .writer(customItemWriter())
                    .build();
        }
    
        @Bean
        public JpaPagingItemReader<Customer> customItemReader() {
    
            return new JpaPagingItemReaderBuilder<Customer>()
                    .name("jpaPagingItemReader")
                    .entityManagerFactory(entityManagerFactory)
                    .pageSize(10)
                    .queryString("select c from Customer c join fetch c.address")
                    .build();
        }
    
        @Bean
        public ItemWriter<Customer> customItemWriter() {
            return items -> {
                for (Customer customer : items) {
                    System.out.println(customer.getAddress().getLocation());
                }
            };
        }
    }

     

    [참고자료]

    인프런-스프링 배치 - Spring Boot 기반으로 개발하는 Spring Batch

    반응형

    'BackEnd > Spring Batch' 카테고리의 다른 글

    17. ItemWriter (File)  (0) 2021.12.31
    16. ItemReaderAdapter  (0) 2021.12.31
    14. ItemReader (Json)  (0) 2021.12.30
    13. ItemReader (XML)  (0) 2021.12.30
    12. ItemReader (File)  (0) 2021.12.30

    댓글

Designed by Tistory.