Batch Processing in the Enterprise World
Rodrigo Cândido da Silva
@rcandidosilva

About Me
• JUG Leader of GUJavaSC
  • http://gujavasc.org
• Twitter
  • @rcandidosilva
• Contact
  • http://rodrigocandido.me

Agenda
• Concepts
• Batch Domain Language
• Chunk vs. Batchlet
• Partitioned Step
• Flow, Split, and Decision
• Listeners and Exceptions
• Execution
• Integration
• Demo

Why Batch?
• Very common in applications
• Many "custom" solutions
• Products started to emerge
  • Spring Batch
  • WebSphere Compute Grid
• Ideal for ETL systems

Batch API
• Chunk / Batchlet
  • Implementation of a step
• Contexts
  • Job and step at runtime
  • Metadata persistence
• Listeners
  • Callbacks for lifecycle events
• Partitioning
  • Parallel processing

Batch Domain Language
• Batch job XML definition
• Describes the steps as a grouping of batch artifacts

Batch Domain Language

    <job id="adressJob" version="1.0">
        <listeners>
            <listener ref="MyJobListener"/>
        </listeners>
        <step id="buildingData" next="adressStep">
            <batchlet ref="GenerateDataBatchlet" />
        </step>
        <step id="adressStep">
            <listeners>
                <listener ref="MyStepListener"/>
            </listeners>
            <chunk item-count="10">
                <reader ref="adressItemReader" />
                <processor ref="adressItemProcessor" />
                <writer ref="adressItemWriter" />
            </chunk>
        </step>
    </job>

Chunk vs. Batchlet
• Both implement a step within the job
• Chunk
  • Encapsulates the ETL pattern
  • Single reader, processor, and writer
  • Executed over pieces of data (chunks)
  • The output of each chunk is written as a unit
• Batchlet
  • Runs a single, simple process
  • Runs to completion and produces a return code
Batchlet

    @Named
    public class MyBatchlet {
        @Process
        public String process() throws Exception {..}

        @Stop
        public void stopMe() throws Exception {..}
    }

    public class MyBatchlet implements Batchlet {..}

    <step id="step1">
        <batchlet ref="MyBatchlet"/>
    </step>

Chunk
• Step Job

    <step id="sendStatements">
        <chunk reader="accountReader"
               processor="accountProcessor"
               writer="emailWriter"
               item-count="10"/>
    </step>

    @Named("accountReader")
    ...implements ItemReader... {
        public Account readItem() {
            // read account using JPA

    @Named("accountProcessor")
    ...implements ItemProcessor... {
        public Statement processItem(Account account) {
            // read Account, return Statement

    @Named("emailWriter")
    ...implements ItemWriter... {
        public void writeItems(List<Statement> statements) {
            // use JavaMail to send email

Chunk

    public interface ItemReader<T> {
        public void open(Externalizable checkpoint);
        public T readItem();
        public Externalizable checkpointInfo();
        public void close();
    }

    public interface ItemWriter<T> {
        public void open(Externalizable checkpoint);
        public void writeItems(List<T> items);
        public Externalizable checkpointInfo();
        public void close();
    }

    public interface ItemProcessor<T, R> {
        public R processItem(T item);
    }

Checkpoint
• For intensive, long-running tasks
• Checkpoint/restart is widely used
• Basically…
  • Stores the state of the ItemReader and ItemWriter
  • Methods called
    • reader.checkpointInfo()
    • writer.checkpointInfo()

    <chunk checkpoint-policy="item" commit-interval="10" item-count="10">

    public interface ItemReader<T> {
        public void open(Externalizable checkpoint);
        public Externalizable checkpointInfo();
    }

    public interface ItemWriter<T> {
        public void open(Externalizable checkpoint);
        public Externalizable checkpointInfo();
    }

Partitioned Step
• A step can run partitioned
  • [N] instances of the same step on [N] threads
  • One partition per thread

    <step id="step1">
        <chunk>
            <partition>
                <plan partitions="10" threads="2"/>
                <reducer />
            </partition>
        </chunk>
    </step>

Partitioned Step
• Partition Mapper
  • Decides the number of partitions dynamically
• Partition Plan
• Partition Reducer
  • Demarcates the logical unit of work
• Partition Collector
  • Sends the processing results of the partitions
• Partition Analyzer
  • Control point that analyzes the results sent

Flow, Split, and Decision
[Diagram: a job from Start to End containing Step I, Step II (Task), Step III, Step IV, a Flow, and a Decision; each chunk step has an ItemReader, an ItemProcessor, and an ItemWriter]

Flow
• Defines the list of steps to be executed as a unit

    <flow id="flow-1" next="{flow, step, decision}-id">
        <step id="flow_1_step_1">
        </step>
        <step id="flow_1_step_2">
        </step>
    </flow>

Split
• Defines the list of flows to be executed in parallel
• Collectors and analyzers for monitoring

    <split>
        <flow /> <!-- each flow runs on a separate thread -->
        <flow />
    </split>

Decision
• Enables the implementation of workflows

Decision

    @Named
    public class Decider {
        public String decide(BatchContext context) throws Exception {
            String exit = context.getExitStatus();
            if ("SUCCESS".equals(exit)) {
                return "SKIP";
            }
            return exit;
        }
    }

    <step id="step1">
        <decision id="decision1" ref="Decider">
            <next on="SKIP" to="step3"/>
            <next on="*" to="step2"/>
        </decision>
    </step>
    <step id="step2" next="step3"/>
    <step id="step3"/>

Lifecycle
[State diagram: states STARTING, STARTED, STOPPING, STOPPED, COMPLETED, FAILED, and ABANDONED; transitions triggered by start(), stop(), restart(), and abandon()]

Listeners
• Step
  • StepListener, ItemReadListener, ItemProcessListener, ItemWriteListener, ChunkListener, RetryReadListener, RetryProcessListener, RetryWriteListener, SkipReadListener, SkipProcessListener, SkipWriteListener
• Job
  • JobListener

    @Named
    public class StepListener {
        @BatchContext
        StepContext context;

        @BeforeStep
        public void beforeStep() {..}

        @AfterStep
        public void afterStep() {..}
    }

    <step id="step1">
        <listeners>
            <listener ref="StepListener"/>
        </listeners>
    </step>

Exceptions

    <job id="...">
        <chunk
            skip-limit="5" retry-limit="5">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
            <retryable-exception-classes>
            </retryable-exception-classes>
            <no-rollback-exception-classes>
                ...
            </no-rollback-exception-classes>
        </chunk>
    </job>

JobOperator and Repository
• JobOperator
  • Runtime interface for job management
  • start, stop, restart
  • JobRepository interface commands
• JobRepository
  • Holds information about jobs
  • Both completed and currently running

Execution
• JobInstance
  • Logical representation of a job run
• JobExecution
  • An attempt to run a job
• StepExecution
  • An attempt to run a step of a job

Integration
• Java SE support
• Application Server Runtime
  • Support for clustering, security, and resource management
• Dependency Injection with CDI
• XML descriptors
  • META-INF/batch-jobs/myJob.xml
• Packaging
  • JAR, WAR, EJB

Demo
• Java EE 7 Samples
  • Several examples of using the Batch API
  • https://github.com/javaee-samples/javaee7-samples/tree/master/batch

Questions?

References
• https://jcp.org/en/jsr/detail?id=352
• https://java.net/projects/jbatch
• http://projects.spring.io/spring-batch/
• http://docs.oracle.com/javaee/7/tutorial/doc/batch-processing.htm
• http://www.oracle.com/technetwork/articles/java/batch-1965499.html
• https://github.com/javaee-samples/javaee7-samples/
• http://blog.arungupta.me/2014/07/schedule-javaee7-batch-jobs-techtip36/
• http://www.planetjones.co.uk/blog/25-05-2013/introducing-jsr-352-java-batch-ee-7.html

Thank you!
@rcandidosilva
rodrigocandido.me
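
Supplement: chunk loop sketch

The Chunk and Checkpoint slides describe a read/process/write loop that buffers item-count items, writes them as a unit, and can resume from a stored reader position. The plain-Java sketch below is one way to picture that loop. It is an assumption-laden illustration, not the javax.batch API: ChunkSketch, ListReader, and run are hypothetical names, the checkpoint is a simple int instead of an Externalizable, and the "writer" just collects each chunk into a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a chunk step; NOT the javax.batch API.
class ChunkSketch {

    /** Reads from an in-memory list and exposes its position as checkpoint state. */
    static class ListReader<T> {
        private final List<T> source;
        private int position;

        // "open(checkpoint)": start from the position stored at the last checkpoint.
        ListReader(List<T> source, int checkpoint) {
            this.source = source;
            this.position = checkpoint;
        }

        // Returns null when there is nothing left to read.
        T readItem() {
            return position < source.size() ? source.get(position++) : null;
        }

        int checkpointInfo() {
            return position;
        }
    }

    /** Runs the loop and returns the chunks in the order they were "written". */
    static <T, R> List<List<R>> run(List<T> input, int itemCount,
                                    int checkpoint, Function<T, R> processor) {
        ListReader<T> reader = new ListReader<>(input, checkpoint);
        List<List<R>> written = new ArrayList<>();
        List<R> buffer = new ArrayList<>();
        T item;
        while ((item = reader.readItem()) != null) {
            buffer.add(processor.apply(item));   // process one item at a time
            if (buffer.size() == itemCount) {
                written.add(buffer);             // write the whole chunk as a unit
                buffer = new ArrayList<>();
                // A real runtime would persist reader.checkpointInfo() here.
            }
        }
        if (!buffer.isEmpty()) {
            written.add(buffer);                 // final, possibly smaller chunk
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> accounts = Arrays.asList(1, 2, 3, 4, 5);
        // Fresh run: chunks of two items each.
        System.out.println(run(accounts, 2, 0, a -> "statement-" + a));
        // Restart from checkpoint position 4: only the last item is processed.
        System.out.println(run(accounts, 2, 4, a -> "statement-" + a));
    }
}
```

With five items and an item count of 2, the fresh run writes three chunks (two full, one partial); restarting at position 4 processes only the remaining item.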