diff --git a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java index 62eb578..e8e6fe1 100644 --- a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java +++ b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java @@ -20,10 +20,19 @@ package net.juliobiason.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** @@ -57,37 +66,84 @@ public class SocketWindowWordCount { "type the input text into the command line"); return; } + System.out.println("Connecting to " + hostname); + System.out.println("And port " + port); // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); - // get input data by connecting to the socket DataStream text = env.socketTextStream(hostname, port, "\n"); - // parse the data, group it, window it, and aggregate the counts - DataStream windowCounts = text - - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(String value, Collector out) { - for (String word : value.split("\\s")) { - out.collect(new WordWithCount(word, 1L)); + DataStream mainStream = text + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(String input, Collector output) { + WordEvent event = new WordEvent(input); + System.out.println("CREATE\t " + event); + output.collect(event); + } + }) + + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() { + private long currentMaxTimestamp = 0; + @Override + public final long extractTimestamp(WordEvent element, long previousElementTimestamp) { + long eventTimestamp = element.getTimestamp(); + currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp); + System.out.println("MAXTIMESTAMP\t " + currentMaxTimestamp); + return eventTimestamp; + } + @Override + public final Watermark getCurrentWatermark() { + long watermarkTime = currentMaxTimestamp - 10; + // System.out.println("WATERMARK\t " + watermarkTime); + return new Watermark(watermarkTime); + } + }) + .keyBy(record -> record.getWord()) + .window(TumblingEventTimeWindows.of(Time.seconds(30))) + .allowedLateness(Time.seconds(90)) + .reduce( + new ReduceFunction() { + public WordEvent reduce(WordEvent element1, WordEvent element2) { + long total = element1.getCount() + element2.getCount(); + System.out.print("REDUCE\t " + element1 + " + " + element2 + " = " + total + "\n"); + return new WordEvent(element1.getTimestamp(), + element1.getWord(), + total); + } + }, + new ProcessWindowFunction() { + public void process(String key, Context context, Iterable values, Collector out) { + TimeWindow window = context.window(); + for (WordEvent word: values) { + System.out.println("MOVE\t " + word + " to " + window.getStart()); + out.collect(new WordEvent(window.getStart(), + word.getWord(), + word.getCount())); + } } } - }) - - .keyBy("word") - .timeWindow(Time.seconds(5)) - - .reduce(new ReduceFunction() { - @Override - public WordWithCount reduce(WordWithCount a, WordWithCount b) { - return new WordWithCount(a.word, a.count + b.count); - } - }); - - // print the results with a single thread, rather than in parallel - windowCounts.print().setParallelism(1); + ); + + mainStream. + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(WordEvent input, Collector output) { + output.collect(input); + } + }); + // .addSink(new SinkFunction() { + // @Override + // public synchronized void invoke( + // WordEvent word, + // org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx) + // throws Exception { + // System.out.println("SINK\t " + word); + // } + // }); env.execute("Socket Window WordCount"); } @@ -95,23 +151,43 @@ public class SocketWindowWordCount { // ------------------------------------------------------------------------ /** - * Data type for words with count. + * The event of a word. */ - public static class WordWithCount { + public static class WordEvent { + private long timestamp; + private String word; + private long count; + + public WordEvent(String input) { + String[] frags = input.split(" ", 2); + this.timestamp = Integer.parseInt(frags[0]) * 1000; // must be millis + this.word = frags[1]; + this.count = 1; + } + + public WordEvent(long timestamp, String word, long count) { + this.timestamp = timestamp; + this.word = word; + this.count = count; + } - public String word; - public long count; + public long getTimestamp() { + return this.timestamp; + } - public WordWithCount() {} + public String getWord() { + return this.word; + } - public WordWithCount(String word, long count) { - this.word = word; - this.count = count; + public long getCount() { + return this.count; } @Override public String toString() { - return word + " : " + count; + return String.format("%s at %d", + this.word, + (this.timestamp / 1000)); // still display as secs } } }