Tecnologias Java Threads Marcio Seiji Oyamada [email protected] Pós-graduação Especialização em Desenvolvimento de Software e Novas Tecnologias Conteúdo programático • • • • • • • • Apresentação da plataforma Java Threads Construção de ambientes gráficos Acesso a base de dados Sockets RMI Applets Linguagem de script (JavaFX) Threads • Única linha de execução x múltiplas linhas de execução Benefícios • • • • Tempo de resposta Compartilhamento de recursos Economia de recursos Desempenho em arquiteturas multiprocessadas Muitos para um • Várias threads no nível do usuário, mapeadas para uma única thread no nível do kernel • Exemplos: – Solaris Green Threads – GNU Portable Threads Um para um • Cada thread no nível do usuário é mapeada para uma thread no nível do kernel • Exemplos – Windows NT/XP/2000 – Linux – Solaris 9 e versões posteriores Threads em Java • Gerenciada pela JVM – Mapeamento para o sistema operacional depende da implementação da JVM – Java Threads • Classe Thread ou • Interface Runnable Interface Runnable • Método necessário para descrever uma thread public interface java.lang.Runnable { // Methods public abstract void run(); } • Exemplo: class ThreadInterface implements Runnable{ public void run(){ for (int i=0; i <20;i++){ System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); } } } Classe Thread • Classe principal que representa uma thread em Java • Métodos para gerenciar threads – – – – Obter nome da thread Alterar a prioridade Interromper uma thread Liberar o processador Criando uma thread com a classe Thread public class ThreadClasse extends Thread { public ThreadClasse(){ super(); } public void run(){ for (int i=0; i <20;i++){ System.out.println("Thread["+ Thread.currentThread().getName()+ "]="+i); } } } Executando threads (Interface Runnable) • Utilizando a interface Runnable public class ExecutaThread { public static void main(String args[]) throws InterruptedException{ Thread t1; Thread t2; t1= new Thread(new ThreadInterface()); t2= new Thread(new ThreadInterface()); t1.start(); // inicia a execução da Thread t2.start(); // inicia a execução da Thread System.out.println(“Thread inicializadas”); t1.join(); // Aguarda a thread t1 finalizar t2.join(); // Aguarda a thread t1 finalizar System.out.println(“Thread finalizadas”); } } Executando threads (Classe Thread) • Utilizando a Classe Thread public class ExecutaThread { public static void main(String[] args) throws InterruptedException { // TODO code application logic here ClasseThread t1= new ClasseThread(); ClasseThread t2= new ClasseThread(); t1.start(); t2.start(); System.out.println("Thread inicializadas"); t1.join(); t2.join(); System.out.println("Thread finalizadas"); } } Executors • Gerencia um conjunto de Threads – FixedThreadPool: número fixo de threads – CachedThreadPool: aloca dinamicamente • FixedThreadPool – Evita o overhead com a criação de thread – Maior controle dos recursos utilizados durante a execução do sistema Exemplo Executors (1) import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { // TODO code application logic here ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 5; i++){ executor.execute(new ThreadInterface()); } System.out.println("Threads executando"); executor.shutdown(); } } Exemplo: Executors (2) public class ThreadInterface implements Runnable{ public void run() { for (int i=0; i <20;i++){ System.out.println("Thread["+Thread.currentThr ead().getName()+"]="+i); } } } Executors • Métodos – Execute(Interface Runnable): submete uma nova interface para ser executada – Shutdown(): previne que outras threads sejam submetidas ao ExecutorService – ShutdownNow(): tentar finalizar a execução das Threads Interrompendo Threads public class MainInterruptTest { public static void main(String[] args) throws InterruptedException { ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 10; i++) executor.execute(new MyThread()); System.out.println("Sleeping...."); TimeUnit.SECONDS.sleep(10); executor.shutdownNow(); System.out.println("Shutdown solicitado"); } } class MyThread implements Runnable{ public void run() { boolean sair=false; while (!sair){ System.out.println(Thread.currentThread().getName()); if (Thread.currentThread().isInterrupted()){ System.out.println("Interrupção solicitada, finalizando a Thread"+ Thread.currentThread().getName()); sair= true; } } } } Thread.yield() • Para a execução da Thread atual e libera o processador para que uma outra Thread possa executar – Forçar a troca de contexto e conseqüentemente uma melhor distribuição do processador public class ThreadInterface implements Runnable{ public void run() { for (int i=0; i <10;i++){ System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } } } Threads:Sleep public class SleepingTest implements Runnable { public void run() { try { for (int i=0; i <10; i++) { System.out.print(“Sleep …”+Thread.currentThread().getName()) ; // Old-style: // Thread.sleep(100); // Java SE5/6-style: TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { System.err.println("Interrupted"); } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool() for (int i = 0; i < 5; i++) exec.execute(new SleepingTest()); exec.shutdown(); } } Inicialização de atributos public class SleepingTest implements Runnable{ boolean setYield= false; public SleepingTest (boolean _setYield){ this.setYield= _setYield; } public void run() { for (int i=0; i <10;i++){ System.out.println("Thread["+Thread.currentThread().getNam e()+"]="+i); if (setYield) Thread.yield(); } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool() exec.execute(new SleepingTest(true)); exec.execute(new SleepingTest(true)); exec.execute(new SleepingTest(false)); exec.execute(new SleepingTest(false)); exec.shutdown(); } } Exercícios 1) Qual o comportamento na execução das Threads utilizando o método yield()? 2) Faça uma aplicação multithread onde cada Thread escreva na tela N vezes uma String. O valor N e a String serão passados como parâmetro pelo construtor Threads: Retornando valores • A interface Runnable não define um método para retornar valores – Solução 1: utilizar a interface Callable – Solução 2: retornar os valores em um objeto compartilhado • Problemas de sincronização em dados compartilhados (veremos mais adiante) • Adotemos a solução 1... Interface Callable • Callable: generic possibilitando definir o tipo de retorno • Método call: ponto de entrada para a Thread (ao invés do método run()), retornando o tipo definido no generic Callable Exemplo: Callable(1) public class ThreadCallable implements Callable<int>{ private static Random generator= new Random(); public int call(){ return generator.nextInt(1000); } } Exemplo: Callable(2) import import import import import java.util.concurrent.ExecutionException; java.util.concurrent.ExecutorService; java.util.concurrent.Executors; java.util.concurrent.Future; java.util.ArrayList; public class MainCallable { public static void main(String[] args) { ExecutorService executor= Executors.newCachedThreadPool(); ArrayList<Future<Integer>> resultados= new ArrayList<Future<Integer>>(); for (int i=0; i < 10; i++) resultados.add(executor.submit(new ThreadCallable())); executor.shutdown(); for (int i=0; i< resultados.size(); i++){ try { Future<Integer> result; result = resultados.get(i); System.out.println(result.get()); } catch (InterruptedException ex) { ex.printStackTrace(); } catch (ExecutionException ex) { ex.printStackTrace(); } } } } Exercício • Faça uma aplicação Java multithread para buscar um dado elemento em um vetor (desordenado). Utilize um objeto Random para gerar numeros aleatórios – Cada Thread ficará responsável pela busca em uma parte do vetor – Retorne a posição do elemento no vetor ou –1 caso o elemento não foi encontrado ThreadFactory • Um Executor utiliza padrão de projeto Factory para criar as Threads • O desenvolvedor pode criar sua própria ThreadFactory para criar threads – Definir atributos específicos – Definir prioridades durante a criação de threads – Definir manipuladores de exceção ThreadFactory(1) import java.util.concurrent.*; public class MyThreadFactory implements ThreadFactory{ private static int count; public MyThreadFactory(){ count=0; } public Thread newThread(Runnable r) { Thread t= null; t = new Thread(r); count++; t.setName("MinhaThread[" + count + "]"); return t; } } ThreadFactory(2) Classe MyThread public class MyThread implements Runnable { public void run() { System.out.print(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getId()); } } ---Classe principal import java.util.concurrent.*; public class MainTreadFactory { public static void main(String[] args) { ExecutorService executor= Executors.newFixedThreadPool(10, new MyThreadFactory()); for (int i=0; i < 10; i++) executor.execute(new MyThread()); executor.shutdown(); } } Exercício • No exercício anterior, foi utilizado um gerenciador de Threads com número fixo. Substitua por um gerenciador dinâmico (Executors.newCachedThreadPool(new MyFactoryThread()) – Existe alguma diferença na saída gerada? Tratando “exception” import java.util.concurrent.*; public class ExceptionThread implements Runnable { public void run() { throw new RuntimeException(); } public static void main(String [] args) { ExecutorService exec = Executors.newCachedThreadPool() ; exec.execute(new ExceptionThread()); exec.shutdown(); } } Tratando internamente import java.util.concurrent.*; public class ExceptionThread implements Runnable { public void run() { try { throw new RuntimeException(); catch (Exception ex){ ex.printStackTrace(); } } public static void main(String [] args) { ExecutorService exec = Executors.newCachedThreadPool() ; exec.execute(new ExceptionThread()); exec.shutdown(); } } Tratando “exception”(2) / / : concurrency/NaiveExceptionHandling.java import java.util.concurrent.*; public class NaiveExceptionHandling { public static void main(String[] args) { try { ExecutorService exec= Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()) ; exec.shutdown(); } catch(RuntimeException ue) { // Esse código não executará!! System.out.println("Exception has been handled!"); } } } Registrando um tratador de exceções • Como “pegar” exceções que não são tratadas internamente nas Threads? • Registrar um tratador de exceções durante a criação de uma thread (ThreadFactory) – Interface Thread.UncaughtExceptionHandler – public void uncaughtException(Thread t, Throwable e) Tratador de exceções(1) public class MyThreadFactory implements ThreadFactory{ public Thread newThread(Runnable arg0) { Thread t=new Thread(arg0); t.setUncaughtExceptionHandler(new TrataException()); return t; } } class TrataException implements UncaughtExceptionHandler{ public void uncaughtException(Thread arg0, Throwable arg1) { System.out.println("Tratador da exceção da Thread"); } } Tratador de exceções(2) class MyThread implements Runnable{ public void run(){ throw new RuntimeException(); } } public class MainUncaughtException { public static void main(String[] args) { ExecutorService executor= Executors.newCachedThreadPool(new MyThreadFactory()); for (int i=0; i < 5 ; i++) executor.execute(new MyThread()); } } Sincronização entre threads Acesso a variáveis compartilhadas Sincronização • Programa concorrente – Executado por diversos processos – Acesso concorrente a dados • Paralelismo real x Paralelismo aparente – Multiprocessadores: paralelismo real – Paralelismo “aparente”: concorrência Programas concorrentes • Processos seqüenciais que executam concorrentemente (afetam ou são afetados por outros programas) • Motivação – Aumento de desempenho em máquinas multiprocessadas – Interface com o usuário – Sobreposição de operações de E/S e processamento Problema produtor-consumidor • Seja um buffer compartilhado entre dois processos/threads. O processo produtor coloca dados em um buffer que são retirados pelo processo consumidor • Possível implementação – Uma variável count armazena o número de posições preenchidas no buffer – Inicialmente armazena o valor 0 – Quando o processo produtor coloca um dado no buffer, count é incrementado – Quando o processo consumidor retira o dado do buffer, count é decrementado Produtor while (true) { /* produce an item and put in nextProduced */ while (count == BUFFER_SIZE) ; // do nothing buffer [in] = nextProduced; in = (in + 1) % BUFFER_SIZE; count++; } Consumidor while (true) { while (count == 0) ; // do nothing nextConsumed = buffer[out]; out = (out + 1) % BUFFER_SIZE; count--; /* consume the item in nextConsumed } Condição de corrida • count++ ou count— – register= count – register= count+1 – count= register • Considere a execução intercalada com “count = 5” inicialmente: S0: produtor executa register1 = count {register1 = 5} S1: produtor executa register1 = register1 + 1 {register1 = 6} S2: consumidor executa register2 = count {register2 = 5} S3: consumidor executa register2 = register2 - 1 {register2 = 4} S4: produtor executa count = register1 {count = 6 } S5: consumidor executa count = register2 {count = 4} Problema da seção crítica • Condição de corrida: vários processos/thread manipulam conjuntos de dados onde o resultado depende da ordem de execução • Seção crítica: trecho de código onde somente um processo pode executar por vez Solução • Criar um protocolo que garanta a exclusão mútua – Execução da seção crítica por somente um processo/thread Propriedades da seção crítica • Regra 1: Exclusão mútua • Regra 2: Progressão – Nenhum processo fora da seção crítica pode bloquear um outro processo • Regra 3: Espera limitada – Nenhum processo pode esperar infinitamente para entrar na seção crítica • Regra 4 – Nenhuma consideração sobre o número de processadores ou velocidades relativas Acesso concorrente a dados Classe Conta public class Conta { int saldoPoupanca; int saldoCC; public Conta(int _saldoPoupanca, int _saldoCC){ saldoPoupanca= _saldoPoupanca; saldoCC= _saldoCC; } public void transferePoupanca(int v){ saldoCC -= v; saldoPoupanca +=v; } public void transfereCC(int v){ saldoPoupanca -=v; saldoCC +=v; } public int saldoTotal(){ return (saldoPoupanca + saldoCC); } } Produtor public class Produtor implements Runnable{ private Conta c; private Random r= new Random(); public Produtor(Conta _c){ c= _c; } public void run(){ while (true){ c.transfereCC(r.nextInt(1000)); c.transferePoupanca(r.nextInt(500)); } } } Consumidor public class Consumidor implements Runnable{ private Conta c; private int saldoInicial; public Consumidor(Conta _c, int _saldoInicial){ c= _c; saldoInicial=_saldoInicial; } public void run(){ int saldo; while (true){ saldo= c.saldoTotal(); if (saldo != saldoInicial){ System.out.println("Saldo errado = "+saldo); Thread.currentThread().yield(); } } } } Sincronizando acessos concorrentes • Synchronized – Evita a execução concorrente das threads • Como definir a sincronização – Métodos • public synchronized transfereCC(int v); • public synchronized transferePoupanca(int v); • public synchronized saldoTotal(); • Quando um método é definido como synchronized, ocorre um bloqueio, evitando a execução – No método – Entre métodos synchronized da classe Exercício 1) Altere o código da transferência de conta para que o mesmo funcione corretamente 2) Verifique o funcionamento da aplicação SerialNumberGenerator a) Identifique o problema b) Faça as alterações necessárias Utilizando tipos de dados sincronizados • AtomicInteger, AtomicLong – boolean compareAndSet(expectedValue, updateValue); – addAndGet (int delta) – incrementAndGet() • AtomicReference<V> – boolean compareAndSet(expectedValue, updateValue); – getAndSet(V newValue) Tipos de dados sincronizados • Os tipos Queue (fila) LinkedQueue (lista) no pacote java.util não são sincronizados • Tipos de dados thread-safe são definidos no pacote java.util.concurrent • Interface BlockingQueue() – Classe: ArrayBlockingQueue • void put(E e); • E take(); ArrayBlockingQueue class Main() { // Instanciando private ArrayBlockingQueue<Integer> buffer= new ArrayBlockingQueue<Integer>(5); // buffer de 5 posições } // inserindo elementos no buffer buffer.put(new Integer(5)); //removendo elementos do buffer valor= buffer.take(); Exercício • Implemente duas Threads, uma produtora e uma consumidora – O Produtor deverá gerar 1000 números e colocar no buffer compartilhado para ser consumido pela Thread Consumidor – Utilize a classe BlockingQueue como buffer compartilhado Material complementar Semáforos • • • • • Proposto por Dijkstra(1965) Sincronização que não necessita de espera ativa Semáforo S – variável inteira Duas operações S: acquire() e release() Operações atômicas Semáforo • Binário: varia entre 0 e 1 • Contador: valor inteiro Implementação de semáforos • Deve garantir que dois processos não executem acquire () e release () no mesmo semáforo ao mesmo tempo • A implementação do acquire e release tornase o problema de seção crítica. – Espera ativa • Algumas aplicações podem ficar muito tempo na seção crítica. Implementação de semáforo sem espera ativa • Cada semáforo tem sua fila de espera. Cada posição da fila de espera tem dois campos: – valor (tipo inteiro) – ponteiro para o próximo elemento da lista Duas operações: • block – coloca o processo que esta adquirindo o semáforo na fila apropriada. • wakeup – remove o processo que esta na fila de espera e coloca na fila de prontos. Implementação de semáforo sem espera ativa • Acquire() • Release() Semafóros em Java • Exemplo: import java.util.concurrent.Semaphore; public class ThreadInterface implements Runnable{ Semaphore sem= new Semaphore(1); public void run() { sem.acquire(); for (int i=0; i <10;i++){ System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } sem.release(); } public static void main(String args[]) { ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 5; i++) executor.execute(new MyThread()); } } Bloqueios e variáveis de condição • Menor overhead que semáforos e synchronized • Implementação mais eficiente no Java • Variáveis de condição são utilizadas caso seja necessário bloquear a execução no meio da seção crítica Exemplo: Lock import java.util.concurrent.locks.*; public class ThreadInterface implements Runnable{ Lock mutex= new Lock(); public void run() { mutex.lock(); for (int i=0; i <10;i++){ System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } mutex.unlock(); } public static void main(String args[]) { ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 5; i++) executor.execute(new MyThread()); } Exemplo: Variáveis de condição class ProducerConsumer { Lock mutex= new Lock(); Condition cond= mutex.newCondition(); static class Produtor extends Runnable{ public void run() { while (true) { mutex.lock(); while (count == BUFFER_SIZE) cond.await(); buffer [in] = nextProduced; in = (in + 1) % BUFFER_SIZE; count++; cond.signal(); } } } static class Consumidor extends Runnable{ public void run() { while (true) { mutex.lock(); while (count == 0) cond.await(); nextConsumed= buffer[out]; out = (out + 1) % BUFFER_SIZE; count--; cond.signal(); } } } .. ..// método Main } Deadlock e starvation • Deadlock – dois ou mais processsos esperam infinitamente por eventos que somente podem ser gerados por processos no estado de espera • Seja S e Q dois semáforos inicializados em 1 P0 P1 S.acquire(); Q.acquire(); Q.acquire(); S.acquire(); . . . . . . S.release(); Q.release(); Q.release(); S.release(); • Starvation – bloqueio indefinido.