diff --git a/src/main/scala/net/juliobiason/SideoutputSample.scala b/src/main/scala/net/juliobiason/SideoutputSample.scala index 278c980..77a3d0b 100644 --- a/src/main/scala/net/juliobiason/SideoutputSample.scala +++ b/src/main/scala/net/juliobiason/SideoutputSample.scala @@ -217,7 +217,7 @@ object SideouputSample { // Split each metric (based on their classes) on a different sideout, which // we'll plug a different sink. - pipeline + val result = pipeline .process(new ProcessFunction[Metric, Metric] { override def processElement( value:Metric, @@ -239,7 +239,7 @@ object SideouputSample { }) // collect all simple metrics - pipeline + result .getSideOutput(outputSimple) // the sink would, usually, be the JDBCOutputFormat here, but we // only want to print the results, so this will do, pig. @@ -249,7 +249,7 @@ object SideouputSample { } }) - pipeline + result .getSideOutput(outputComplex) .addSink(new SinkFunction[Metric] { def inkoke(value:Metric):Unit = {