Mr Banks Ou Como Aprendi a Parar de Me Preocupar e Amar o Stream Processing

Batch Processing


cat /var/log/messages | grep sync | cut -d\  -f 1 | sort | uniq
						

Batch Processing


v5	2017-10-17T11:09:59+00:00	7695w	27695w	
1462530548	ha	1441740010	servico.clubeportoseguro.com.br	
0.064	GET	BYPASS	200	-	200	1199	http	
/api/produto	-	-	
application/json; charset=utf-8	HTTP/1.1	
230	1205	0.047	0.064	0.064	8000	
200.230.226.125	17759	-	-	-	-	-	
-	-	Jakarta Commons-HttpClient/3.1	-	-	-
						

Batch Processing

  • Campos são nomeados
  • É verificado o status do cache (hit, miss, updating, bypass)
  • É calculado a banda utilizada (do usuário pra edge, da edge pra origem, da origem de volta pra edge, e da edge pro usuário)
  • Dados são agrupados por cliente, produto e janela de tempo

Batch Processing


#!/usr/bin/env python
# -*- encoding: utf-8 -*-o

import collections


def default_data():
    return {
        'requests': 0,
        'user_to_edge': 0,
        'edge_to_origin': 0,
        'origin_to_edge': 0,
        'edge_to_user': 0,
        'client': '',
        'time': ''
    }


def to_bytes(string):
    value = 0
    for val1 in string.split(','):
        for val2 in val1.split(':'):
            if val2 != '-':
                value += int(val2)
    return value


def main():
    clients = collections.defaultdict(default_data)
    with open('v5sample1.log') as content:
        for line in content:
            line = line.split('\t')
            time = line[1]
            client_id = line[2]
            client = client_id + time[:16]
            upstream_cache_status = line[10].lower()
            upstream_bytes_received = to_bytes(line[14])
            request_length = to_bytes(line[21])
            bytes_sent = to_bytes(line[22])

            clients[client]['requests'] += 1
            clients[client]['user_to_edge'] += request_length
            clients[client]['edge_to_origin'] += request_length \
                if upstream_cache_status not in ['hit', 'updating'] \
                else 0

            clients[client]['origin_to_edge'] += upstream_bytes_received
            clients[client]['edge_to_user'] += bytes_sent
            clients[client]['client'] = client_id
            clients[client]['time'] = 'at11_{min}'.format(min=time[14:16])

if __name__ == "__main__":
    main()
						

Batch Processing

Ou seja, se pegássemos cada arquivo e processássemos linha por linha, teríamos um batch processing.

Batch Processing

Problemas:

  • Os logs não param.
  • São ~25 arquivos/segundo gerados pela CDN. (Só HTTP)
  • Cada arquivo é de uma máquina, não o agregado geral.
  • "Event time" vs "Processing Time"

"Os logs não param"

The logs never stop

Kafka

A simple representation of Kafka

"São ~25 arquivos/segundo gerados pela CDN."

"Cada arquivo é de uma máquina, não o agregado geral."

Kafka provides solution for those two too

Se estamos processando os logs continuamente...

STREAM PROCESSING!

"Event time" vs "Processing Time"

Taking about Event time vs Processing Time

"Event time" vs "Processing Time"

  • Cada máquina está gerando logs no seu ritmo (máquinas menos acessadas geral logs mais devagar)
  • Monkeys collect aguarda ter ~2000 linhas para enviar para o Kafka
  • Mensagens tem que chegar ao Kafka e então serem processadas
  • v5 2017-10-17T11:09:59+00:00

"Event time" vs "Processing Time"

Processing time: A hora que o evento foi processado.

Event time: A hora que o event foi gerado.

Flink

A representation of flink consuming kafka events

Flink

  • Window: janela de tempo, com um período definido (1h, 1min).
  • Watermark: tempo a partir da criação da janela para começar a despejar os dados.
  • Lateness: tempo em que a janela permanece em memória.
  • Late arrivals: eventos que surgem depois que a janela foi removida.

Flink

Flink

Flink

Flink

Flink

Flink

Flink

Flink


Flink

Flink

Flink

Flink

"One more thing..."

Flink

Uma coisa que eu não mencionei: Não são todos os dados passados para as janelas; dados são agrupados e então passados para as janelas.

Mr Banks

Mr Banks

source
      .filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages")
      .process(new ProcessMessages(brokenMessageTag)).name("Message Processor")
      .flatMap(new MessageSpliter).name("Get Logs")   // get the lines in the message
      .filter(new LogBrokenFilter).name("Remove broken logs")
      .filter(new MissingClientFilter).name("Remove logs without clients")
      .flatMap(new MetricExtractor).name("Create metrics")
      .assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark")
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(windowTime))
      .allowedLateness(latenessTime)
      .sideOutputLateData(lateMessageTag)
      .reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics")
						

Mr Banks

.filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages")

Mr Banks

.process(new ProcessMessages(brokenMessageTag)).name("Message Processor")

Mr Banks

.flatMap(new MessageSpliter).name("Get Logs")

Mr Banks

.filter(new LogBrokenFilter).name("Remove broken logs")

Mr Banks

.filter(new MissingClientFilter).name("Remove logs without clients")

Mr Banks

.flatMap(new MetricExtractor).name("Create metrics")

Mr Banks

.assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark")

Mr Banks

.keyBy(_.key)

Mr Banks

.window(TumblingEventTimeWindows.of(windowTime))

Mr Banks

.allowedLateness(latenessTime)

Mr Banks

.sideOutputLateData(lateMessageTag)

Mr Banks

.reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics")

Mr Banks

Mr Banks

Perguntas?