diff --git a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java
index 3d45576..da99f75 100644
--- a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java
+++ b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package net.juliobiason.flink;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -26,6 +8,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -73,9 +56,14 @@ public class SocketWindowWordCount {
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         env.setParallelism(1);
 
+        // Add the source for the datastream; in this case, the source is a socket.
         DataStream<String> text = env.socketTextStream(hostname, port, "\n");
 
         DataStream<WordEvent> mainStream = text
+            // This is the first transformation: we receive a String from the socket,
+            // but we need more structured data to work with, so we convert it into a
+            // WordEvent object. From this point on, the stream carries WordEvents, not
+            // Strings (we could convert WordEvent into yet another object if needed).
             .flatMap(new FlatMapFunction<String, WordEvent>() {
                 @Override
                 public void flatMap(String input, Collector<WordEvent> output) {
@@ -89,30 +77,74 @@ public class SocketWindowWordCount {
                 }
             })
 
-            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() {
+            // This is the start of the window grouping: we need to define a class that
+            // Flink will call to extract the timestamp of each element (in this case, a
+            // WordEvent, because that's what the transformation right before the
+            // windowing produces); along with it, Flink will also ask for the current
+            // watermark.
+            //
+            // One thing to keep in mind: all timestamps and watermarks must be in milliseconds.
+            // So, even if the user input is in seconds, internally we keep everything
+            // in millis -- and that's why you'll see "/ 1000" around this code.
+            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<WordEvent>() {
                 private long currentMaxTimestamp = 0;
                 private long watermarkTime = 0;
 
                 @Override
                 public final long extractTimestamp(WordEvent element, long previousElementTimestamp) {
                     long eventTimestamp = element.getTimestamp();
-                    if (eventTimestamp > currentMaxTimestamp) {
-                        currentMaxTimestamp = eventTimestamp;
-                        watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds();
-                        displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000));
-                    }
                     return eventTimestamp;
                 }
 
                 @Override
-                public final Watermark getCurrentWatermark() {
-                    return new Watermark(watermarkTime);
+                public Watermark checkAndGetNextWatermark(WordEvent word, long extractedTimestamp) {
+                    Watermark result = null;
+                    if (extractedTimestamp > currentMaxTimestamp) {
+                        currentMaxTimestamp = extractedTimestamp;
+                        watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds();
+                        displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000));
+                        result = new Watermark(watermarkTime);
+                    }
+                    return result;
                 }
             })
+
+            // How will elements appearing in the pipeline be identified?
+            // Because we want to group words by... well... words, we use the
+            // word itself as the grouping key.
             .keyBy(record -> record.getWord())
+
+            // Defining the window size: remember, for each element Flink sees,
+            // it calls `extractTimestamp` on the assigner above; with that
+            // timestamp, it assigns a window to the element. A tumbling window
+            // has fixed, non-overlapping boundaries, so every 30 seconds (based
+            // on the event timestamp) a new window starts.
             .window(TumblingEventTimeWindows.of(Time.seconds(30)))
+
+            // By default, a window is dropped as soon as the watermark passes its end;
+            // with `allowedLateness` you can say "keep this window around for this much
+            // extra time, just in case a late element that belongs to it still appears."
             .allowedLateness(Time.seconds(90))
+
+            // Reducing elements means that instead of keeping every single element
+            // that appears in the window, we combine them into a single one: if it
+            // is the first element of that key in the window, the element is kept
+            // as is; if there is already an element there and a new one comes in,
+            // both are reduced into a new element.
+            //
+            // Something like this (imagine this is the content of the window):
+            //   Current element | Incoming | Result
+            //   None            | 1        | 1
+            //   1               | 4        | 5
+            //   5               | 2        | 7
+            //
+            // Instead of keeping "1", "4" and "2" in the window, we keep a single
+            // element in it (here, the sum of the elements seen, although we could
+            // reduce them in any other way).
             .reduce(
+                // This is the reduce function; in this case, we aggregate elements
+                // by adding their counts, but keep everything else exactly as in
+                // the first element seen.
                 new ReduceFunction<WordEvent>() {
                     public WordEvent reduce(WordEvent element1, WordEvent element2) {
                         long total = element1.getCount() + element2.getCount();
@@ -124,6 +156,10 @@ public class SocketWindowWordCount {
                         return result;
                     }
                 },
+                // Reduce takes a second function, which runs when the element is
+                // "ejected" (fired, in stream processing terms) out of the window;
+                // in this case, we change the event timestamp to the start of the
+                // window.
                new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
                    public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) {
                        TimeWindow window = context.window();
@@ -131,27 +167,28 @@ public class SocketWindowWordCount {
                            WordEvent result = new WordEvent(window.getStart(), word.getWord(), word.getCount());
-                            displayStep("MOVE", word + " to " + window.getStart() + ", now " + result);
+                            displayStep("MOVE", word + " to " + (window.getStart() / 1000) + ", now " + result);
                            out.collect(result);
                        }
                    }
                }
-            )
-            .flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
-                @Override
-                public void flatMap(WordEvent input, Collector<WordEvent> output) {
-                    displayStep("DONE", "-----");
-                    output.collect(input);
-                }
-            });
+            );
 
         mainStream
+            // This is not strictly necessary, but Flink couldn't plug the sink
+            // directly into the window result here, so we add an identity function
+            // that forwards the element unmodified and let the sink consume that.
             .flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
                 @Override
                 public void flatMap(WordEvent input, Collector<WordEvent> output) {
                     output.collect(input);
                 }
             })
+
+            // The "sink" is where any element ejected from the window (fired)
+            // meets its permanent storage; in this example we only display it,
+            // but we could save it to any number of storages (Elasticsearch,
+            // a database, etc.).
             .addSink(new SinkFunction<WordEvent>() {
                 @Override
                 public synchronized void invoke(
@@ -162,11 +199,12 @@ public class SocketWindowWordCount {
                 }
             });
 
+        // And finally, start everything.
         env.execute("Socket Window WordCount");
     }
 
     private static void displayStep(String eventType, String eventMessage) {
-        System.out.println(String.format("%-30s %s",
+        System.out.println(String.format("%-25s - %s",
                     eventType,
                     eventMessage));
     }
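A minimal, standalone sketch of the window and watermark arithmetic the comments above describe (not part of the patch; the class name and constants are illustrative, and it only assumes the values configured in the pipeline: 30-second tumbling windows and a watermark that trails the highest timestamp seen by 10 seconds, everything in milliseconds):

public class WindowMathSketch {

    static final long WINDOW_SIZE_MS = 30_000;   // 30-second tumbling windows
    static final long WATERMARK_LAG_MS = 10_000; // watermark = highest timestamp seen - 10s

    public static void main(String[] args) {
        long eventTimestamp = 65_000; // an event at 65s

        // Tumbling windows have fixed boundaries, so this event lands in [60s, 90s).
        long windowStart = eventTimestamp - (eventTimestamp % WINDOW_SIZE_MS);
        long windowEnd = windowStart + WINDOW_SIZE_MS;

        // The window fires once the watermark passes its end; since the watermark trails
        // the highest timestamp by 10s, that happens roughly when an event with timestamp
        // >= windowEnd + lag shows up (100s here).
        long firesAtEventTime = windowEnd + WATERMARK_LAG_MS;

        System.out.printf("event at %ds -> window [%ds, %ds), fires around event time %ds%n",
                eventTimestamp / 1000, windowStart / 1000, windowEnd / 1000, firesAtEventTime / 1000);
    }
}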