|
|
|
package net.juliobiason
|
|
|
|
|
|
|
|
// Yes, you can import multiple elements from a module in Scala;
|
|
|
|
// I just prefer to use each on a single line to a) see how huge
|
|
|
|
// the import list is getting, which is usually a point that I should
|
|
|
|
// move things to another module and b) it makes really easy to
|
|
|
|
// sort the list in VIM.
|
|
|
|
import org.apache.flink.api.common.functions.FlatMapFunction
|
|
|
|
import org.apache.flink.api.common.functions.MapFunction
|
|
|
|
import org.apache.flink.api.common.functions.ReduceFunction
|
|
|
|
import org.apache.flink.streaming.api.TimeCharacteristic
|
|
|
|
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
|
|
|
|
import org.apache.flink.streaming.api.functions.ProcessFunction
|
|
|
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction
|
|
|
|
import org.apache.flink.streaming.api.functions.source.SourceFunction
|
|
|
|
import org.apache.flink.streaming.api.scala.DataStream
|
|
|
|
import org.apache.flink.streaming.api.scala.OutputTag
|
|
|
|
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
|
|
|
|
import org.apache.flink.streaming.api.scala._
|
|
|
|
import org.apache.flink.streaming.api.scala.function.WindowFunction
|
|
|
|
import org.apache.flink.streaming.api.watermark.Watermark
|
|
|
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
|
|
|
|
import org.apache.flink.streaming.api.windowing.time.Time
|
|
|
|
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
|
|
|
|
import org.apache.flink.util.Collector
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Basic trait (interface) for all metrics.
|
|
|
|
*
|
|
|
|
* Also notice that all metrics have static values; they are designed
|
|
|
|
* this way to be immutable, so all functions that change any value should
|
|
|
|
* actually return a new instance of the metric.
|
|
|
|
*/
|
|
|
|
trait Metric {
  /** Family of the metric ("simple", "complex", ...), used to namespace keys. */
  val prefix:String

  /** Logical name of the event this metric tracks. */
  val name:String

  /** Event-time timestamp (epoch millis) carried by this metric. */
  val eventTime:Long

  /** Combines this metric with `another`, returning a brand-new instance. */
  def add(another:Metric):Metric

  /** Returns a copy of this metric stamped with `newEventTime`. */
  def updateTime(newEventTime:Long):Metric

  /** Grouping key for the stream: the prefix and name joined by a dash. */
  def key():String = s"$prefix-$name"
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* The simple metric have a single value and must be saved in a sink
|
|
|
|
* that accepts a single value.
|
|
|
|
*/
|
|
|
|
case class SimpleMetric(override val name:String, override val eventTime:Long, value:Int) extends Metric {
  override val prefix = "simple"

  /**
   * Merges the counter of `another` into this one.
   *
   * `another` is expected to be a SimpleMetric (the keyed stream guarantees
   * metrics of the same kind are reduced together); anything else fails the cast.
   */
  override def add(another:Metric):Metric = {
    val other = another.asInstanceOf[SimpleMetric]
    println(s"Adding ${other} into ${this}")
    copy(value = value + other.value)
  }

  /** Re-stamps this metric with a new event time, keeping the value intact. */
  override def updateTime(newEventTime:Long):Metric = {
    println(s"Updating ${this} to have event time at ${newEventTime}")
    copy(eventTime = newEventTime)
  }

  override def toString():String =
    s"Simple Metric of ${name} [${eventTime}] with value ${value}"
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* The complex metric have more than one value and, thus, should be saved
|
|
|
|
* in a different sink than the `SimpleMetric`
|
|
|
|
*/
|
|
|
|
case class ComplexMetric(override val name:String, override val eventTime:Long, value1:Int, value2:Int, value3:Int) extends Metric {
  override val prefix = "complex"

  /**
   * Merges the three counters of `another` into this one, pairwise.
   *
   * `another` is expected to be a ComplexMetric (the keyed stream guarantees
   * metrics of the same kind are reduced together); anything else fails the cast.
   */
  override def add(another:Metric):Metric = {
    val other = another.asInstanceOf[ComplexMetric]
    println(s"Adding ${other} into ${this}")
    copy(
      value1 = value1 + other.value1,
      value2 = value2 + other.value2,
      value3 = value3 + other.value3)
  }

  /** Re-stamps this metric with a new event time, keeping all values intact. */
  override def updateTime(newEventTime:Long):Metric = {
    println(s"Updating ${this} to have event time at ${newEventTime}")
    copy(eventTime = newEventTime)
  }

  override def toString():String =
    s"Complex Metric of ${name} [${eventTime}] with values ${value1}, ${value2}, ${value3}"
}
|
|
|
|
|
|
|
|
object SideouputSample {

  /**
   * Entry point. Builds a small event-time streaming job that:
   *
   *  1. emits a fixed set of tab-separated event lines from an in-memory source;
   *  2. parses each line into a field map, then fans it out into one
   *     [[SimpleMetric]] and one [[ComplexMetric]];
   *  3. assigns event-time timestamps/watermarks and aggregates metrics of the
   *     same kind+name in 1-minute tumbling windows (late data goes to a side output);
   *  4. routes each metric class to its own side output, each plugged into a
   *     println-based sink.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(2) // just random, we usually run with a higher parallelism

    // side output names
    val outputSimple = OutputTag[Metric]("simple")
    val outputComplex = OutputTag[Metric]("complex")
    // NOTE(review): the late-data tag is attached to the window below but never
    // consumed via getSideOutput — late records are currently discarded unseen.
    val outputLate = OutputTag[Metric]("late")

    // Source would be, usually, Kafka, but for this example, we'll use a function
    // that generates a fixed set of elements.
    val source = env.addSource(new SourceFunction[String]() {
      def run(ctx:SourceFunction.SourceContext[String]) {
        // Format per line: ISO-8601 time, event name, then three int values,
        // all tab-separated.
        val data = List(
          "2018-04-03T14:20:00+00:00\tevent1\t1\t2\t3",
          "2018-04-03T14:20:10+00:00\tevent2\t1\t2\t3",
          "2018-04-03T14:20:20+00:00\tevent1\t1\t2\t3",
          "2018-04-03T14:21:00+00:00\tevent1\t1\t2\t3",
          "2018-04-03T14:21:00+00:00\tevent2\t1\t2\t3",
          "2018-04-03T14:21:00+00:00\tevent1\t1\t2\t3",
          "2018-04-03T14:22:00+00:00\tevent2\t1\t2\t3",
          "2018-04-03T14:22:00+00:00\tevent2\t1\t2\t3")
        for (record <- data) {
          println(s"Adding ${record} to be processed in the pipeline...")
          ctx.collect(record)
        }
      }

      def cancel() {}
    })

    val pipeline = source
      // convert lines to maps, to make them easier to extract data
      // (in reality, our "data" is a bunch of records, so we explode
      // the data here)
      .flatMap(new FlatMapFunction[String, Map[String, String]] {
        // Positional names for the tab-separated fields above.
        val fieldNames = List(
          "time",
          "eventName",
          "importantValue",
          "notSoImportantValue",
          "reallyNotImportantValue")

        override def flatMap(input:String, output:Collector[Map[String, String]]):Unit = {
          // zip stops at the shorter list, so extra/missing fields are dropped silently
          val result = fieldNames.zip(input.split("\t")).toMap
          println(s"Mapped event ${result}...")
          output.collect(result)
        }
      })
      // from each line/map, create the necessary metrics (in this case, 2 metrics for each
      // line)
      .flatMap(new FlatMapFunction[Map[String, String], Metric] {
        // Parses the ISO-8601 timestamps (with zone offset) in the source data.
        val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")

        override def flatMap(input:Map[String, String], output:Collector[Metric]):Unit = {
          val time = format.parse(input("time")).getTime

          val simpleMetric = new SimpleMetric(
            input("eventName"),
            time,
            input("importantValue").toInt)
          println(s"Created ${simpleMetric}...")
          val complexMetric = new ComplexMetric(
            input("eventName"),
            time,
            input("importantValue").toInt,
            input("notSoImportantValue").toInt,
            input("reallyNotImportantValue").toInt)
          println(s"Created ${complexMetric}...")
          output.collect(simpleMetric)
          output.collect(complexMetric)
        }
      })
      // window assignment
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Metric] {
        var currentMaxTimestamp: Long = 0

        override def extractTimestamp(element:Metric, previousElementTimestamp:Long): Long = {
          val eventTime = element.eventTime
          currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime)
          eventTime
        }

        override def getCurrentWatermark():Watermark = {
          new Watermark(currentMaxTimestamp - 1000) // ms, so 1 second lag before firing
        }
      })
      // group things in windows
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(outputLate)
      // "compact" all the entries, by merging them together
      .reduce(
        new ReduceFunction[Metric] {
          override def reduce(elem1:Metric, elem2:Metric):Metric = {
            println(s"Reducing ${elem1} with ${elem2}")
            elem1.add(elem2)
          }
        },
        // because this happens when the window closes, the start of
        // the event is, actually, the time the window opened
        // (this works because we're using Tumbling windows)
        new WindowFunction[Metric, Metric, String, TimeWindow] {
          def apply(
            key:String,
            window:TimeWindow,
            elements:Iterable[Metric],
            out:Collector[Metric]
          ):Unit = {
            println(s"Grouping ${elements.toList.length} elements at ${window.getStart}")

            for (record <- elements) {
              val updatedEvent = record.updateTime(window.getStart)
              out.collect(updatedEvent)
            }
          }
        }
      )

    // Split each metric (based on their classes) on a different sideout, which
    // we'll plug a different sink.
    val result = pipeline
      .process(new ProcessFunction[Metric, Metric] {
        override def processElement(
          value:Metric,
          ctx:ProcessFunction[Metric, Metric]#Context,
          out:Collector[Metric]
        ):Unit = {
          value match {
            case record:SimpleMetric => {
              println(s"Sending ${record} to ${outputSimple}")
              ctx.output(outputSimple, record)
            }
            case record:ComplexMetric => {
              println(s"Sending ${record} to ${outputComplex}")
              ctx.output(outputComplex, record)
            }
            case record => println(s"Don't know how to handle ${record}")
          }
        }
      })

    // collect all simple metrics
    result
      .getSideOutput(outputSimple)
      // the sink would, usually, be the JDBCOutputFormat here, but we
      // only want to print the results, so this will do, pig.
      .addSink(new SinkFunction[Metric] {
        def invoke(value:Metric):Unit = {
          println(s"Got ${value} in the simple output")
        }
      })

    // collect all complex metrics
    result
      .getSideOutput(outputComplex)
      .addSink(new SinkFunction[Metric] {
        // FIX: this method was misspelled `inkoke`, so it never overrode
        // SinkFunction.invoke and the complex output was silently dropped.
        def invoke(value:Metric):Unit = {
          println(s"Got ${value} in the complex output")
        }
      })

    // execute program
    env.execute("Sample sideoutput")
  }
}
|