Browse Source

fixed sink, but watermark is weird

master
Julio Biason 6 years ago
parent
commit
e4cd550ba3
  1. 78
      src/main/java/net/juliobiason/flink/SocketWindowWordCount.java

78
src/main/java/net/juliobiason/flink/SocketWindowWordCount.java

@ -48,7 +48,6 @@ import org.apache.flink.util.Collector;
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class SocketWindowWordCount { public class SocketWindowWordCount {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// the host and the port to connect to // the host and the port to connect to
@ -80,25 +79,33 @@ public class SocketWindowWordCount {
.flatMap(new FlatMapFunction<String, WordEvent>() { .flatMap(new FlatMapFunction<String, WordEvent>() {
@Override @Override
public void flatMap(String input, Collector<WordEvent> output) { public void flatMap(String input, Collector<WordEvent> output) {
WordEvent event = new WordEvent(input); try {
System.out.println("CREATE\t " + event); WordEvent event = new WordEvent(input);
output.collect(event); displayStep("CREATE", event.toString());
output.collect(event);
} catch (Exception exc) {
displayStep("ERROR", "unparseable data: " + input);
}
} }
}) })
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() { .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() {
private long currentMaxTimestamp = 0; private long currentMaxTimestamp = 0;
private long watermarkTime = 0;
@Override @Override
public final long extractTimestamp(WordEvent element, long previousElementTimestamp) { public final long extractTimestamp(WordEvent element, long previousElementTimestamp) {
long eventTimestamp = element.getTimestamp(); long eventTimestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp); if (eventTimestamp > currentMaxTimestamp) {
System.out.println("MAXTIMESTAMP\t " + currentMaxTimestamp); currentMaxTimestamp = eventTimestamp;
watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds();
displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000));
}
return eventTimestamp; return eventTimestamp;
} }
@Override @Override
public final Watermark getCurrentWatermark() { public final Watermark getCurrentWatermark() {
long watermarkTime = currentMaxTimestamp - 10;
// System.out.println("WATERMARK\t " + watermarkTime);
return new Watermark(watermarkTime); return new Watermark(watermarkTime);
} }
}) })
@ -109,45 +116,61 @@ public class SocketWindowWordCount {
new ReduceFunction<WordEvent>() { new ReduceFunction<WordEvent>() {
public WordEvent reduce(WordEvent element1, WordEvent element2) { public WordEvent reduce(WordEvent element1, WordEvent element2) {
long total = element1.getCount() + element2.getCount(); long total = element1.getCount() + element2.getCount();
System.out.print("REDUCE\t " + element1 + " + " + element2 + " = " + total + "\n"); WordEvent result = new WordEvent(element1.getTimestamp(),
return new WordEvent(element1.getTimestamp(),
element1.getWord(), element1.getWord(),
total); total);
displayStep("REDUCE", element1 + " + " + element2 + ", now " + result);
return result;
} }
}, },
new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() { new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) { public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) {
TimeWindow window = context.window(); TimeWindow window = context.window();
for (WordEvent word: values) { for (WordEvent word: values) {
System.out.println("MOVE\t " + word + " to " + window.getStart()); WordEvent result = new WordEvent(window.getStart(),
out.collect(new WordEvent(window.getStart(),
word.getWord(), word.getWord(),
word.getCount())); word.getCount());
displayStep("MOVE", word + " to " + window.getStart() + ", now " + result);
out.collect(result);
} }
} }
} }
); )
.flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
@Override
public void flatMap(WordEvent input, Collector<WordEvent> output) {
displayStep("DONE", "-----");
output.collect(input);
}
});
mainStream. mainStream
.flatMap(new FlatMapFunction<WordEvent, WordEvent>() { .flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
@Override @Override
public void flatMap(WordEvent input, Collector<WordEvent> output) { public void flatMap(WordEvent input, Collector<WordEvent> output) {
output.collect(input); output.collect(input);
} }
})
.addSink(new SinkFunction<WordEvent>() {
@Override
public synchronized void invoke(
WordEvent word,
org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx)
throws Exception {
displayStep("SINK", word.toString());
}
}); });
// .addSink(new SinkFunction<WordEvent>() {
// @Override
// public synchronized void invoke(
// WordEvent word,
// org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx)
// throws Exception {
// System.out.println("SINK\t " + word);
// }
// });
env.execute("Socket Window WordCount"); env.execute("Socket Window WordCount");
} }
private static void displayStep(String eventType, String eventMessage) {
System.out.println(String.format("%-30s %s",
eventType,
eventMessage));
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
/** /**
@ -160,7 +183,7 @@ public class SocketWindowWordCount {
public WordEvent(String input) { public WordEvent(String input) {
String[] frags = input.split(" ", 2); String[] frags = input.split(" ", 2);
this.timestamp = Integer.parseInt(frags[0]) * 1000; // must be millis this.timestamp = Time.seconds(Integer.parseInt(frags[0])).toMilliseconds();
this.word = frags[1]; this.word = frags[1];
this.count = 1; this.count = 1;
} }
@ -185,9 +208,10 @@ public class SocketWindowWordCount {
@Override @Override
public String toString() { public String toString() {
return String.format("%s at %d", return String.format("%s at %ds seen %d times",
this.word, this.word,
(this.timestamp / 1000)); // still display as secs (this.timestamp / 1000), // still display as secs
this.count);
} }
} }
} }

Loading…
Cancel
Save