Programação Concorrente em Java Prof. Carlos Bazilio Criando Threads ● Existem 2 formas básicas de criarmos threads em Java: – Criando uma classe que estende a classe java.lang.Thread – Criando uma classe que implementa a interface java.lang.Runnable Criando Threads class ThinkParallel1 implements Runnable { ... } class ThinkParallel2 extends Thread { ... } Thread t = new Thread(new ThinkParallel2()); t.start(); Criando Threads ● ● ● Em ambas opções devemos dispor um método chamado run() que inicia a execução da thread Variáveis podem ser passadas às threads através do construtor Membros declarados nas classes são privados de cada thread Criando Threads class ThinkParallel1 implements Runnable { int id; public ThinkParallel1(int pId) { this.id = pId; } public void run() { System.out.println("Thread número: " + id); } } Thread t = new Thread(new ThinkParallel1(10)); t.start(); Ciclo de Vida Nova Thread start() notifyAll(), notify() ou fim sleep() dispatch Pronto Execução yield/runout Fim I/O I/O Bloqueado Fim da execução wait() Suspenso sleep(long) Finalizado Atomicidade, Sincronização de Memória e termo volatile ● A JVM demanda que leituras e escritas de todos os tipos primitivos (exceto long e double) sejam atômicas – ● ● P. ex., teste = true; é sempre atômica Além do controle de leitura/escrita, outra questão a ser abordada é a visualização das atualizações Em Java esta situação é resolvida declarando o recurso como volatile Atomicidade, Sincronização de Memória e termo volatile ● ● Os tipos long e double têm suporte no pacote java.util.concurrent.atomic Por exemplo, temos uma classe AtomicLong para manipulação segura de valores long Blocos Síncronos ● ● ● Recurso em Java para tratar de situações onde possam ocorrer condição de corrida (race condition) Um bloco síncrono sempre está associado a um objeto Isto é possível pois todo objeto contém implicitamente um lock synchronized(object_ref) { ... corpo do bloco ... } Blocos Síncronos class ThinkParallel1 implements Runnable { int id; static int count; public ThinkParallel1(int pId) { this.id = pId; } } public void run() { System.out.println("Thread número: " + id); synchronized(ThinkParallel1.class){ count++; } } Blocos Síncronos ● Implementação alternativa poderia utilizar as classes disponíveis no pacote java.util.concurrent.atomic Blocos Síncronos class ThinkParallel1 implements Runnable { int id; static int count; public ThinkParallel1(int pId) { this.id = pId; } } public void run() { System.out.println("Thread número: " + id); synchronized(this){ // Erro !!! count++; } } Blocos Síncronos ● ● A versão com erro tem o problema de realizar lock às instâncias individualmente Uma alternativa para este uso do synchronize é quando um método inteiro deve ser controlado public synchronized static void atualiza() { count++; } // Equivale a ... public static void atualiza() { synchronized(this){ count++; } } wait / notify / notifyAll ● ● ● ● ● Métodos esperar e sinalizar a disponibilização de um recurso São definidos na classe Object notify e notifyAll só podem ser chamados de métodos synchronized Se + de 1 thread está aguardando um recurso, a escolha da thread a ser acordada é arbitrária Se não existirem threads aguardando, as instruções notify são descartadas wait / notify / notifyAll Exemplo public class Message { private String msg; public Message(String str){ this.msg=str; } public String getMsg() { return msg; } public void setMsg(String str) { this.msg=str; } } wait / notify / notifyAll Exemplo public class Waiter implements Runnable{ private Message msg; public Waiter(Message m){ this.msg=m; } public void run() { String name = Thread.currentThread().getName(); synchronized (msg) { try{ System.out.println(name+" waiting to get notified at time:"+System.currentTimeMillis()); msg.wait(); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println(name+" waiter thread got notified at time:"+System.currentTimeMillis()); //process the message now System.out.println(name+" processed: "+msg.getMsg()); } } } wait / notify / notifyAll Exemplo public class Notifier implements Runnable { private Message msg; public Notifier(Message msg) { this.msg = msg; } public void run() { String name = Thread.currentThread().getName(); System.out.println(name+" started"); try { Thread.sleep(1000); synchronized (msg) { msg.setMsg(name+" Notifier work done"); msg.notify(); // msg.notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } } wait / notify / notifyAll Exemplo public class WaitNotifyTest { public static void main(String[] args) { Message msg = new Message("process it"); Waiter waiter = new Waiter(msg); new Thread(waiter,"waiter").start(); Waiter waiter1 = new Waiter(msg); new Thread(waiter1, "waiter1").start(); Notifier notifier = new Notifier(msg); new Thread(notifier, "notifier").start(); System.out.println("All the threads are started"); } } wait / notify / notifyAll Exemplo waiter waiting to get notified at time:1383760734647 waiter1 waiting to get notified at time:1383760734648 All the threads are started notifier started waiter waiter thread got notified at time:1383760735649 waiter processed: notifier Notifier work done waiter1 waiting to get notified at time:1383760546239 waiter waiting to get notified at time:1383760546241 notifier started All the threads are started waiter waiter thread got notified at time:1383760547241 waiter processed: notifier Notifier work done waiter1 waiter thread got notified at time:1383760547242 waiter1 processed: notifier Notifier work done Pacote java.util.concurrent ● ● ● ● Contém classes utilitárias para programação concorrente Estas classes implementam funcionalidades úteis na programação concorrente, mas difíceis ou tediosas de se programar Principais componentes: Executors, Queues, Timing, Synchronizers, Concurrent Collections, Memory Consistency Properties http://docs.oracle.com/javase/7/docs/api/java/uti l/concurrent/package-summary.html Executors ● Possuem classes (interfaces e de implementação) voltadas para a gerência de threads – Pool de threads – IO assíncrono – Framework para manipulação de tarefas (tasks) Pool de Threads public class WorkerThread implements Runnable { private int workerNumber; WorkerThread(int number) { workerNumber = number; } public void run() { for (int i=0;i<=100;i+=20) { // Perform some work ... System.out.println("Worker number: " + workerNumber + ", percent complete: " + i ); try { Thread.sleep((int)(Math.random() * 1000)); } catch (InterruptedException e) { } } } } Pool de Threads import java.util.concurrent.*; public class ThreadPoolTest { public static void main(String[] args) { int numWorkers = 8; int threadPoolSize = 4; ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize); WorkerThread[] workers = new WorkerThread[numWorkers]; for (int i = 0; i < numWorkers; i++) { workers[i] = new WorkerThread(i); tpes.execute(workers[i]); } tpes.shutdown(); } } Pool de Threads Exemplo: Crawler para varrer Web public class Tarefa { String url; int level; public Tarefa(String url, int level) { this.url = url; this.level = level; } public String getUrl() { return url; } public int getLevel() { return level; } } Pool de Threads Exemplo: Crawler para varrer Web import java.util.concurrent.*; public class ThreadPoolTest { public static void main(String[] args) { BlockingQueue<Tarefa> urls = new LinkedBlockingQueue<Tarefa>(); urls.add(new Tarefa("http://www.google.com.br", 0)); int threadPoolSize = 16; ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize); } } Tarefa t = null; WorkerCrawler worker = null; int n_tasks = 0; long tempoLimite = 10L; try { while ((n_tasks < 100) && ((t = urls.poll(tempoLimite, TimeUnit.SECONDS)) != null)) { worker = new WorkerCrawler(n_tasks, t, urls); pool.execute(worker); n_tasks++; } } catch (InterruptedException e) { System.out.println("Interrupção na espera por urls (" + tempoLimite + TimeUnit.SECONDS.name()); } pool.shutdown(); import import import import java.io.*; java.net.*; java.util.concurrent.BlockingQueue; java.util.regex.*; public class WorkerCrawler implements Runnable { private int taskNumber; private Tarefa urltoCrawl; BlockingQueue<Tarefa> sharedUrls; WorkerCrawler(int number, Tarefa t, BlockingQueue<Tarefa> urls) { taskNumber = number; urltoCrawl = t; sharedUrls = urls; } public void run() { URL url; System.out.println("Thread id: " + Thread.currentThread().getId() + " Task: " + taskNumber + " Nivel: " + urltoCrawl.getLevel() + " Url: " + urltoCrawl.getUrl()); try { url = new URL(urltoCrawl.getUrl()); URLConnection conexao = url.openConnection(); InputStream stream = conexao.getInputStream(); BufferedReader in = new BufferedReader( new InputStreamReader(stream)); String input = ""; String inputLine; while ((inputLine = in.readLine()) != null) input = input + inputLine; in.close(); // Find links of the form: http://xxx.yyy.zzz String regexp = "http://(\\w+\\.)*(\\w+)"; Pattern pattern = Pattern.compile(regexp); Matcher matcher = pattern.matcher(input); // find all matches while (matcher.find()) { String match = matcher.group(); sharedUrls.add(new Tarefa(match, urltoCrawl.getLevel() + 1)); } } catch (MalformedURLException e) { System.out.println("URL mal formada: " + urltoCrawl); } catch (IOException e) { System.out.println("Erro de entrada/saída na abertura da conexão com servidor web ou na obtenção dos dados"); } } } Queues ● Úteis para manipulação consistente de filas ● Destaques: – ConcurrentLinkedQueue: fila FIFO nãobloqueante e thread-safe – BlockingQueue: interface para implementação de filas bloqueantes; pode ter capacidade limitada – BlockingDeque: interface que extende BlockingQueue, permitindo comportamento FIFO e LIFO (pilha) Timing ● ● ● A classe TimeUnit representa uma unidade de tempo É útil por padronizar diferentes operações que possuam timeout Há diversos métodos para conversão entre unidades, como toNanos() e toMillis(), para conversão para nanos e milisegundos, respectivamente Synchronizers ● 5 classes oferecem ferramentas interessantes para formas de sincronização: – Semaphore: implementação clássica – CountDownLatch: implementa bloqueio dado que um número de sinais, eventos e condições ocorram – CyclicBarrier: ponto de sincronização múltipla – Phaser: forma mais flexível de barreira para controle de fases em múltiplas threads – Exchanger: permite 2 threads trocarem objetos num ponto específico da execução Concurrent Collections ● Além de Filas, este pacote oferece outras coleções para serem utilizadas em contextos multithreaded: – ● ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, CopyOnWriteArraySet Uma vantagem destas em relação às versões não concorrentes e sincronizadas é escalabilidade, uma vez que as sincronizadas trabalham utilizando lock simples ThreadGroup ● Permite o agrupamento lógico de threads ● Grupos também podem conter subgrupos ThreadGroup grupo = new ThreadGroup("Grupo_teste"); ... Thread novathread = new Thread(grupo, "thr1"); ThreadGroup ● ● A classe java.lang.ThreadGroup possui alguns métodos para manipulação de grupos Dentre esses, podemos citar o método destroy() e o método interrupt()