Processando Big Data com Java: Receitas para resolver

Propaganda
Processando Big Data com Java: Receitas para resolver problemas comuns Fabiane Bizinella Nardon (@fabianenardon) “The best minds of my genera3on are thinking about how to make people click ads. That sucks.” Jeff Hammerbacher
Big Data em Publicidade Nossos Números Dados imensos 1.5 bilhões novos registros/dia Métricas para medir eficiência 50% aumento de CTR Resposta rápida RTB precisa retorno em 100ms BIG DATA em Java HADOOP HDFS SPARK STORM PROCESSAMENTO PARALELO E DISTRIBUÍDO INPUT M1 M2 M3 OUTPUT M4 M5 Mais paralelismo pode deixar sua aplicação mais lenta UID:1 = FUTEBOL, CINEMA, TECNOLOGIA UID2 = MODA, TECNOLOGIA M1 M2 M3 FUTEBOL: 1 CINEMA: 1 TECNOLOGIA: 2 MODA: 1 M4 M5 UID:1 = FUTEBOL, CINEMA, TECNOLOGIA UID2 = MODA, TECNOLOGIA M1 M2 M3 M4 M5 M6 FUTEBOL: 1 CINEMA: 1 TECNOLOGIA: 2 MODA: 1 M7 M8 M9 M10 ~10h UID:1 = FUTEBOL, CINEMA, TECNOLOGIA UID2 = MODA, TECNOLOGIA M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20 M21 M22 M23 M24 M25 M26 M27 M28 M29 M30 FUTEBOL: 1 CINEMA: 1 TECNOLOGIA: 2 MODA: 1 ~36h UID:1 = FUTEBOL, CINEMA, TECNOLOGIA UID2 = MODA, TECNOLOGIA M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12 M13 M14 M15 M16 M17 M18 M19 M20 M21 M22 M23 M24 M25 M26 M27 M28 M29 M30 FUTEBOL: 1 CINEMA: 1 TECNOLOGIA: 2 MODA: 1 ~36h Hadoop Fair Scheduler conf/mapred-­‐site.xml <property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
conf/fair-­‐scheduler.xml <alloca^ons> ... <pool name="smallpool"> <maxMaps>10</maxMaps> </pool> ... </alloca^ons>
Código config.set("mapred.fairscheduler.pool", "smallpool");
PROCESSAMENTO PARALELO E DISTRIBUÍDO DADOS M1 M2 M3 M4 M5 PROCESSAMENTO PARALELO E DISTRIBUÍDO DADOS CACHE M1 CACHE M2 M3 CACHE M4 M5 Cache distribuído tem que ser bem distribuído SHARDING CACHE C A C CACHE H E CACHE 1 2 3 CLIENTE 4 5 CLIENTE CLIENTE SHARDING Com Hashing (Ex: Murmur) CACHE 5 1 a1d1c1 a2b2c3 2 a2b2c4 3 4 a2f2c4 b2f2c4 CACHE CACHE a2b2c4 CLIENTE CLIENTE CLIENTE SHARDING Escalando CACHE CACHE CACHE CACHE CACHE CACHE CLIENTE CLIENTE CLIENTE SHARDING Escalando CACHE CACHE CACHE CLIENTE CLIENTE CACHE CACHE CACHE CLIENTE PIPELINES Processo 1 Processo 2 Processo 3 Saída 1 Saída 2 Às vezes, a “inteligência” do seu framework não vai te dar o melhor pipeline BANCO DE DADOS CONTA NÚMERO DE USUÁRIOS POR INTERESSES CONTA NÚMERO DE USUÁRIOS POR PAÍS CONTA NÚMERO DE USUÁRIOS POR IDADE GRAVA TOTAL NO BD GRAVA TOTAL NO BD GRAVA TOTAL NO BD Cache Intermediário PCollection<RedisRecord> users = pipeline.read(source);
PCollection<Tuple3<String, String, Integer>> globalStats =
users.parallelDo("Count global cookies", new CountCookies(properties),
Writables.triples(Writables.strings(), Writables.strings(),
Writables.ints()));
pipeline.cache(globalStats, CachingOptions.builder()
.useDisk(true).useMemory(false)
.replicas(1)
.deserialized(true)
.build());
pipeline.run();
PTable<String, Integer> statsByInterest = globalStats.parallelDo(
"Cookies by interest",
new CountCookiesByInterest(),
Writables.tableOf(Writables.strings(), Writables.ints()));
PGroupedTable<String, Integer> groupedByInterest =
statsByInterest.groupByKey(1);
QUE TAMANHO É BIG DATA? * Tamanho default to bloco do HDFS é 64MB * Se o arquivo é muito pequeno, o tempo para iniciar/terminar um job será maior que o job em si * Se os dados cabem em memória, fazer um POJP vai ser muito mais rápido É melhor processar um arquivo grande. E isso é um problema. Arquivos Pequenos: Se puder evitar, não processe Alterna^va: Concatene no HDFS Se puder concatenar no file system e depois mandar para o HDFS, melhor ainda! Mandando arquivão para o HDFS hadoop fs –put /arq.log /hdfsFolder
Isso não é atômico! Use: hadoop fs –put /arq.log /hdfsFolder/.arq.log
hadoop fs -mv /hdfsFolder/.arq.log /hdfsFolder/arq.log
Quando processar um arquivo grande é um problema ARQUIVO ENTRADA Contador Contador Contador Total Contador Quando processar um arquivo grande é um problema 500 GB Contador …… Contador Contador 7.800 Maps Total Quando processar um arquivo grande é um problema 500 GB config.setLong(
RuntimeParameters.COMBINE_FILE_BLOCK_SIZE,
6_710_886_400L);
Contador …… Contador Contador 7.800 Maps Total Muitas vezes, se você não processar todos os dados, dá na mesma Usuários por Interesse (TODOS) FUTEBOL EMPREGO FINANÇAS POLÍTICA Usuários por Interesse (1%) FUTEBOL EMPREGO FINANÇAS POLÍTICA Usuários por Interesse TODOS 1% Como fazer uma boa amostra Tamanho tem que ser representa^vo Distribuição tem que ser homogênea Tamanho da Amostra MAIOR AMOSTRA = MAIS ACURÁCIA Tamanho da Amostra Defina tamanho mínimo da amostra. Se a base ^ver menos que isso, use a base toda Distribuição da Amostra (Ex: Redis) SHARDING 1 SHARDING 2 RANDOMKEYs SHARDING 3 RANDOMKEYs RANDOMKEYs ITEMS POR SHARDING = TAMANHO DA AMOSTRA / NÚMERO DE SHARDINGS Distribuição da Amostra (Ex: Redis) Ta = Tamanho da Amostra SHARDING 1 SHARDING 2 SHARDING 3 Tt = Tamanho Total Na = Número de Itens na Amostra Nt = Número de Itens Total Nt = Na * Tt / Ta RANDOMKEYs RANDOMKEYs RANDOMKEYs Exemplo: Ta = 1000 ITEMS POR SHARDING = TAMANHO Tt D=A 1A00.000 MOSTRA / NÚMERO DE SHARDINGS Na = 400 Mulheres Nt = 400 * 100.000 / 1000 = 40.000 E quando não se sabe o tamanho total? Reservoir Sampling 1 2 3 4 5 A B C D E F Random (0..1): 0.7 K = Ta / i K = 5 / 6 = 0.83 Se K > Random => TROCA! Reservoir Sampling Distribuído A B C D E F G H I J K L M N O P Q R S T U V X Y W Z 1 2 3 4 5 Reservoir Sampling Distribuído A B C D E F G H I 1 J K L M N O P Q R 2 3 4 S T U V X Y W Z 5 Reservoir Sampling Distribuído A:0.1 B:0.3 C:0.2 D:0.7 E:0.9 F:0.11 G:0.4 H:0.6 I:0.76 J:0.8 K:0.2 L:0.54 M:0.4 N:0.21 O:0.33 P:0.56 Q:0.32 R:0.23 S:0.21 T:0.32 U:0.22 V:0.7 X:0.12 Y:
0.23 W:0.3 Z:0.76 private SortedMap<Double, MyObject> reservoir;
...
if (reservoir.size() < SAMPLE_SIZE) {
reservoir.put(score, myObject);
} else if (score > reservoir.firstKey()) {
reservoir.remove(reservoir.firstKey());
reservoir.put(score, myObject);
}
1 2 3 4 5 Reservoir Sampling Distribuído A:0.1 B:0.3 C:0.2 D:0.7 E:0.9 F:0.11 G:0.4 H:0.6 I:0.76 H:0.6 D:0.7 E:0.9 F:0.11 I:0.76 J:0.8 K:0.2 L:0.54 M:0.4 N:0.21 O:0.33 P:0.56 Q:0.32 R:0.23 S:0.21 T:0.32 U:0.22 V:0.7 X:0.12 Y:
0.23 W:0.3 Z:0.76 R:0.23 Q:0.32 O:0.33 L:0.54 P:0.56 S:0.21 U:0.22 Y:0.23 T:0.32 Z:0.76 COMBINER 1 2 3 4 5 O L P I Z Reservoir Sampling Distribuído Apache Crunch: import org.apache.crunch.lib.Sample;
Sample.reservoirSample(PCollection<T> input, int sampleSize)
Processando Big Data com Java: Receitas para resolver problemas comuns Fabiane Bizinella Nardon (@fabianenardon) h}p://www.tailtarget.com/vagas/ 
Download