diff --git a/_images/unclephil-continuous.gif b/_images/unclephil-continuous.gif new file mode 100644 index 0000000..dfc146a Binary files /dev/null and b/_images/unclephil-continuous.gif differ diff --git a/_images/unclephil-flinkpipeline.png b/_images/unclephil-flinkpipeline.png new file mode 100644 index 0000000..d4f4329 Binary files /dev/null and b/_images/unclephil-flinkpipeline.png differ diff --git a/_images/unclephil-flinkpipelinereal.png b/_images/unclephil-flinkpipelinereal.png new file mode 100644 index 0000000..e095f1b Binary files /dev/null and b/_images/unclephil-flinkpipelinereal.png differ diff --git a/_images/unclephil-kafka.png b/_images/unclephil-kafka.png new file mode 100644 index 0000000..dee0fd1 Binary files /dev/null and b/_images/unclephil-kafka.png differ diff --git a/_images/unclephil-kafkaflink.png b/_images/unclephil-kafkaflink.png new file mode 100644 index 0000000..4662f25 Binary files /dev/null and b/_images/unclephil-kafkaflink.png differ diff --git a/_images/unclephil-mrbanks.gif b/_images/unclephil-mrbanks.gif new file mode 100644 index 0000000..35e98ec Binary files /dev/null and b/_images/unclephil-mrbanks.gif differ diff --git a/_images/unclephil-nonstop.gif b/_images/unclephil-nonstop.gif new file mode 100644 index 0000000..f6cae12 Binary files /dev/null and b/_images/unclephil-nonstop.gif differ diff --git a/_images/unclephil-onemorething.jpg b/_images/unclephil-onemorething.jpg new file mode 100644 index 0000000..9bbafb1 Binary files /dev/null and b/_images/unclephil-onemorething.jpg differ diff --git a/_images/unclephil-step0.png b/_images/unclephil-step0.png new file mode 100644 index 0000000..4b44ace Binary files /dev/null and b/_images/unclephil-step0.png differ diff --git a/_images/unclephil-step1.png b/_images/unclephil-step1.png new file mode 100644 index 0000000..b2d68f2 Binary files /dev/null and b/_images/unclephil-step1.png differ diff --git a/_images/unclephil-step10.png b/_images/unclephil-step10.png new file mode 
100644 index 0000000..2fc54f8 Binary files /dev/null and b/_images/unclephil-step10.png differ diff --git a/_images/unclephil-step2.png b/_images/unclephil-step2.png new file mode 100644 index 0000000..bff7081 Binary files /dev/null and b/_images/unclephil-step2.png differ diff --git a/_images/unclephil-step3.png b/_images/unclephil-step3.png new file mode 100644 index 0000000..e5e3b55 Binary files /dev/null and b/_images/unclephil-step3.png differ diff --git a/_images/unclephil-step4.png b/_images/unclephil-step4.png new file mode 100644 index 0000000..e94faaa Binary files /dev/null and b/_images/unclephil-step4.png differ diff --git a/_images/unclephil-step5.png b/_images/unclephil-step5.png new file mode 100644 index 0000000..f49c274 Binary files /dev/null and b/_images/unclephil-step5.png differ diff --git a/_images/unclephil-step6.png b/_images/unclephil-step6.png new file mode 100644 index 0000000..09d8285 Binary files /dev/null and b/_images/unclephil-step6.png differ diff --git a/_images/unclephil-step7.png b/_images/unclephil-step7.png new file mode 100644 index 0000000..d17ddb3 Binary files /dev/null and b/_images/unclephil-step7.png differ diff --git a/_images/unclephil-step8.png b/_images/unclephil-step8.png new file mode 100644 index 0000000..d522569 Binary files /dev/null and b/_images/unclephil-step8.png differ diff --git a/_images/unclephil-step9.png b/_images/unclephil-step9.png new file mode 100644 index 0000000..922be77 Binary files /dev/null and b/_images/unclephil-step9.png differ diff --git a/_images/unclephil-time.jpg b/_images/unclephil-time.jpg new file mode 100644 index 0000000..c9e0546 Binary files /dev/null and b/_images/unclephil-time.jpg differ diff --git a/_images/unclephil-victory.gif b/_images/unclephil-victory.gif new file mode 100644 index 0000000..5643cdf Binary files /dev/null and b/_images/unclephil-victory.gif differ diff --git a/_images/unclephil.jpg b/_images/unclephil.jpg new file mode 100644 index 0000000..86b2a25 Binary 
files /dev/null and b/_images/unclephil.jpg differ diff --git a/_sources/unclephil-flinkpipelinereal.dot b/_sources/unclephil-flinkpipelinereal.dot new file mode 100644 index 0000000..a3fe75f --- /dev/null +++ b/_sources/unclephil-flinkpipelinereal.dot @@ -0,0 +1,270 @@ +digraph UnclePhil { + kafka [color=gray, label="Kafka"]; + + selector1 [label="Selector"]; + selector2 [label="Selector"]; + selector3 [label="Selector"]; + selector4 [label="Selector"]; + selector5 [label="Selector"]; + selector6 [label="Selector"]; + selector7 [label="Selector"]; + selector8 [label="Selector"]; + selector9 [label="Selector"]; + selector10 [label="Selector"]; + selector11 [label="Selector"]; + selector12 [label="Selector"]; + + process1 [label="ProcessMessages"]; + process2 [label="ProcessMessages"]; + process3 [label="ProcessMessages"]; + process4 [label="ProcessMessages"]; + process5 [label="ProcessMessages"]; + process6 [label="ProcessMessages"]; + process7 [label="ProcessMessages"]; + process8 [label="ProcessMessages"]; + process9 [label="ProcessMessages"]; + process10 [label="ProcessMessages"]; + process11 [label="ProcessMessages"]; + process12 [label="ProcessMessages"]; + + splitter1 [label="MessageSpliter"]; + splitter2 [label="MessageSpliter"]; + splitter3 [label="MessageSpliter"]; + splitter4 [label="MessageSpliter"]; + splitter5 [label="MessageSpliter"]; + splitter6 [label="MessageSpliter"]; + splitter7 [label="MessageSpliter"]; + splitter8 [label="MessageSpliter"]; + splitter9 [label="MessageSpliter"]; + splitter10 [label="MessageSpliter"]; + splitter11 [label="MessageSpliter"]; + splitter12 [label="MessageSpliter"]; + + extract1 [label="MetricExtractor"]; + extract2 [label="MetricExtractor"]; + extract3 [label="MetricExtractor"]; + extract4 [label="MetricExtractor"]; + extract5 [label="MetricExtractor"]; + extract6 [label="MetricExtractor"]; + extract7 [label="MetricExtractor"]; + extract8 [label="MetricExtractor"]; + extract9 [label="MetricExtractor"]; + extract10 
[label="MetricExtractor"]; + extract11 [label="MetricExtractor"]; + extract12 [label="MetricExtractor"]; + + keyby [label="keyBy/Hash"]; + + window1 [label="Window"]; + window2 [label="Window"]; + window3 [label="Window"]; + window4 [label="Window"]; + window5 [label="Window"]; + window6 [label="Window"]; + window7 [label="Window"]; + window8 [label="Window"]; + window9 [label="Window"]; + window10 [label="Window"]; + window11 [label="Window"]; + window12 [label="Window"]; + + reduce1 [label="Reduce"]; + reduce2 [label="Reduce"]; + reduce3 [label="Reduce"]; + reduce4 [label="Reduce"]; + reduce5 [label="Reduce"]; + reduce6 [label="Reduce"]; + reduce7 [label="Reduce"]; + reduce8 [label="Reduce"]; + reduce9 [label="Reduce"]; + reduce10 [label="Reduce"]; + reduce11 [label="Reduce"]; + reduce12 [label="Reduce"]; + + sink1 [label="Sink"]; + sink2 [label="Sink"]; + sink3 [label="Sink"]; + sink4 [label="Sink"]; + sink5 [label="Sink"]; + sink6 [label="Sink"]; + sink7 [label="Sink"]; + sink8 [label="Sink"]; + sink9 [label="Sink"]; + sink10 [label="Sink"]; + sink11 [label="Sink"]; + sink12 [label="Sink"]; + + kafka -> selector1; + kafka -> selector2; + kafka -> selector3; + kafka -> selector4; + kafka -> selector5; + kafka -> selector6; + kafka -> selector7; + kafka -> selector8; + kafka -> selector9; + kafka -> selector10; + kafka -> selector11; + kafka -> selector12; + + selector1 -> process1; + selector2 -> process2; + selector3 -> process3; + selector4 -> process4; + selector5 -> process5; + selector6 -> process6; + selector7 -> process7; + selector8 -> process8; + selector9 -> process9; + selector10 -> process10; + selector11 -> process11; + selector12 -> process12; + + process1 -> splitter1; + process2 -> splitter2; + process3 -> splitter3; + process4 -> splitter4; + process5 -> splitter5; + process6 -> splitter6; + process7 -> splitter7; + process8 -> splitter8; + process9 -> splitter9; + process10 -> splitter10; + process11 -> splitter11; + process12 -> splitter12; + 
+ splitter1 -> extract1; + splitter1 -> extract1; + splitter1 -> extract1; + + splitter2 -> extract2; + splitter2 -> extract2; + splitter2 -> extract2; + + splitter3 -> extract3; + splitter3 -> extract3; + splitter3 -> extract3; + + splitter4 -> extract4; + splitter4 -> extract4; + splitter4 -> extract4; + + splitter5 -> extract5; + splitter5 -> extract5; + splitter5 -> extract5; + + splitter6 -> extract6; + splitter6 -> extract6; + splitter6 -> extract6; + + splitter7 -> extract7; + splitter7 -> extract7; + splitter7 -> extract7; + + splitter8 -> extract8; + splitter8 -> extract8; + splitter8 -> extract8; + + splitter9 -> extract9; + splitter9 -> extract9; + splitter9 -> extract9; + + splitter10 -> extract10; + splitter10 -> extract10; + splitter10 -> extract10; + + splitter11 -> extract11; + splitter11 -> extract11; + splitter11 -> extract11; + + splitter12 -> extract12; + splitter12 -> extract12; + splitter12 -> extract12; + + extract1 -> keyby; + extract1 -> keyby; + extract1 -> keyby; + + extract2 -> keyby; + extract2 -> keyby; + extract2 -> keyby; + + extract3 -> keyby; + extract3 -> keyby; + extract3 -> keyby; + + extract4 -> keyby; + extract4 -> keyby; + extract4 -> keyby; + + extract5 -> keyby; + extract5 -> keyby; + extract5 -> keyby; + + extract6 -> keyby; + extract6 -> keyby; + extract6 -> keyby; + + extract7 -> keyby; + extract7 -> keyby; + extract7 -> keyby; + + extract8 -> keyby; + extract8 -> keyby; + extract8 -> keyby; + + extract9 -> keyby; + extract9 -> keyby; + extract9 -> keyby; + + extract10 -> keyby; + extract10 -> keyby; + extract10 -> keyby; + + extract11 -> keyby; + extract11 -> keyby; + extract11 -> keyby; + + extract12 -> keyby; + extract12 -> keyby; + extract12 -> keyby; + + keyby -> window1; + keyby -> window2; + keyby -> window3; + keyby -> window4; + keyby -> window5; + keyby -> window6; + keyby -> window7; + keyby -> window8; + keyby -> window9; + keyby -> window10; + keyby -> window11; + keyby -> window12; + + window1 -> reduce1; + 
window2 -> reduce2; + window3 -> reduce3; + window4 -> reduce4; + window5 -> reduce5; + window6 -> reduce6; + window7 -> reduce7; + window8 -> reduce8; + window9 -> reduce9; + window10 -> reduce10; + window11 -> reduce11; + window12 -> reduce12; + + reduce1 -> sink1; + reduce2 -> sink2; + reduce3 -> sink3; + reduce4 -> sink4; + reduce5 -> sink5; + reduce6 -> sink6; + reduce7 -> sink7; + reduce8 -> sink8; + reduce9 -> sink9; + reduce10 -> sink10; + reduce11 -> sink11; + reduce12 -> sink12; +} diff --git a/_sources/unclephil-steps-drawio.xml b/_sources/unclephil-steps-drawio.xml new file mode 100644 index 0000000..c46a141 --- /dev/null +++ b/_sources/unclephil-steps-drawio.xml @@ -0,0 +1 @@ +5Zpdb5swFIZ/DbcT2BDCZZt1282kSb1Yd+mBC1YNjhynSffrZ8Dmy0RCFTCPchN8bPzxnFfHPgQHHvLrV46O2XeWYOoAN7k68LMDgAfgXv6UlrfastuHtSHlJFGNWsMj+YOV0VXWM0nwqddQMEYFOfaNMSsKHIueDXHOLv1mz4z2Rz2iFBuGxxhR0/qTJCKrrfvAbe3fMEkzPbLnqprfKH5JOTsXajwHwOfqqqtzpPtS7U8ZStilY4IPDjxwxkR9l18PmJZsNbb6uS83apt5c1yIKQ8oP70iesZ6xtW8xJtmUa0Gl+1dB95fMiLw4xHFZe1Fel/aMpFTWfLkbcxyEqv7k+DsBR8YZbzqCUbV1dRoslBa1DQwF/h6cyleA0gKD7McC/4mm6gHfDeoH1GaCxXiS+vASJmyru+gMiKlmbTpueUmbxS6cYyRgdGTq7rTqujQlOsTfWR9TAUrZMv7Z0LpwIQoSQtZjCUPLO33JS0iNXunKnKSJOUwoz7qe7HrphnQw6CPfmei90fQgxnIe952FQzcFSXsge2ChHBNkNAMBv4HDgZetGI08E32wQdmD8GK7AOT/e4Ds/eDFdnvFgre7ljwdqvLCN4zcdSMbp8lmn2xi9GfA2O4WYzeyFliOY5LpRUWcPTX5GjmFZvhuF+Row6xG+QIRnb45TgulWxZwHHNfUZj2yBHuOY+A8xUazMc19xngJk2bYbjqvuMmQJthaO/6j4z4RyOi+SufNMvSzFFp1PJycwou1DwlYinzv2vEuynoCwVcoJPinNVaOvqgXFi/GEwACknx848bjPfyiYQT7Fqth/n3QEajPDUNo4pEuS1P4sxyGqEH4zI+bXuBIPzVzDwUz179VTrKrOjcNCRO+ioXrLRUeXzZtnTZDAhjbBcBqFlMmj6Ud6L3qmC0B2oAAaLqWBCEmS5CvamCrwb0fffyMB48W2hDvQU/2MdRJZHAy+cSQZgt5wMJuSgdstA78T2hoPmOwybw8GEHNpyHegvW+zVwXsPiWvGgwnvACzXAbBeB3MdE5fUwYR3GJbrAI7oANikAzjXMXFJHUx4B2O5DnzrdTDXORFGy+lgwl/ilusgMHVgVbrgz3VMnE8Gsth+PFs3b79Qhg9/AQ== \ No newline at end of file diff --git a/mr-banks.html b/mr-banks.html new file mode 100644 index 0000000..d559956 --- /dev/null +++ b/mr-banks.html @@ -0,0 +1,502 
@@ + + + + + + Mr Banks -- Ou Como Aprendi a Parar de Me Preocupar e Amar o Stream Processing + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+
+

+ Mr Banks + Ou Como Aprendi a Parar de Me Preocupar e Amar o Stream Processing +

+
+
+ +
+
+

Batch Processing

+ +

+cat /var/log/messages | cut -d\  -f 1 | sort | uniq
+						
+
+ +
+

Batch Processing

+ +

+v5	2017-10-17T11:09:59+00:00	7695w	27695w	
+1462530548	ha	1441740010	servico.clubeportoseguro.com.br	
+0.064	GET	BYPASS	200	-	200	1199	http	
+/api/produto	-	-	
+application/json; charset=utf-8	HTTP/1.1	
+230	1205	0.047	0.064	0.064	8000	
+200.230.226.125	17759	-	-	-	-	-	
+-	-	Jakarta Commons-HttpClient/3.1	-	-	-
+						
+ + +
+ +
+

Batch Processing

+ +
    +
  • Campos são nomeados
  • +
  • É verificado o status do cache (hit, miss, updating, bypass)
  • +
  • É calculada a banda utilizada (do usuário pra edge, da edge pra origem, da origem de volta pra edge, e da edge pro usuário)
  • +
  • Dados são agrupados por cliente, produto e janela de tempo
  • +
+
+ +
+

Batch Processing

+ +

+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+import collections
+
+
+def default_data():
+    return {
+        'requests': 0,
+        'user_to_edge': 0,
+        'edge_to_origin': 0,
+        'origin_to_edge': 0,
+        'edge_to_user': 0,
+        'client': '',
+        'time': ''
+    }
+
+
+def to_bytes(string):
+    value = 0
+    for val1 in string.split(','):
+        for val2 in val1.split(':'):
+            if val2 != '-':
+                value += int(val2)
+    return value
+
+
+def main():
+    clients = collections.defaultdict(default_data)
+    with open('v5sample1.log') as content:
+        for line in content:
+            line = line.split('\t')
+            time = line[1]
+            client_id = line[2]
+            client = client_id + time[:16]
+            upstream_cache_status = line[10].lower()
+            upstream_bytes_received = to_bytes(line[14])
+            request_length = to_bytes(line[21])
+            bytes_sent = to_bytes(line[22])
+
+            clients[client]['requests'] += 1
+            clients[client]['user_to_edge'] += request_length
+            clients[client]['edge_to_origin'] += request_length \
+                if upstream_cache_status not in ['hit', 'updating'] \
+                else 0
+
+            clients[client]['origin_to_edge'] += upstream_bytes_received
+            clients[client]['edge_to_user'] += bytes_sent
+            clients[client]['client'] = client_id
+            clients[client]['time'] = 'at11_{min}'.format(min=time[14:16])
+
+if __name__ == "__main__":
+    main()
+						
+
+ +
+

Batch Processing

+ +

Ou seja, se pegássemos cada arquivo e processássemos + linha por linha, teríamos um batch processing.

+
+ +
+

Batch Processing

+ +

Problemas:

+ +
    +
  • Os logs não param.
  • +
  • São ~25 arquivos/segundo gerados pela CDN. (Só HTTP)
  • +
  • Cada arquivo é de uma máquina, não o agregado geral.
  • +
  • "Event time" vs "Processing Time"
  • +
+
+
+ +
+
+

"Os logs não param"

+ + The logs never stop + +

Kafka

+ + A simple representation of Kafka +
+ +
+

"São ~25 arquivos/segundo gerados pela CDN."

+ +

"Cada arquivo é de uma máquina, não o agregado geral."

+ + Kafka provides a solution for those two as well +
+ +
+

Se estamos processando os logs continuamente...

+ + + +

STREAM PROCESSING!

+
+
+ +
+
+

"Event time" vs "Processing Time"

+ + Talking about Event time vs Processing Time +
+ +
+

"Event time" vs "Processing Time"

+ +
    +
  • Cada máquina está gerando logs no seu ritmo (máquinas menos acessadas geram logs mais devagar)
  • +
  • Monkeys collect aguarda ter ~2000 linhas para enviar para o Kafka
  • +
  • Mensagens têm que chegar ao Kafka e então serem processadas
  • +
  • v5 2017-10-17T11:09:59+00:00
  • +
+
+ +
+

"Event time" vs "Processing Time"

+ +

Processing time: A hora que o evento foi processado.

+

Event time: A hora que o evento foi gerado.

+
+
+ +
+
+

Flink

+ + A representation of flink consuming kafka events +
+ +
+

Flink

+ +
    +
  • Window: janela de tempo, com um período definido (1h, 1min).
  • +
  • Watermark: tempo a partir da criação da janela para começar a despejar os dados.
  • +
  • Lateness: tempo em que a janela permanece em memória.
  • +
  • Late arrivals: eventos que surgem depois que a janela foi removida.
  • +
+
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ + +
+ +
+

Flink

+ +

"One more thing..."

+ + +
+ +
+

Flink

+ +

+ Uma coisa que eu não mencionei: Não são todos os dados + passados para as janelas; dados são agrupados e então + passados para as janelas. +

+
+
+ +
+
+

Mr Banks

+ + +
+ +
+

Mr Banks

+ +
source
+      .filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages")
+      .process(new ProcessMessages(brokenMessageTag)).name("Message Processor")
+      .flatMap(new MessageSpliter).name("Get Logs")   // get the lines in the message
+      .filter(new LogBrokenFilter).name("Remove broken logs")
+      .filter(new MissingClientFilter).name("Remove logs without clients")
+      .flatMap(new MetricExtractor).name("Create metrics")
+      .assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark")
+      .keyBy(_.key)
+      .window(TumblingEventTimeWindows.of(windowTime))
+      .allowedLateness(latenessTime)
+      .sideOutputLateData(lateMessageTag)
+      .reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics")
+						
+
+ +
+

Mr Banks

+ +
.filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages")
+
+ +
+

Mr Banks

+ +
.process(new ProcessMessages(brokenMessageTag)).name("Message Processor")
+
+ +
+

Mr Banks

+ +
.flatMap(new MessageSpliter).name("Get Logs")
+
+ +
+

Mr Banks

+ +
.filter(new LogBrokenFilter).name("Remove broken logs")
+
+ +
+

Mr Banks

+ +
.filter(new MissingClientFilter).name("Remove logs without clients")
+
+ +
+

Mr Banks

+ +
.flatMap(new MetricExtractor).name("Create metrics")
+
+ +
+

Mr Banks

+ +
.assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark")
+
+ +
+

Mr Banks

+ +
.keyBy(_.key)
+
+ +
+

Mr Banks

+ +
.window(TumblingEventTimeWindows.of(windowTime))
+
+ +
+

Mr Banks

+ +
.allowedLateness(latenessTime)
+
+ +
+

Mr Banks

+ +
.sideOutputLateData(lateMessageTag)
+
+ +
+

Mr Banks

+ +
.reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics")
+
+ +
+

Mr Banks

+ + +
+ +
+

Mr Banks

+ + +
+
+ +
+
+

Perguntas?

+
+
+
+
+ + + + + + + +