Programação Concorrente em Java

Propaganda
Programação Concorrente em Java
Prof. Carlos Bazilio
Criando Threads
●
Existem 2 formas básicas de criarmos threads
em Java:
–
Criando uma classe que estende a classe
java.lang.Thread
–
Criando uma classe que implementa a interface
java.lang.Runnable
Criando Threads
class ThinkParallel1 implements Runnable {
...
}
class ThinkParallel2 extends Thread {
...
}
Thread t = new Thread(new ThinkParallel2());
t.start();
Criando Threads
●
●
●
Em ambas opções devemos dispor um método
chamado run() que inicia a execução da thread
Variáveis podem ser passadas às threads
através do construtor
Membros declarados nas classes são privados
de cada thread
Criando Threads
class ThinkParallel1 implements Runnable {
int id;
public ThinkParallel1(int pId) {
this.id = pId;
}
public void run() {
System.out.println("Thread número: " + id);
}
}
Thread t = new Thread(new ThinkParallel1(10));
t.start();
Ciclo de Vida
Nova Thread
start()
notifyAll(),
notify() ou
fim sleep()
dispatch
Pronto
Execução
yield/runout
Fim I/O
I/O
Bloqueado
Fim da execução
wait()
Suspenso
sleep(long)
Finalizado
Atomicidade, Sincronização de
Memória e termo volatile
●
A JVM demanda que leituras e escritas de
todos os tipos primitivos (exceto long e double)
sejam atômicas
–
●
●
P. ex., teste = true; é sempre atômica
Além do controle de leitura/escrita, outra
questão a ser abordada é a visualização das
atualizações
Em Java esta situação é resolvida declarando o
recurso como volatile
Atomicidade, Sincronização de
Memória e termo volatile
●
●
Os tipos long e double têm suporte no pacote
java.util.concurrent.atomic
Por exemplo, temos uma classe AtomicLong
para manipulação segura de valores long
Blocos Síncronos
●
●
●
Recurso em Java para tratar de situações onde
possam ocorrer condição de corrida (race
condition)
Um bloco síncrono sempre está associado a
um objeto
Isto é possível pois todo objeto contém
implicitamente um lock
synchronized(object_ref) {
...
corpo do bloco
...
}
Blocos Síncronos
class ThinkParallel1 implements Runnable {
int id;
static int count;
public ThinkParallel1(int pId) {
this.id = pId;
}
}
public void run() {
System.out.println("Thread número: " + id);
synchronized(ThinkParallel1.class){
count++;
}
}
Blocos Síncronos
●
Implementação alternativa poderia utilizar as
classes disponíveis no pacote
java.util.concurrent.atomic
Blocos Síncronos
class ThinkParallel1 implements Runnable {
int id;
static int count;
public ThinkParallel1(int pId) {
this.id = pId;
}
}
public void run() {
System.out.println("Thread número: " + id);
synchronized(this){ // Erro !!!
count++;
}
}
Blocos Síncronos
●
●
A versão com erro tem o problema de realizar
lock às instâncias individualmente
Uma alternativa para este uso do synchronize é
quando um método inteiro deve ser controlado
public synchronized static void atualiza() {
count++;
}
// Equivale a ...
public static void atualiza() {
synchronized(this){
count++;
}
}
wait / notify / notifyAll
●
●
●
●
●
Métodos esperar e sinalizar a disponibilização
de um recurso
São definidos na classe Object
notify e notifyAll só podem ser chamados de
métodos synchronized
Se + de 1 thread está aguardando um recurso,
a escolha da thread a ser acordada é arbitrária
Se não existirem threads aguardando, as
instruções notify são descartadas
wait / notify / notifyAll
Exemplo
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
public void setMsg(String str) {
this.msg=str;
}
}
wait / notify / notifyAll
Exemplo
public class Waiter implements Runnable{
private Message msg;
public Waiter(Message m){
this.msg=m;
}
public void run() {
String name = Thread.currentThread().getName();
synchronized (msg) {
try{
System.out.println(name+" waiting to get notified at time:"+System.currentTimeMillis());
msg.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println(name+" waiter thread got notified at time:"+System.currentTimeMillis());
//process the message now
System.out.println(name+" processed: "+msg.getMsg());
}
}
}
wait / notify / notifyAll
Exemplo
public class Notifier implements Runnable {
private Message msg;
public Notifier(Message msg) {
this.msg = msg;
}
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name+" started");
try {
Thread.sleep(1000);
synchronized (msg) {
msg.setMsg(name+" Notifier work done");
msg.notify();
// msg.notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
wait / notify / notifyAll
Exemplo
public class WaitNotifyTest {
public static void main(String[] args) {
Message msg = new Message("process it");
Waiter waiter = new Waiter(msg);
new Thread(waiter,"waiter").start();
Waiter waiter1 = new Waiter(msg);
new Thread(waiter1, "waiter1").start();
Notifier notifier = new Notifier(msg);
new Thread(notifier, "notifier").start();
System.out.println("All the threads are started");
}
}
wait / notify / notifyAll
Exemplo
waiter waiting to get notified at time:1383760734647
waiter1 waiting to get notified at time:1383760734648
All the threads are started
notifier started
waiter waiter thread got notified at time:1383760735649
waiter processed: notifier Notifier work done
waiter1 waiting to get notified at time:1383760546239
waiter waiting to get notified at time:1383760546241
notifier started
All the threads are started
waiter waiter thread got notified at time:1383760547241
waiter processed: notifier Notifier work done
waiter1 waiter thread got notified at time:1383760547242
waiter1 processed: notifier Notifier work done
Pacote java.util.concurrent
●
●
●
●
Contém classes utilitárias para programação
concorrente
Estas classes implementam funcionalidades
úteis na programação concorrente, mas difíceis
ou tediosas de se programar
Principais componentes: Executors, Queues,
Timing, Synchronizers, Concurrent Collections,
Memory Consistency Properties
http://docs.oracle.com/javase/7/docs/api/java/uti
l/concurrent/package-summary.html
Executors
●
Possuem classes (interfaces e de
implementação) voltadas para a gerência de
threads
–
Pool de threads
–
IO assíncrono
–
Framework para manipulação de tarefas (tasks)
Pool de Threads
public class WorkerThread implements Runnable {
private int workerNumber;
WorkerThread(int number) {
workerNumber = number;
}
public void run() {
for (int i=0;i<=100;i+=20) {
// Perform some work ...
System.out.println("Worker number: " + workerNumber
+ ", percent complete: " + i );
try {
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) {
}
}
}
}
Pool de Threads
import java.util.concurrent.*;
public class ThreadPoolTest {
public static void main(String[] args) {
int numWorkers = 8;
int threadPoolSize = 4;
ExecutorService tpes =
Executors.newFixedThreadPool(threadPoolSize);
WorkerThread[] workers = new WorkerThread[numWorkers];
for (int i = 0; i < numWorkers; i++) {
workers[i] = new WorkerThread(i);
tpes.execute(workers[i]);
}
tpes.shutdown();
}
}
Pool de Threads
Exemplo: Crawler para varrer Web
public class Tarefa {
String url;
int level;
public Tarefa(String url, int level) {
this.url = url;
this.level = level;
}
public String getUrl() {
return url;
}
public int getLevel() {
return level;
}
}
Pool de Threads
Exemplo: Crawler para varrer Web
import java.util.concurrent.*;
public class ThreadPoolTest {
public static void main(String[] args) {
BlockingQueue<Tarefa> urls = new LinkedBlockingQueue<Tarefa>();
urls.add(new Tarefa("http://www.google.com.br", 0));
int threadPoolSize = 16;
ExecutorService pool =
Executors.newFixedThreadPool(threadPoolSize);
}
}
Tarefa t = null;
WorkerCrawler worker = null;
int n_tasks = 0;
long tempoLimite = 10L;
try {
while ((n_tasks < 100) && ((t = urls.poll(tempoLimite, TimeUnit.SECONDS)) != null)) {
worker = new WorkerCrawler(n_tasks, t, urls);
pool.execute(worker);
n_tasks++;
}
} catch (InterruptedException e) {
System.out.println("Interrupção na espera por urls (" + tempoLimite + TimeUnit.SECONDS.name());
}
pool.shutdown();
import
import
import
import
java.io.*;
java.net.*;
java.util.concurrent.BlockingQueue;
java.util.regex.*;
public class WorkerCrawler implements Runnable {
private int taskNumber;
private Tarefa urltoCrawl;
BlockingQueue<Tarefa> sharedUrls;
WorkerCrawler(int number, Tarefa t, BlockingQueue<Tarefa> urls) {
taskNumber = number;
urltoCrawl = t;
sharedUrls = urls;
}
public void run() {
URL url;
System.out.println("Thread id: " + Thread.currentThread().getId() + " Task: " +
taskNumber + " Nivel: " + urltoCrawl.getLevel() + " Url: " + urltoCrawl.getUrl());
try {
url = new URL(urltoCrawl.getUrl());
URLConnection conexao = url.openConnection();
InputStream stream = conexao.getInputStream();
BufferedReader in =
new BufferedReader(
new InputStreamReader(stream));
String input = "";
String inputLine;
while ((inputLine = in.readLine()) != null)
input = input + inputLine;
in.close();
// Find links of the form: http://xxx.yyy.zzz
String regexp = "http://(\\w+\\.)*(\\w+)";
Pattern pattern = Pattern.compile(regexp);
Matcher matcher = pattern.matcher(input);
// find all matches
while (matcher.find()) {
String match = matcher.group();
sharedUrls.add(new Tarefa(match, urltoCrawl.getLevel() + 1));
}
} catch (MalformedURLException e) {
System.out.println("URL mal formada: " + urltoCrawl);
} catch (IOException e) {
System.out.println("Erro de entrada/saída na abertura da conexão com
servidor web ou na obtenção dos dados");
}
}
}
Queues
●
Úteis para manipulação consistente de filas
●
Destaques:
–
ConcurrentLinkedQueue: fila FIFO nãobloqueante e thread-safe
–
BlockingQueue: interface para implementação
de filas bloqueantes; pode ter capacidade
limitada
–
BlockingDeque: interface que extende
BlockingQueue, permitindo comportamento
FIFO e LIFO (pilha)
Timing
●
●
●
A classe TimeUnit representa uma unidade de
tempo
É útil por padronizar diferentes operações que
possuam timeout
Há diversos métodos para conversão entre
unidades, como toNanos() e toMillis(), para
conversão para nanos e milisegundos,
respectivamente
Synchronizers
●
5 classes oferecem ferramentas interessantes
para formas de sincronização:
–
Semaphore: implementação clássica
–
CountDownLatch: implementa bloqueio dado
que um número de sinais, eventos e
condições ocorram
–
CyclicBarrier: ponto de sincronização múltipla
–
Phaser: forma mais flexível de barreira para
controle de fases em múltiplas threads
–
Exchanger: permite 2 threads trocarem objetos
num ponto específico da execução
Concurrent Collections
●
Além de Filas, este pacote oferece outras
coleções para serem utilizadas em contextos
multithreaded:
–
●
ConcurrentHashMap, ConcurrentSkipListMap,
ConcurrentSkipListSet,
CopyOnWriteArrayList, CopyOnWriteArraySet
Uma vantagem destas em relação às versões
não concorrentes e sincronizadas é
escalabilidade, uma vez que as sincronizadas
trabalham utilizando lock simples
ThreadGroup
●
Permite o agrupamento lógico de threads
●
Grupos também podem conter subgrupos
ThreadGroup grupo =
new ThreadGroup("Grupo_teste");
...
Thread novathread = new Thread(grupo, "thr1");
ThreadGroup
●
●
A classe java.lang.ThreadGroup possui alguns
métodos para manipulação de grupos
Dentre esses, podemos citar o método
destroy() e o método interrupt()
Download