cat /var/log/messages | cut -d' ' -f 1 | sort | uniq
v5 2017-10-17T11:09:59+00:00 7695w 27695w 1462530548 ha 1441740010 servico.clubeportoseguro.com.br 0.064 GET BYPASS 200 - 200 1199 http /api/produto - - application/json; charset=utf-8 HTTP/1.1 230 1205 0.047 0.064 0.064 8000 200.230.226.125 17759 - - - - - - - Jakarta Commons-HttpClient/3.1 - - -
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import collections


def default_data():
    """Initial per-client, per-minute traffic accumulator."""
    return {
        'requests': 0,
        'user_to_edge': 0,
        'edge_to_origin': 0,
        'origin_to_edge': 0,
        'edge_to_user': 0,
        'client': '',
        'time': ''
    }


def to_bytes(string):
    """Sum a byte-count field; values may be grouped with ',' and ':',
    and '-' means no value."""
    value = 0
    for val1 in string.split(','):
        for val2 in val1.split(':'):
            if val2 != '-':
                value += int(val2)
    return value


def main():
    clients = collections.defaultdict(default_data)
    with open('v5sample1.log') as content:
        for line in content:
            fields = line.split('\t')
            time = fields[1]
            client_id = fields[2]
            # Aggregation key: client id plus timestamp truncated to the minute.
            client = client_id + time[:16]
            upstream_cache_status = fields[10].lower()
            upstream_bytes_received = to_bytes(fields[14])
            request_length = to_bytes(fields[21])
            bytes_sent = to_bytes(fields[22])
            clients[client]['requests'] += 1
            clients[client]['user_to_edge'] += request_length
            # Only requests that miss the cache reach the origin.
            if upstream_cache_status not in ('hit', 'updating'):
                clients[client]['edge_to_origin'] += request_length
            clients[client]['origin_to_edge'] += upstream_bytes_received
            clients[client]['edge_to_user'] += bytes_sent
            clients[client]['client'] = client_id
            clients[client]['time'] = 'at11_{min}'.format(min=time[14:16])


if __name__ == "__main__":
    main()
In other words, if we grabbed each file and processed it line by line, we would have batch processing.
Problems:
Kafka
"São ~25 arquivos/segundo gerados pela CDN."
"Cada arquivo é de uma máquina, não o agregado geral."
If we are processing the logs continuously...
v5 2017-10-17T11:09:59+00:00
Processing time: the time at which the event was processed.
Event time: the time at which the event was generated.
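In Flink terms, choosing event time means switching the environment to event-time semantics and telling it how to pull a timestamp (and a watermark) out of each record. A rough sketch of what the MetricTimestampAndWatermarks used in the pipeline below might look like; the Metric case class and its fields are assumptions for illustration, not the real class:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Make windows and timers fire on event time, not wall-clock time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Hypothetical metric record: timestampMillis is the log's
// 2017-10-17T11:09:59+00:00 field parsed to epoch milliseconds.
case class Metric(key: String, timestampMillis: Long, bytes: Long)

// Watermarks trail the highest timestamp seen by watermarkTime, so events
// arriving up to that much out of order still land in the right window.
class MetricTimestampAndWatermarks(watermarkTime: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[Metric](watermarkTime) {
  override def extractTimestamp(metric: Metric): Long = metric.timestampMillis
}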
"One more thing..."
One thing I haven't mentioned: not all the data is handed to the windows as-is; the data is grouped first and then passed to the windows.
source
.filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages")
.process(new ProcessMessages(brokenMessageTag)).name("Message Processor")
.flatMap(new MessageSpliter).name("Get Logs") // get the lines in the message
.filter(new LogBrokenFilter).name("Remove broken logs")
.filter(new MissingClientFilter).name("Remove logs without clients")
.flatMap(new MetricExtractor).name("Create metrics")
.assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark")
.keyBy(_.key)
.window(TumblingEventTimeWindows.of(windowTime))
.allowedLateness(latenessTime)
.sideOutputLateData(lateMessageTag)
.reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics")
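That grouping is what the reduce step does: with an incremental ReduceFunction, Flink folds each arriving element into one accumulated value per key, so a window stores a single pre-aggregated Metric instead of every raw element. A sketch of what MetricReducer might look like, reusing the hypothetical Metric from the event-time example (the real implementation is not shown here):

import org.apache.flink.api.common.functions.ReduceFunction

// Combine two metrics with the same key; the window state never grows
// beyond one Metric per key because reduction happens on arrival.
class MetricReducer extends ReduceFunction[Metric] {
  override def reduce(a: Metric, b: Metric): Metric =
    a.copy(bytes = a.bytes + b.bytes)
}

Events that arrive even after the allowed lateness are not lost: sideOutputLateData routes them to a side output, where they can be retrieved with getSideOutput(lateMessageTag).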