|
|
|
@ -353,9 +353,10 @@ if __name__ == "__main__":
|
|
|
|
|
<h2>Flink</h2> |
|
|
|
|
|
|
|
|
|
<p> |
|
|
|
|
Uma coisa que eu não mencionei: Não são todos os dados |
|
|
|
|
passados para as janelas; dados são agrupados e então |
|
|
|
|
passados para as janelas. |
|
|
|
|
Quando os registros são passados para a janela, eles são |
|
|
|
|
agrupados (ao invés de conter todos os registros que se |
|
|
|
|
encaixam na janela, somente os valores agrupados |
|
|
|
|
são guardados). |
|
|
|
|
</p> |
|
|
|
|
</section> |
|
|
|
|
</section> |
|
|
|
@ -371,18 +372,18 @@ if __name__ == "__main__":
|
|
|
|
|
<h2>Mr Banks</h2> |
|
|
|
|
|
|
|
|
|
<pre><code>source |
|
|
|
|
.filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages") |
|
|
|
|
.process(new ProcessMessages(brokenMessageTag)).name("Message Processor") |
|
|
|
|
.flatMap(new MessageSpliter).name("Get Logs") // get the lines in the message |
|
|
|
|
.filter(new LogBrokenFilter).name("Remove broken logs") |
|
|
|
|
.filter(new MissingClientFilter).name("Remove logs without clients") |
|
|
|
|
.flatMap(new MetricExtractor).name("Create metrics") |
|
|
|
|
.assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark") |
|
|
|
|
.keyBy(_.key) |
|
|
|
|
.window(TumblingEventTimeWindows.of(windowTime)) |
|
|
|
|
.allowedLateness(latenessTime) |
|
|
|
|
.sideOutputLateData(lateMessageTag) |
|
|
|
|
.reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics") |
|
|
|
|
.filter(new Selector(processor)).name(s"Selecting ${processor * 100}% messages") |
|
|
|
|
.process(new ProcessMessages(brokenMessageTag)).name("Message Processor") |
|
|
|
|
.flatMap(new MessageSpliter).name("Get Logs") // get the lines in the message |
|
|
|
|
.filter(new LogBrokenFilter).name("Remove broken logs") |
|
|
|
|
.filter(new MissingClientFilter).name("Remove logs without clients") |
|
|
|
|
.flatMap(new MetricExtractor).name("Create metrics") |
|
|
|
|
.assignTimestampsAndWatermarks(new MetricTimestampAndWatermarks(watermarkTime)).name("Watermark") |
|
|
|
|
.keyBy(_.key) |
|
|
|
|
.window(TumblingEventTimeWindows.of(windowTime)) |
|
|
|
|
.allowedLateness(latenessTime) |
|
|
|
|
.sideOutputLateData(lateMessageTag) |
|
|
|
|
.reduce(new MetricReducer(), new MetricWindowTimeMatcher()).name("Group metrics") |
|
|
|
|
</code></pre> |
|
|
|
|
</section> |
|
|
|
|
|
|
|
|
|