Browse Source

almost working, sink still fails

master
Julio Biason 6 years ago
parent
commit
8109ca7ff2
  1. 140
      src/main/java/net/juliobiason/flink/SocketWindowWordCount.java

140
src/main/java/net/juliobiason/flink/SocketWindowWordCount.java

@ -20,10 +20,19 @@ package net.juliobiason.flink;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
/** /**
@ -57,37 +66,84 @@ public class SocketWindowWordCount {
"type the input text into the command line"); "type the input text into the command line");
return; return;
} }
System.out.println("Connecting to " + hostname);
System.out.println("And port " + port);
// get the execution environment // get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n"); DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts DataStream<WordEvent> mainStream = text
DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordEvent>() {
@Override
.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String input, Collector<WordEvent> output) {
@Override WordEvent event = new WordEvent(input);
public void flatMap(String value, Collector<WordWithCount> out) { System.out.println("CREATE\t " + event);
for (String word : value.split("\\s")) { output.collect(event);
out.collect(new WordWithCount(word, 1L)); }
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() {
private long currentMaxTimestamp = 0;
@Override
public final long extractTimestamp(WordEvent element, long previousElementTimestamp) {
long eventTimestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
System.out.println("MAXTIMESTAMP\t " + currentMaxTimestamp);
return eventTimestamp;
}
@Override
public final Watermark getCurrentWatermark() {
long watermarkTime = currentMaxTimestamp - 10;
// System.out.println("WATERMARK\t " + watermarkTime);
return new Watermark(watermarkTime);
}
})
.keyBy(record -> record.getWord())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.allowedLateness(Time.seconds(90))
.reduce(
new ReduceFunction<WordEvent>() {
public WordEvent reduce(WordEvent element1, WordEvent element2) {
long total = element1.getCount() + element2.getCount();
System.out.print("REDUCE\t " + element1 + " + " + element2 + " = " + total + "\n");
return new WordEvent(element1.getTimestamp(),
element1.getWord(),
total);
}
},
new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) {
TimeWindow window = context.window();
for (WordEvent word: values) {
System.out.println("MOVE\t " + word + " to " + window.getStart());
out.collect(new WordEvent(window.getStart(),
word.getWord(),
word.getCount()));
}
} }
} }
}) );
.keyBy("word") mainStream.
.timeWindow(Time.seconds(5)) .flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
@Override
.reduce(new ReduceFunction<WordWithCount>() { public void flatMap(WordEvent input, Collector<WordEvent> output) {
@Override output.collect(input);
public WordWithCount reduce(WordWithCount a, WordWithCount b) { }
return new WordWithCount(a.word, a.count + b.count); });
} // .addSink(new SinkFunction<WordEvent>() {
}); // @Override
// public synchronized void invoke(
// print the results with a single thread, rather than in parallel // WordEvent word,
windowCounts.print().setParallelism(1); // org.apache.flink.streaming.api.functions.sink.SinkFunction.Context ctx)
// throws Exception {
// System.out.println("SINK\t " + word);
// }
// });
env.execute("Socket Window WordCount"); env.execute("Socket Window WordCount");
} }
@ -95,23 +151,43 @@ public class SocketWindowWordCount {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
/** /**
* Data type for words with count. * The event of a word.
*/ */
public static class WordWithCount { public static class WordEvent {
private long timestamp;
private String word;
private long count;
public WordEvent(String input) {
String[] frags = input.split(" ", 2);
this.timestamp = Integer.parseInt(frags[0]) * 1000; // must be millis
this.word = frags[1];
this.count = 1;
}
public WordEvent(long timestamp, String word, long count) {
this.timestamp = timestamp;
this.word = word;
this.count = count;
}
public String word; public long getTimestamp() {
public long count; return this.timestamp;
}
public WordWithCount() {} public String getWord() {
return this.word;
}
public WordWithCount(String word, long count) { public long getCount() {
this.word = word; return this.count;
this.count = count;
} }
@Override @Override
public String toString() { public String toString() {
return word + " : " + count; return String.format("%s at %d",
this.word,
(this.timestamp / 1000)); // still display as secs
} }
} }
} }

Loading…
Cancel
Save