From e4cd550ba3920b0e18b34834a125a1e7e3a015c5 Mon Sep 17 00:00:00 2001 From: Julio Biason Date: Fri, 3 Aug 2018 10:47:28 -0300 Subject: [PATCH] fixed sink, but watermark is weird --- .../flink/SocketWindowWordCount.java | 78 ++++++++++++------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java index e8e6fe1..3d45576 100644 --- a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java +++ b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java @@ -48,7 +48,6 @@ import org.apache.flink.util.Collector; */ @SuppressWarnings("serial") public class SocketWindowWordCount { - public static void main(String[] args) throws Exception { // the host and the port to connect to @@ -80,25 +79,33 @@ public class SocketWindowWordCount { .flatMap(new FlatMapFunction() { @Override public void flatMap(String input, Collector output) { - WordEvent event = new WordEvent(input); - System.out.println("CREATE\t " + event); - output.collect(event); + try { + WordEvent event = new WordEvent(input); + displayStep("CREATE", event.toString()); + output.collect(event); + } catch (Exception exc) { + displayStep("ERROR", "unparseable data: " + input); + } } }) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() { private long currentMaxTimestamp = 0; + private long watermarkTime = 0; + @Override public final long extractTimestamp(WordEvent element, long previousElementTimestamp) { long eventTimestamp = element.getTimestamp(); - currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp); - System.out.println("MAXTIMESTAMP\t " + currentMaxTimestamp); + if (eventTimestamp > currentMaxTimestamp) { + currentMaxTimestamp = eventTimestamp; + watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds(); + displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000)); + } return eventTimestamp; } + @Override public final Watermark getCurrentWatermark() { - long watermarkTime = currentMaxTimestamp - 10; - // System.out.println("WATERMARK\t " + watermarkTime); return new Watermark(watermarkTime); } }) @@ -109,45 +116,61 @@ public class SocketWindowWordCount { new ReduceFunction() { public WordEvent reduce(WordEvent element1, WordEvent element2) { long total = element1.getCount() + element2.getCount(); - System.out.print("REDUCE\t " + element1 + " + " + element2 + " = " + total + "\n"); - return new WordEvent(element1.getTimestamp(), + WordEvent result = new WordEvent(element1.getTimestamp(), element1.getWord(), total); + displayStep("REDUCE", element1 + " + " + element2 + ", now " + result); + + return result; } }, new ProcessWindowFunction() { public void process(String key, Context context, Iterable values, Collector out) { TimeWindow window = context.window(); for (WordEvent word: values) { - System.out.println("MOVE\t " + word + " to " + window.getStart()); - out.collect(new WordEvent(window.getStart(), + WordEvent result = new WordEvent(window.getStart(), word.getWord(), - word.getCount())); + word.getCount()); + displayStep("MOVE", word + " to " + window.getStart() + ", now " + result); + out.collect(result); } } } - ); + ) + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(WordEvent input, Collector output) { + displayStep("DONE", "-----"); + output.collect(input); + } + }); - mainStream. + mainStream .flatMap(new FlatMapFunction() { @Override public void flatMap(WordEvent input, Collector output) { output.collect(input); } + }) + .addSink(new SinkFunction() { + @Override + public synchronized void invoke( + WordEvent word, + org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx) + throws Exception { + displayStep("SINK", word.toString()); + } }); - // .addSink(new SinkFunction() { - // @Override - // public synchronized void invoke( - // WordEvent word, - // org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx) - // throws Exception { - // System.out.println("SINK\t " + word); - // } - // }); env.execute("Socket Window WordCount"); } + private static void displayStep(String eventType, String eventMessage) { + System.out.println(String.format("%-30s %s", + eventType, + eventMessage)); + } + // ------------------------------------------------------------------------ /** @@ -160,7 +183,7 @@ public class SocketWindowWordCount { public WordEvent(String input) { String[] frags = input.split(" ", 2); - this.timestamp = Integer.parseInt(frags[0]) * 1000; // must be millis + this.timestamp = Time.seconds(Integer.parseInt(frags[0])).toMilliseconds(); this.word = frags[1]; this.count = 1; } @@ -185,9 +208,10 @@ public class SocketWindowWordCount { @Override public String toString() { - return String.format("%s at %d", + return String.format("%s at %ds seen %d times", this.word, - (this.timestamp / 1000)); // still display as secs + (this.timestamp / 1000), // still display as secs + this.count); } } }