|
|
@ -1,21 +1,3 @@ |
|
|
|
/* |
|
|
|
|
|
|
|
* Licensed to the Apache Software Foundation (ASF) under one |
|
|
|
|
|
|
|
* or more contributor license agreements. See the NOTICE file |
|
|
|
|
|
|
|
* distributed with this work for additional information |
|
|
|
|
|
|
|
* regarding copyright ownership. The ASF licenses this file |
|
|
|
|
|
|
|
* to you under the Apache License, Version 2.0 (the |
|
|
|
|
|
|
|
* "License"); you may not use this file except in compliance |
|
|
|
|
|
|
|
* with the License. You may obtain a copy of the License at |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* Unless required by applicable law or agreed to in writing, software |
|
|
|
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
|
|
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
|
|
|
|
|
* See the License for the specific language governing permissions and |
|
|
|
|
|
|
|
* limitations under the License. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
package net.juliobiason.flink; |
|
|
|
package net.juliobiason.flink; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.flink.api.common.functions.FlatMapFunction; |
|
|
|
import org.apache.flink.api.common.functions.FlatMapFunction; |
|
|
@ -26,6 +8,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; |
|
|
|
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; |
|
|
|
|
|
|
|
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction; |
|
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context; |
|
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context; |
|
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; |
|
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; |
|
|
@ -73,9 +56,14 @@ public class SocketWindowWordCount { |
|
|
|
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
|
|
|
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
|
|
|
env.setParallelism(1); |
|
|
|
env.setParallelism(1); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Add the source for the datastream; in this case, the source is a socket.
|
|
|
|
DataStream<String> text = env.socketTextStream(hostname, port, "\n"); |
|
|
|
DataStream<String> text = env.socketTextStream(hostname, port, "\n"); |
|
|
|
|
|
|
|
|
|
|
|
DataStream<WordEvent> mainStream = text |
|
|
|
DataStream<WordEvent> mainStream = text |
|
|
|
|
|
|
|
// This is the first transformation we do: we receive a String from the socket,
|
|
|
|
|
|
|
|
// but we need a more structured data to deal with it, so we convert it to a
|
|
|
|
|
|
|
|
// WordEvent object. From this point on, the stream will see WordEvents, not
|
|
|
|
|
|
|
|
// Strings (we could convert WordEvent to another object, if needed, though).
|
|
|
|
.flatMap(new FlatMapFunction<String, WordEvent>() { |
|
|
|
.flatMap(new FlatMapFunction<String, WordEvent>() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void flatMap(String input, Collector<WordEvent> output) { |
|
|
|
public void flatMap(String input, Collector<WordEvent> output) { |
|
|
@ -89,30 +77,74 @@ public class SocketWindowWordCount { |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() { |
|
|
|
// This is the start of the windowing grouping: We need to define a class
|
|
|
|
|
|
|
|
// that Flink will call to know how to extract the timestamp of some element
|
|
|
|
|
|
|
|
// (in this case, a WordEvent, 'cause that's the transformation exactly
|
|
|
|
|
|
|
|
// before the windowing); along with it, it will also ask what's the current
|
|
|
|
|
|
|
|
// watermark.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// One thing to keep in mind: All timestamps and watermarks must be in milliseconds.
|
|
|
|
|
|
|
|
// So, even if the user input is in seconds, internally we are keeping everything
|
|
|
|
|
|
|
|
// in millis -- and that's why you'll see "/1000" around this code.
|
|
|
|
|
|
|
|
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<WordEvent>() { |
|
|
|
private long currentMaxTimestamp = 0; |
|
|
|
private long currentMaxTimestamp = 0; |
|
|
|
private long watermarkTime = 0; |
|
|
|
private long watermarkTime = 0; |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final long extractTimestamp(WordEvent element, long previousElementTimestamp) { |
|
|
|
public final long extractTimestamp(WordEvent element, long previousElementTimestamp) { |
|
|
|
long eventTimestamp = element.getTimestamp(); |
|
|
|
long eventTimestamp = element.getTimestamp(); |
|
|
|
if (eventTimestamp > currentMaxTimestamp) { |
|
|
|
|
|
|
|
currentMaxTimestamp = eventTimestamp; |
|
|
|
|
|
|
|
watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds(); |
|
|
|
|
|
|
|
displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return eventTimestamp; |
|
|
|
return eventTimestamp; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final Watermark getCurrentWatermark() { |
|
|
|
public Watermark checkAndGetNextWatermark(WordEvent word, long extractedTimestamp) { |
|
|
|
return new Watermark(watermarkTime); |
|
|
|
Watermark result = null; |
|
|
|
|
|
|
|
if (extractedTimestamp > currentMaxTimestamp) { |
|
|
|
|
|
|
|
currentMaxTimestamp = extractedTimestamp; |
|
|
|
|
|
|
|
watermarkTime = currentMaxTimestamp - Time.seconds(10).toMilliseconds(); |
|
|
|
|
|
|
|
displayStep("TIMESTAMP", "moved to " + (watermarkTime / 1000)); |
|
|
|
|
|
|
|
result = new Watermark(watermarkTime); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// How elements appearing the the pipeline will be identified?
|
|
|
|
|
|
|
|
// Because we want to group words by... well... words, we need to point
|
|
|
|
|
|
|
|
// the grouping value.
|
|
|
|
.keyBy(record -> record.getWord()) |
|
|
|
.keyBy(record -> record.getWord()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Defining the window size: Remember, each element that Flink sees,
|
|
|
|
|
|
|
|
// it will call `extractedTimestamp` from the watermark object above;
|
|
|
|
|
|
|
|
// with it, it will assign a Window for it. A tumbling window means
|
|
|
|
|
|
|
|
// a Window that has a fixed point in time, so every 30 seconds (based
|
|
|
|
|
|
|
|
// on the event timestamp) there will be a new window.
|
|
|
|
.window(TumblingEventTimeWindows.of(Time.seconds(30))) |
|
|
|
.window(TumblingEventTimeWindows.of(Time.seconds(30))) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// By default, Flink keeps a single window (of the time above); with
|
|
|
|
|
|
|
|
// `allowedLateness` you can say "keep this much time behind the window
|
|
|
|
|
|
|
|
// in memory, just in case something behind the current window appears."
|
|
|
|
.allowedLateness(Time.seconds(90)) |
|
|
|
.allowedLateness(Time.seconds(90)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Reducing elements means that instead of keeping every single element
|
|
|
|
|
|
|
|
// that appears in the window, we'll pick them and generate a single one;
|
|
|
|
|
|
|
|
// if it is the first element of that key in the window, the element is
|
|
|
|
|
|
|
|
// kept as is; if there is already an element there and there is a new
|
|
|
|
|
|
|
|
// one coming, both are reduced to a new element.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// Something like this (imagine this is the content of the window):
|
|
|
|
|
|
|
|
// Current Element | incoming | result
|
|
|
|
|
|
|
|
// None | 1 | 1
|
|
|
|
|
|
|
|
// 1 | 4 | 5
|
|
|
|
|
|
|
|
// 5 | 2 | 7
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// Instead of keeping "1", "4" and "2" in the window, we keep a single
|
|
|
|
|
|
|
|
// element in it (which is the sum of the elements seen, although
|
|
|
|
|
|
|
|
// we could reduce in any way).
|
|
|
|
.reduce( |
|
|
|
.reduce( |
|
|
|
|
|
|
|
// This is the reduce function; in this case, we are aggregating
|
|
|
|
|
|
|
|
// elements by adding their count, but keeping everything exactly
|
|
|
|
|
|
|
|
// as the first element seen.
|
|
|
|
new ReduceFunction<WordEvent>() { |
|
|
|
new ReduceFunction<WordEvent>() { |
|
|
|
public WordEvent reduce(WordEvent element1, WordEvent element2) { |
|
|
|
public WordEvent reduce(WordEvent element1, WordEvent element2) { |
|
|
|
long total = element1.getCount() + element2.getCount(); |
|
|
|
long total = element1.getCount() + element2.getCount(); |
|
|
@ -124,6 +156,10 @@ public class SocketWindowWordCount { |
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
// Reduce allows a second object, which is run when the element is
|
|
|
|
|
|
|
|
// "ejected" (fired, in stream processing terms) out of the window;
|
|
|
|
|
|
|
|
// in this case, we change the event timestamp to the start of the
|
|
|
|
|
|
|
|
// window.
|
|
|
|
new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() { |
|
|
|
new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() { |
|
|
|
public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) { |
|
|
|
public void process(String key, Context context, Iterable<WordEvent> values, Collector<WordEvent> out) { |
|
|
|
TimeWindow window = context.window(); |
|
|
|
TimeWindow window = context.window(); |
|
|
@ -131,27 +167,28 @@ public class SocketWindowWordCount { |
|
|
|
WordEvent result = new WordEvent(window.getStart(), |
|
|
|
WordEvent result = new WordEvent(window.getStart(), |
|
|
|
word.getWord(), |
|
|
|
word.getWord(), |
|
|
|
word.getCount()); |
|
|
|
word.getCount()); |
|
|
|
displayStep("MOVE", word + " to " + window.getStart() + ", now " + result); |
|
|
|
displayStep("MOVE", word + " to " + (window.getStart() / 1000) + ", now " + result); |
|
|
|
out.collect(result); |
|
|
|
out.collect(result); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
) |
|
|
|
); |
|
|
|
.flatMap(new FlatMapFunction<WordEvent, WordEvent>() { |
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void flatMap(WordEvent input, Collector<WordEvent> output) { |
|
|
|
|
|
|
|
displayStep("DONE", "-----"); |
|
|
|
|
|
|
|
output.collect(input); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mainStream |
|
|
|
mainStream |
|
|
|
|
|
|
|
// This is not really necessary, but Flink can't plug a sink directly
|
|
|
|
|
|
|
|
// into the Window result; so we have a function that returns the
|
|
|
|
|
|
|
|
// element without any modifications and the sink can use it.
|
|
|
|
.flatMap(new FlatMapFunction<WordEvent, WordEvent>() { |
|
|
|
.flatMap(new FlatMapFunction<WordEvent, WordEvent>() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void flatMap(WordEvent input, Collector<WordEvent> output) { |
|
|
|
public void flatMap(WordEvent input, Collector<WordEvent> output) { |
|
|
|
output.collect(input); |
|
|
|
output.collect(input); |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// The "sink" is where any element ejected from the window (fired)
|
|
|
|
|
|
|
|
// meets its permanent storage; in this example, we only display it,
|
|
|
|
|
|
|
|
// but we could save it to a number of storages (like ElasticSearch,
|
|
|
|
|
|
|
|
// DB, etc).
|
|
|
|
.addSink(new SinkFunction<WordEvent>() { |
|
|
|
.addSink(new SinkFunction<WordEvent>() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public synchronized void invoke( |
|
|
|
public synchronized void invoke( |
|
|
@ -162,11 +199,12 @@ public class SocketWindowWordCount { |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// And finally, starts everything.
|
|
|
|
env.execute("Socket Window WordCount"); |
|
|
|
env.execute("Socket Window WordCount"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static void displayStep(String eventType, String eventMessage) { |
|
|
|
private static void displayStep(String eventType, String eventMessage) { |
|
|
|
System.out.println(String.format("%-30s %s", |
|
|
|
System.out.println(String.format("%-25s - %s", |
|
|
|
eventType, |
|
|
|
eventType, |
|
|
|
eventMessage)); |
|
|
|
eventMessage)); |
|
|
|
} |
|
|
|
} |
|
|
|