-
[Spring Batch] ItemReader(4) - 기존서비스Java/Spring Batch 2021. 6. 13. 09:54
기존서비스
대부분의 회사에는 현재 서비스 중인 웹 또는 다른 형태의 자바 애플리케이션이 존재한다. 이들 애플리케이션은 수많은 분석, 설계, 테스트, 버그 수정 과정을 거쳤다. 이런 애플리케이션을 구성하는 코드는 실전에서 테스트됐으며 이상이 없음이 검증됐다.
그럼 배치 처리에서 기존 애플리케이션의 코드를 사용할 수는 없을까? 고객 객체를 읽어들이는 배치 처리 예제를 생각해보자. 다만 지금까지의 예제처럼 Customer 객체의 매핑 대상이 단일 테이블이나 파일이 아니라 여러 데이터베이스의 여러 테이블에 산재돼 있다고 가정한다. 또한 물리적으로 고객 데이터를 지우지 않는다. 그 대신 삭제됐다는 플래그를 기록한다. 웹 기반 애플리케이션에는 이미 고객 데이터를 조회하는 서비스가 존재한다. 배치 처리에서 해당 서비스를 사용하려면 어떻게 해야 할까? 이 절에서는 기존 스프링 서비스를 호출해서 ItemReader에 데이터를 공급하는 방법을 알아본다.
4장에서는 스프링 배치가 태스크릿에서 별도의 기능을 사용할 수 이쎄 하는 어댑터 몇 가지를 살펴봤다. 이런 어댑터 중에 특히 중요한 것들로는 org.springframework.batch.core.step.tasklet.CallableTaskletAdapter, org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter, org.springframework.batch.core.step.tasklet.SystemCommandTasklet이 있다. 이 세개의 어댑터는 다른 엘리먼트를 래핑해서 스프링 배치가 해당 엘리먼트와 통신할 수 있게 하는 데 사용된다. 스프링 배치에서 기존 서비스를 사용할 때도 같은 패턴을 사용한다.
입력 데이터를 읽을 때는 org.springframework.batch.item.adapter.ItemReaderAdapter를 사용한다. RepositoryItemReader가 리포지토리의 참조와 해당 리포지터리에서 호출할 메서드 이름을 전달받는 것과 비슷하게, ItemReaderAdapter는 호출 대상 서비스의 참조와 호출할 메서드 이름을 의존성으로 전달받는다. ItemReaderAdapter를 사용할 때는 다음 두가지를 염두에 두어야 한다.
- 매번 호출할 때마다 반환되는 객체는 ItemReader가 반환하는 객체다. 사용하는 서비스가 Customer 객체 하나를 반환한다면, 이 객체는 ItemProcessor로 전달되며 마지막에는 ItemWriter로 전달된다. 서비스가 Customer 객체의 컬렉션을 반화한다면, 이 컬렉션이 단일 아이템으로 ItemProcessor와 ItemWriter로 전달되므로 개발자가 직접 컬렉션 내 객체를 하나씩 꺼내면서 처리해야 한다.
- 입력 데이터를 모두 처리하면 서비스 메서드는 반드시 null을 반환해야 한다. 이는 스프링 배치에게 해당 스텝의 입력을 모두 소비했음을 나타난다.
이번 예제에서는 입력 데이터 목록이 모두 처리될 때까지 Customer 객체를 반환하도록 하드코딩된 서비스를 사용한다. 목록에 담긴 객체를 모두 소비하면 다음 호출부터는 매번 null을 반환한다. 예제 7-64의 CusomerService는 예제에서 사용할 수 있도록 Customer 객체의 목록을 무작위로 생성한다.
[ 예제 7-64 CustomerService ]
... @Component public class CustomerService{ private List<Customer> customers; private int curIndex; private String[] firstNames = {"Michael", "Warren", "Ann", "Terrence", "Erica", "Laura", "Steve", "Larry"}; private String middleInitial = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; private String[] lastNames = {"Gates", "Darrow", "Donnelly", "Jobs", "Bufett", "Ellision", "Obama"}; private String[] streets = {"4th Street", "Wall Street", "Fifth Avenue", "Mt. Lee Drive", "Jeopardy Lane", "Infinite Loop Drive", "Farnam Street", "Isabella Ave", "S. Greenwood Ave"}; private String[] cities = {"Chicago", "New York", "Hollywood", "Aurora", "Omaha", "Aherton"}; private String[] states = {"IL", "NY", "CA", "NE"}; private Random generator = new Random(); public CustomerService(){ curIndex = 0; customers = new ArrayList<>(); for(int i=0; i<100; i++){ customers.add(buildCustomer()); } } private Customer buildCustomer(){ Custsomer customer = new Customer(); customer.setId((long) generator.nextInt(Integer.MAX_VALUE)); customer.setFirstName(firstNames[generator.nextInt(firstNames.length - 1)]); customer.setMiddleInitial(String.valueOf(middleInitial.charAt( generator.nextInt(middleInitial.length() - 1)))); customer.setLastName(lastNames[generator.nextInt(lastNames.length - 1)]); customer.setAddress(generator.nextInt(9999) + " " + strrets[generator.nextInt(streets.length - 1)]); customer.setCity(cities[generator.nextInt(cities.length - 1)]); customer.setStates(states[generator.nextInt(states.length - 1)]); customer.setZipCode(String.valueOf(generator.nextInt(99999))); return customer; } public Customer getCustomer(){ Customer cust = null; if(curIndex < customers.size()){ cust = customers.get(curIndex); curIndex++; } return cust; } }
[ 예제 7-65 CustomerService를 호출하는 ItemReaderAdapfter의 구성 ]
... @Bean public ItemReaderAdapter<Customer> customerItemReader(CustomerService customerService){ ItemReaderAdapter<Customer> adapter = new ItemReaderAdapter<>(); adapter.setTargetObject(customerService); adapter.setTargetMethod("getCustomer"); return adpter; } ...
커스텀 입력
스프링 배치는 자바 애플리케이션에서 일반적으로 사용하는 거의 모든 타입의 리더를 제공한다. 그러나 스프링 배치가 제공하는 ItemReade처리가 가능한 입력 형식을 사용할 때라도 커스텀 ItemReader를 만들어야 할 때도 있다. ItemReader 인터페이스의 read 메소드를 구현하는 것은 쉬운 작업이다. 그러나 개발한 리더(사람을 말하는 것이 아님)를 재시작할 수 있게 하려면 어떻게 해야할까? 매번 잡을 실행할 때마다 상태를 어떻게 해야 할까? 매번 잡을 실행할 때마다 상태를 어떻게 유지할까? 이 절에서는 잡의 여러 실행에 걸쳐 상태를 관리할 수 있는 ItemReader 구현 방법을 살펴본다.
앞서 언급했듯이 스프링의 ItemReader 인터페이스를 구현하는 것은 매우 간단하다. 사실 조금만 고치면 앞 절에서 사용한 CustomerService를 ItemReader로 변환할 수 있다. 변환에 필요한 작업은 ItemReader 인터페이스를 구현하고 getCustomer() 메소드를 read()로 바꾸는 것이 전부다.
[ 예제 7-66 CustomerItemReader ]
... @Component public class CustomerItemReader implements ItemReader<Customer>{ private List<Customer> customers; private int curIndex; private String[] firstNames = {"Michael", "Warren", "Ann", "Terrence", "Erica", "Laura", "Steve", "Larry"}; private String middleInitial = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; private String[] lastNames = {"Gates", "Darrow", "Donnelly", "Jobs", "Bufett", "Ellision", "Obama"}; private String[] streets = {"4th Street", "Wall Street", "Fifth Avenue", "Mt. Lee Drive", "Jeopardy Lane", "Infinite Loop Drive", "Farnam Street", "Isabella Ave", "S. Greenwood Ave"}; private String[] cities = {"Chicago", "New York", "Hollywood", "Aurora", "Omaha", "Aherton"}; private String[] states = {"IL", "NY", "CA", "NE"}; private Random generator = new Random(); public CustomItemReader(){ curIndex = 0; customers = new ArrayList<>(); for(int i=0; i<100; i++){ customers.add(buildCustomer()); } } private Customer buildCustomer(){ Custsomer customer = new Customer(); customer.setId((long) generator.nextInt(Integer.MAX_VALUE)); customer.setFirstName(firstNames[generator.nextInt(firstNames.length - 1)]); customer.setMiddleInitial(String.valueOf(middleInitial.charAt( generator.nextInt(middleInitial.length() - 1)))); customer.setLastName(lastNames[generator.nextInt(lastNames.length - 1)]); customer.setAddress(generator.nextInt(9999) + " " + strrets[generator.nextInt(streets.length - 1)]); customer.setCity(cities[generator.nextInt(cities.length - 1)]); customer.setStates(states[generator.nextInt(states.length - 1)]); customer.setZipCode(String.valueOf(generator.nextInt(99999))); return customer; } @Override public Custsomer read(){ Customer cust = null; if(curIndex < customers.size()){ cust = customers.get(curIndex); curIndex++; } return cust; } }
매번 실행할 때마다 예제 7-66의 CustomerItemReader가 완전히 새로운 데이터 목록을 만든다는 사실은 중요하지 않으므로 무시해도 되겠지만, 해당 CustomerItemReader는 잡을 실행할 때마다 목록의 처음부터 재시작한다는 점을 기억하기 바란다. 이처럼 처음부터 재시작 하는 것이 일반적으로 필요한 기능이겠지만, 꼭 그런 것만은 아니다.
레코드 백만 개 중에 50만 개를 처리한 뒤 에러가 발생했을 때는 에러가 발생한 청크부터 다시 시작하기를 원할 것이다.
스프링 배치가 JobRepository에 리더의 상태를 저장해서 이전에 종료된 지점부터 리더를 다시 시작할 수 있게 하려면 추가로 ItemStreaqm 인터페이스를 구현해야 한다. 예제 7-67처럼 ItemStream 인터페이스는 open, update, close 같은 세 개의 메서드로 구성된다.
[ 예제 7-67 ItemStream 인터페이스 ]
package org.springframework.baatch.item; public interface ItemStream{ void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() thorws ItemStreamException; }
ItemStream 인터페이스의 세 메서드는 각 스텝을 실행하는 도중에 스프링 배치가 개별로 호출한다.
- open 메서드는 ItemReader에서 필요한 상태를 초기화하려고 호출한다. 이 초기화는 잡을 재시작할 때 이전 상태를 복원하는 것 외에도 특정파일을 열거나 데이터베이스에 연결하는 것을 포함한다. 예를 들면 open 메서드는 처리된 레코드의 개수를 가져오는 데 사용할 수 있으므로, 다시 잡을 실행할 때는 해당 레코드 숫자만큼의 레코드를 건너뛸 수 있을 것이다.
- update 메서드는 스프링 배치가 잡의 상태를 갱신하는 처리에 사용한다. 얼마나 많은 레코드나 청크가 처리됐는지를 기록하는 데 update 메서드를 사용한다.
- close 메서드는 파일을 닫는 것 처럼 리소스를 닫는 데 사용한다.
- open 메서드와 update 메서드 내에서는 ExecutionContext(https://jooy-p.tistory.com/30)에 접근할 수 있다는 것을 알아챘을 것이다. ExecutionContext는 ItemReader 구현체에서는 접근할 수 없었다. 이 ExecutionContext에 대한 참조는 잡이 재시작 됐을 때 스프링 배치가 open 메서드에게 리더의 이전 상태를 알려주는 용도로 사용한다. 또한 각 아이템이 처리됨에 따라 update 메서드에게 리더의 현재상태(현재 처리 중인지)를 알려주는 데 사용된다.
- 마지막으로 close 메서드는 ItemReader에서 사용된 모든 리소스를 정리하는데 사용된다.
현재 개발 중인 ItemReader에서 어떻게 ItemReader을 사용해야 하는지 궁금할 수도 있겠다. ItemStream에는 read 메서드가 없기 때문이다. 답은 간단하다. 개발할 필요가 없다. 대신 org.springframework.batch.item.ItemStreamSupport라는 유틸리티 클래스를 상속할 것이다. ItemStreamSupport는 컴포넌트의 이름으로 고유 킼를 생성하는 getExecutiontextKey라는 유틸리티 메서드를 제공할 뿐만 아니라 ItemStream 인터페이스도 구현한다. 예제 7-68은 ItemStreamSupport 인터페이스를 구현하도록 변경한 CustomerItemReeader의 코드를 나타낸다.
[ 예제 7-68 ItemStreamSupport 인터페이스를 구현한 CustomerItemReader 구현체
... @Component public class CustomerItemReader implements ItemReader<Customer>{ private List<Customer> customers; private int curIndex; dprivate String INDEX_KEY="current.index.customers"; //추가 private String[] firstNames = {"Michael", "Warren", "Ann", "Terrence", "Erica", "Laura", "Steve", "Larry"}; private String middleInitial = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; private String[] lastNames = {"Gates", "Darrow", "Donnelly", "Jobs", "Bufett", "Ellision", "Obama"}; private String[] streets = {"4th Street", "Wall Street", "Fifth Avenue", "Mt. Lee Drive", "Jeopardy Lane", "Infinite Loop Drive", "Farnam Street", "Isabella Ave", "S. Greenwood Ave"}; private String[] cities = {"Chicago", "New York", "Hollywood", "Aurora", "Omaha", "Aherton"}; private String[] states = {"IL", "NY", "CA", "NE"}; private Random generator = new Random(); public CustomItemReader(){ curIndex = 0; customers = new ArrayList<>(); for(int i=0; i<100; i++){ customers.add(buildCustomer()); } } private Customer buildCustomer(){ Custsomer customer = new Customer(); customer.setId((long) generator.nextInt(Integer.MAX_VALUE)); customer.setFirstName(firstNames[generator.nextInt(firstNames.length - 1)]); customer.setMiddleInitial(String.valueOf(middleInitial.charAt( generator.nextInt(middleInitial.length() - 1)))); customer.setLastName(lastNames[generator.nextInt(lastNames.length - 1)]); customer.setAddress(generator.nextInt(9999) + " " + strrets[generator.nextInt(streets.length - 1)]); customer.setCity(cities[generator.nextInt(cities.length - 1)]); customer.setStates(states[generator.nextInt(states.length - 1)]); customer.setZipCode(String.valueOf(generator.nextInt(99999))); return customer; } @Override public Custsomer read(){ Customer cust = null; if(curIndex < customers.size()){ cust = customers.get(curIndex); curIndex++; } return cust; } //추가 public void close() throws ItemStreamException{} //추가 public void open(ExecutionContext executionContext) throws ItemStreamException{ if(executionContext.containsKey(getExecutionContextKey(INDEX_KEY))){ int index = executionContext.getInt(getExecutionContextKey(INDEX_KEY)); if(index == 50) curIndex = 51; else curIndex = index; }else { curIndex = 0; } } //추가 public void update(ExecutionContext executionContext) throws ItemStreamException{ executionContext.putInt(getExecutionContextKey(INDEX_KEY), curIndex); } }
먼저 클래스가 ItemStreamSuport 인터페이스를 구현하게 변경했다. 그 뒤 close, open, upate 메서드를 추가했다. update 메서드에서는 현재 처리 중인 레코드를 나타내는 키-값 쌍을 추가했다. open 메서드는 update 메서드에서 값을 설정했는지 여부를 체크한다. 값이 설정돼 있으면 잡을 재시작한 것을 의미한다. run 메서드에서는 50번째 Customer 객체를 처리한 뒤에는 RuntimeException을 던지는 코드를 추가해 잡을 강제로 종료했다. open 메서드로 돌아가 open 메서드 내에서는 복원하려는 인덱스가 50이면 이는 조금 전에 run 메서드 내에 추가한 예외 코드 때문에 발생한 것이므로 해당 레코드를 건너뛰게 한다. 그렇지 않으면 처리를 재시도한다. ExecutionContext에서 사용되는 키에 대한 참조가 ItemStreamSupport가 제공하는 getExecutionContextKey 메서드를 사용해서 전달된다는 점을 알아챘을 것이다.
추가로 하나 더 해야 하는 작업은 새 ItemReader 구현체를 구성하는 것이다. 예제에서 작성한 ItemReader는 별도의 의존성이 없으므로 이름만 제대로 지정해서 빈을 정의하기만 하면 된다.
[ 예제 7-69 CustomerItemReader ]
... @Bean public CustomerItemReader customerItemReader(){ CustomerItemReader customerItemReader = new CustomerItemReader(); customerItemReader.setName("customerItemReader"); return customerItemReader; }
전체 소스
AcornPublishing/definitive-spring-batch
스프링 배치 완벽 가이드 2/e. Contribute to AcornPublishing/definitive-spring-batch development by creating an account on GitHub.
github.com
스프링 배치 완벽 가이드, 마이클 미넬라
'Java > Spring Batch' 카테고리의 다른 글
[Spring Batch] 에러처리(레코드 건너뛰기, 잘못된 레코드 로그 남기기, 입력이 없을 때의 처리) (0) 2021.08.10 [Spring Batch] ItemReader(3) - 저장프로시저(SP, Stored Procedure) (0) 2021.05.31 [Spring Batch] ItemReader(2) - JPA (0) 2021.05.30 [Spring Batch] ItemReader(1) - JDBC (0) 2021.05.30 [Spring Batch] Step (0) 2021.05.15