Julio Biason
6 years ago
commit
eedc78715b
4 changed files with 306 additions and 0 deletions
@ -0,0 +1,4 @@
|
||||
[*.scala] |
||||
charset = utf-8 |
||||
indent_style = space |
||||
indent_size = 2 |
@ -0,0 +1,38 @@
|
||||
resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", |
||||
"velvia maven" at "http://dl.bintray.com/velvia/maven", |
||||
Resolver.mavenLocal) |
||||
|
||||
name := "sideoutput-sample" |
||||
|
||||
version := "0.0.1" |
||||
|
||||
organization := "com.azion" |
||||
|
||||
scalaVersion in ThisBuild := "2.11.8" |
||||
|
||||
val flinkVersion = "1.4.2" |
||||
|
||||
val flinkDependencies = Seq( |
||||
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided", |
||||
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided") |
||||
|
||||
lazy val root = (project in file(".")). |
||||
settings( |
||||
libraryDependencies ++= flinkDependencies |
||||
) |
||||
|
||||
libraryDependencies ++= Seq( |
||||
"org.slf4j" % "slf4j-log4j12" % "1.7.25" |
||||
) |
||||
|
||||
mainClass in assembly := Some("com.azion.SideouputSample") |
||||
|
||||
// make run command include the provided dependencies |
||||
run in Compile := Defaults.runTask(fullClasspath in Compile, |
||||
mainClass in (Compile, run), |
||||
runner in (Compile,run) |
||||
).evaluated |
||||
|
||||
// exclude Scala library from assembly |
||||
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) |
||||
|
@ -0,0 +1 @@
|
||||
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") |
@ -0,0 +1,263 @@
|
||||
package com.azion |
||||
|
||||
// Yes, you can import multiple elements from a module in Scala; |
||||
// I just prefer to use each on a single line to a) see how huge |
||||
// the import list is getting, which is usually a point that I should |
||||
// move things to another module and b) it makes really easy to |
||||
// sort the list in VIM. |
||||
import org.apache.flink.api.common.functions.FlatMapFunction |
||||
import org.apache.flink.api.common.functions.MapFunction |
||||
import org.apache.flink.api.common.functions.ReduceFunction |
||||
import org.apache.flink.streaming.api.TimeCharacteristic |
||||
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks |
||||
import org.apache.flink.streaming.api.functions.ProcessFunction |
||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction |
||||
import org.apache.flink.streaming.api.functions.source.SourceFunction |
||||
import org.apache.flink.streaming.api.scala.DataStream |
||||
import org.apache.flink.streaming.api.scala.OutputTag |
||||
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
||||
import org.apache.flink.streaming.api.scala._ |
||||
import org.apache.flink.streaming.api.scala.function.WindowFunction |
||||
import org.apache.flink.streaming.api.watermark.Watermark |
||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows |
||||
import org.apache.flink.streaming.api.windowing.time.Time |
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow |
||||
import org.apache.flink.util.Collector |
||||
|
||||
/** |
||||
* Basic trait (interface) for all metrics. |
||||
* |
||||
* Also notice that all metrics have static values; they are designed |
||||
* this way to be immutable, so all functions that change any value should |
||||
* actually return a new instance of the metric. |
||||
*/ |
||||
trait Metric { |
||||
val prefix:String |
||||
val name:String |
||||
val eventTime:Long |
||||
def add(another:Metric):Metric |
||||
def updateTime(newEventTime:Long):Metric |
||||
|
||||
def key():String = { |
||||
prefix + "-" + name |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* The simple metric have a single value and must be saved in a sink |
||||
* that accepts a single value. |
||||
*/ |
||||
case class SimpleMetric(override val name:String, override val eventTime:Long, value:Int) extends Metric { |
||||
override val prefix = "simple" |
||||
|
||||
override def add(another:Metric):Metric = { |
||||
val other:SimpleMetric = another.asInstanceOf[SimpleMetric] |
||||
println(s"Adding ${other} into ${this}") |
||||
new SimpleMetric(name, eventTime, value + other.value) |
||||
} |
||||
|
||||
override def updateTime(newEventTime:Long):Metric = { |
||||
println(s"Updating ${this} to have event time at ${newEventTime}") |
||||
new SimpleMetric(name, newEventTime, value) |
||||
} |
||||
|
||||
override def toString():String = { |
||||
s"Simple Metric of ${name} [${eventTime}] with value ${value}" |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* The complex metric have more than one value and, thus, should be saved |
||||
* in a different sink than the `SimpleMetric` |
||||
*/ |
||||
case class ComplexMetric(override val name:String, override val eventTime:Long, value1:Int, value2:Int, value3:Int) extends Metric { |
||||
override val prefix = "complex" |
||||
|
||||
override def add(another:Metric):Metric = { |
||||
val other:ComplexMetric = another.asInstanceOf[ComplexMetric] |
||||
println(s"Adding ${other} into ${this}") |
||||
new ComplexMetric( |
||||
name, |
||||
eventTime, |
||||
value1 + other.value1, |
||||
value2 + other.value2, |
||||
value3 + other.value3) |
||||
} |
||||
|
||||
override def updateTime(newEventTime:Long):Metric = { |
||||
println(s"Updating ${this} to have event time at ${newEventTime}") |
||||
new ComplexMetric(name, newEventTime, value1, value2, value3) |
||||
} |
||||
|
||||
override def toString():String = { |
||||
s"Complex Metric of ${name} [${eventTime}] with values ${value1}, ${value2}, ${value3}" |
||||
} |
||||
} |
||||
|
||||
object SideouputSample { |
||||
def main(args: Array[String]) { |
||||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
||||
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) |
||||
env.setParallelism(2) // just random, we usually run with a higher parallelism |
||||
|
||||
// side output names |
||||
val outputSimple = OutputTag[Metric]("simple") |
||||
val outputComplex = OutputTag[Metric]("complex") |
||||
val outputLate = OutputTag[Metric]("late") |
||||
|
||||
// Source would be, usually, Kafka, but for this example, we'll use a function |
||||
// that generates a fixed set of elements. |
||||
val source = env.addSource(new SourceFunction[String]() { |
||||
def run(ctx:SourceFunction.SourceContext[String]) { |
||||
val data = List( |
||||
"2018-04-03T14:20:00+00:00\tevent1\t1\t2\t3", |
||||
"2018-04-03T14:20:10+00:00\tevent2\t1\t2\t3", |
||||
"2018-04-03T14:20:20+00:00\tevent1\t1\t2\t3", |
||||
"2018-04-03T14:21:00+00:00\tevent1\t1\t2\t3", |
||||
"2018-04-03T14:21:00+00:00\tevent2\t1\t2\t3", |
||||
"2018-04-03T14:21:00+00:00\tevent1\t1\t2\t3", |
||||
"2018-04-03T14:22:00+00:00\tevent2\t1\t2\t3", |
||||
"2018-04-03T14:22:00+00:00\tevent2\t1\t2\t3") |
||||
for (record <- data) { |
||||
println(s"Adding ${record} to be processed in the pipeline...") |
||||
ctx.collect(record) |
||||
} |
||||
} |
||||
|
||||
def cancel() {} |
||||
}) |
||||
|
||||
val pipeline = source |
||||
// convert lines to maps, to make them easier to extract data |
||||
// (in reality, our "data" is a bunch of records, so we explode |
||||
// the data here) |
||||
.flatMap(new FlatMapFunction[String, Map[String, String]] { |
||||
val fieldNames = List( |
||||
"time", |
||||
"eventName", |
||||
"importantValue", |
||||
"notSoImportantValue", |
||||
"reallyNotImportantValue") |
||||
override def flatMap(input:String, output:Collector[Map[String, String]]):Unit = { |
||||
val result = fieldNames.zip(input.split("\t")).toMap |
||||
println(s"Mapped event ${result}...") |
||||
output.collect(result) |
||||
} |
||||
}) |
||||
// from each line/map, create the necessary metrics (in this case, 2 metrics for each |
||||
// line) |
||||
.flatMap(new FlatMapFunction[Map[String, String], Metric] { |
||||
val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX") |
||||
|
||||
override def flatMap(input:Map[String, String], output:Collector[Metric]):Unit = { |
||||
val time = format.parse(input("time").asInstanceOf[String]).getTime |
||||
|
||||
val simpleMetric = new SimpleMetric( |
||||
input("eventName").asInstanceOf[String], |
||||
time, |
||||
input("importantValue").toInt) |
||||
println(s"Created ${simpleMetric}...") |
||||
val complexMetric = new ComplexMetric( |
||||
input("eventName").asInstanceOf[String], |
||||
time, |
||||
input("importantValue").toInt, |
||||
input("notSoImportantValue").toInt, |
||||
input("reallyNotImportantValue").toInt) |
||||
println(s"Created ${complexMetric}...") |
||||
output.collect(simpleMetric) |
||||
output.collect(complexMetric) |
||||
} |
||||
}) |
||||
// window assignment |
||||
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Metric] { |
||||
var currentMaxTimestamp: Long = 0 |
||||
|
||||
override def extractTimestamp(element:Metric, previousElementTimestamp:Long): Long = { |
||||
val eventTime = element.eventTime |
||||
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime) |
||||
eventTime |
||||
} |
||||
|
||||
override def getCurrentWatermark():Watermark = { |
||||
new Watermark(currentMaxTimestamp - 1000) // ms, so 1 second lag before firing |
||||
} |
||||
}) |
||||
// group things in windows |
||||
.keyBy(_.key) |
||||
.window(TumblingEventTimeWindows.of(Time.minutes(1))) |
||||
.allowedLateness(Time.seconds(30)) |
||||
.sideOutputLateData(outputLate) |
||||
// "compact" all the entries, by merging them together |
||||
.reduce( |
||||
new ReduceFunction[Metric] { |
||||
override def reduce(elem1:Metric, elem2:Metric):Metric = { |
||||
println(s"Reducing ${elem1} with ${elem2}") |
||||
elem1.add(elem2) |
||||
} |
||||
}, |
||||
// because this happens when the window closes, the start of |
||||
// the event is, actually, the time the window opened |
||||
// (this works because we're using Tumbling windows) |
||||
new WindowFunction[Metric, Metric, String, TimeWindow] { |
||||
def apply( |
||||
key:String, |
||||
window:TimeWindow, |
||||
elements:Iterable[Metric], |
||||
out:Collector[Metric] |
||||
):Unit = { |
||||
println(s"Grouping ${elements.toList.length} elements at ${window.getStart}") |
||||
|
||||
for (record <- elements) { |
||||
val updatedEvent = record.updateTime(window.getStart) |
||||
out.collect(updatedEvent) |
||||
} |
||||
} |
||||
} |
||||
) |
||||
|
||||
// Split each metric (based on their classes) on a different sideout, which |
||||
// we'll plug a different sink. |
||||
pipeline |
||||
.process(new ProcessFunction[Metric, Metric] { |
||||
override def processElement( |
||||
value:Metric, |
||||
ctx:ProcessFunction[Metric, Metric]#Context, |
||||
out:Collector[Metric] |
||||
):Unit = { |
||||
value match { |
||||
case record:SimpleMetric => { |
||||
println(s"Sending ${record} to ${outputSimple}") |
||||
ctx.output(outputSimple, record) |
||||
} |
||||
case record:ComplexMetric => { |
||||
println(s"Sending ${record} to ${outputComplex}") |
||||
ctx.output(outputComplex, record) |
||||
} |
||||
case record => println(s"Don't know how to handle ${record}") |
||||
} |
||||
} |
||||
}) |
||||
|
||||
// collect all simple metrics |
||||
pipeline |
||||
.getSideOutput(outputSimple) |
||||
// the sink would, usually, be the JDBCOutputFormat here, but we |
||||
// only want to print the results, so this will do, pig. |
||||
.addSink(new SinkFunction[Metric] { |
||||
def invoke(value:Metric):Unit = { |
||||
println(s"Got ${value} in the simple output") |
||||
} |
||||
}) |
||||
|
||||
pipeline |
||||
.getSideOutput(outputComplex) |
||||
.addSink(new SinkFunction[Metric] { |
||||
def inkoke(value:Metric):Unit = { |
||||
println(s"Got ${value} in the complex output") |
||||
} |
||||
}) |
||||
|
||||
// execute program |
||||
env.execute("Sample sideoutput") |
||||
} |
||||
} |
Loading…
Reference in new issue