Julio Biason
6 years ago
# Flink Side Output Sample

This is an example of working with Flink and side outputs.

## What this is

The pipeline processes log lines, turns them into metrics, reduces the
results, and applies them to time windows (tumbling windows, in Flink jargon:
consecutive, non-overlapping blocks of elements split by their event time).
Once the elements are grouped in their windows, we want them to go to
different sinks (to be stored).

## The layout

### Metrics

The metrics are the information extracted from the log lines. We have two
different metrics: a simple one (`SimpleMetric`) with a single value and a
more complex one (`ComplexMetric`) with more values.

The exercise is to have two different types of elements floating in the data
stream, which would require a different sink for each.

Although the two are different elements, both implement the same trait
(interface), so even though both float in the data stream, they are processed
in the same way.

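The shared trait can be sketched roughly like this (a minimal, hypothetical version; the field names and the `add` logic are assumptions, not the repo's actual code):

```scala
// Hypothetical sketch of the shared trait and the two metric types;
// field names and the add logic are assumptions, not the repo's code.
trait Metric {
  def add(other: Metric): Metric
}

final case class SimpleMetric(value: Long) extends Metric {
  def add(other: Metric): Metric = other match {
    case SimpleMetric(v) => SimpleMetric(value + v)
    case _               => this
  }
}

final case class ComplexMetric(count: Long, total: Long) extends Metric {
  def add(other: Metric): Metric = other match {
    case ComplexMetric(c, t) => ComplexMetric(count + c, total + t)
    case _                   => this
  }
}
```

Because both types share the trait, the pipeline can pass them around as `Metric` without caring which concrete class it is holding.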
### The Source

In this example, we use a source function (`SourceFunction`) to generate the
elements. It basically holds a list of lines and throws each one into the
data stream.

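Stripped of the Flink API, the source boils down to something like this (a hypothetical, Flink-free sketch; in the real `SourceFunction`, `collect` would be the emit call on its `SourceContext`):

```scala
// Flink-free sketch: the source holds a list of lines and pushes each
// one into the stream; `collect` stands in for SourceContext.collect.
def runSource(lines: Seq[String])(collect: String => Unit): Unit =
  lines.foreach(collect)
```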
### Making it easier to deal with the lines

Because the lines are pure text, we need an easy way to extract the
information in them. For this, we used a `flatMap` to split the lines on
their separator (in this example, a tab character) and then name each field,
creating a map/object/dictionary (Scala and stream/functional-processing
names colliding here). This way, when we actually create the metrics, we can
simply request the fields in the map by their names.

Note, though, that each line becomes a single map, so a `map` would also work
here. We simply used `flatMap` because, instead of working with a single line
of log, we could work with blocks of lines, and the mapping function would
then generate more maps/objects/dictionaries.

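The parsing step can be sketched like this (the field names here are made up for illustration; the real code may name the fields differently):

```scala
// Hypothetical sketch of the flatMap body: split a tab-separated log
// line and name each field. The field names are assumptions.
def parseLine(line: String): Map[String, String] = {
  val fields = line.split('\t')
  Map(
    "timestamp" -> fields(0),
    "level"     -> fields(1),
    "message"   -> fields(2)
  )
}
```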
### Extracting metrics

To extract the metrics, we use another `flatMap`, this time because we are
extracting more than one metric from each line.

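For illustration, the second `flatMap` could look like this (a self-contained, hypothetical sketch: the metric type, its fields, and the metrics produced are all assumptions, redeclared minimally here so the sketch stands alone):

```scala
// Hypothetical sketch: one parsed line yields more than one metric,
// which is why a flatMap (and not a map) is used here.
final case class NamedMetric(name: String, value: Long)

def extractMetrics(fields: Map[String, String]): Seq[NamedMetric] =
  Seq(
    NamedMetric("lines", 1L),
    NamedMetric("bytes", fields.getOrElse("message", "").length.toLong)
  )
```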
### Windows

As mentioned, we group elements by window, so we need to define how the
window works. The definition of which notion of time should open and close
windows sits at the very start of the pipeline, where we indicate
`TimeCharacteristic.EventTime`, which means "use the time of the event,
instead of the time the log is being processed or some other information".

Because we are using event time, we need to indicate how that time is
extracted from each element. This is done in
`AssignerWithPeriodicWatermarks.extractTimestamp`. Another thing to notice is
that we also define the watermark: the point in time up to which events are
assumed to have arrived, and past which the corresponding window is fired
(sent to the sink). In this example, the watermark trails the most recent
event seen in the data stream by 1 second.

The metrics, inside their windows, are grouped by their key, with windows of
1 minute that stay around for an extra 30 seconds (the allowed lateness);
everything arriving after that is put on a side output for later processing.

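The timestamp/watermark bookkeeping described above amounts to something like this (a plain-Scala stand-in for `AssignerWithPeriodicWatermarks`; only the 1-second bound comes from the text, everything else is an assumption):

```scala
// Plain-Scala stand-in for AssignerWithPeriodicWatermarks: remember the
// largest event time seen so far and keep the watermark 1 second behind it.
class WatermarkTracker(maxOutOfOrdernessMs: Long = 1000L) {
  private var maxTimestamp = Long.MinValue

  // Called once per element, like extractTimestamp.
  def extractTimestamp(eventTimeMs: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, eventTimeMs)
    eventTimeMs
  }

  // The watermark trails the most recent event by the bound.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
}
```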
### Reducing

Once an element is added to a window, it is reduced (in functional jargon)
along elements with its own key. This is what we do with `ReduceFunction`
and the fact that the `Metric` trait has an `add` method.

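Conceptually, the per-key reduce does something like this (a self-contained sketch; `CountMetric` is a minimal stand-in for the repo's `Metric` trait, and in the real pipeline Flink's `ReduceFunction` performs the fold incrementally as elements arrive):

```scala
// Self-contained sketch of the per-key reduce: within a window, elements
// with the same key are folded together through `add`.
final case class CountMetric(key: String, value: Long) {
  def add(other: CountMetric): CountMetric = CountMetric(key, value + other.value)
}

def reduceByKey(metrics: Seq[CountMetric]): Map[String, CountMetric] =
  metrics.groupBy(_.key).map { case (k, ms) => k -> ms.reduce(_ add _) }
```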
### Sinks

When the window fires, we divide the results into two different side outputs
-- remember, we have two different metrics and each requires a different
sink. The `ProcessFunction` does that: based on the class of the metric, it
sends each metric type to a different side output.

Those side outputs are then captured and sent to different sinks.

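Flink-free, the routing performed by the `ProcessFunction` boils down to a class-based split, roughly like this (types redeclared minimally so the sketch stands alone; in the real pipeline this is done by writing each element to one of two `OutputTag`s via the function's context):

```scala
// Hypothetical, Flink-free sketch of the class-based routing: one stream
// of mixed metric types split into two "side outputs".
sealed trait RoutedMetric
final case class Simple(value: Long) extends RoutedMetric
final case class Complex(count: Long, total: Long) extends RoutedMetric

def route(metrics: Seq[RoutedMetric]): (Seq[Simple], Seq[Complex]) =
  (
    metrics.collect { case m: Simple  => m },
    metrics.collect { case m: Complex => m }
  )
```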
## Running

Simply install [SBT](https://www.scala-sbt.org/) and run `sbt run`. SBT will
download all the necessary dependencies and run the pipeline in standalone
mode.

We didn't test it using the full JobManager+TaskManager model of Flink, so
that is left as an exercise for the reader. :)

## The problem

The problem here -- which this sample tries to demonstrate -- is that once
the windows are generated and processed by the `ProcessFunction` that sends
each metric to its side output, the side outputs are never captured again --
so even though the elements are sent to side outputs, they die in limbo.