From d5a12ec2dd0a3298ee78a720e60f01923c1f3765 Mon Sep 17 00:00:00 2001 From: Julio Biason Date: Fri, 20 Apr 2018 13:42:31 -0300 Subject: [PATCH] solved the side output problem --- src/main/scala/net/juliobiason/SideoutputSample.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/net/juliobiason/SideoutputSample.scala b/src/main/scala/net/juliobiason/SideoutputSample.scala index 278c980..77a3d0b 100644 --- a/src/main/scala/net/juliobiason/SideoutputSample.scala +++ b/src/main/scala/net/juliobiason/SideoutputSample.scala @@ -217,7 +217,7 @@ object SideouputSample { // Split each metric (based on their classes) on a different sideout, which // we'll plug a different sink. - pipeline + val result = pipeline .process(new ProcessFunction[Metric, Metric] { override def processElement( value:Metric, @@ -239,7 +239,7 @@ object SideouputSample { }) // collect all simple metrics - pipeline + result .getSideOutput(outputSimple) // the sink would, usually, be the JDBCOutputFormat here, but we // only want to print the results, so this will do, pig. @@ -249,7 +249,7 @@ object SideouputSample { } }) - pipeline + result .getSideOutput(outputComplex) .addSink(new SinkFunction[Metric] { def inkoke(value:Metric):Unit = {