Browse Source

solved the side output problem

master
Julio Biason 6 years ago
parent
commit
d5a12ec2dd
  1. 6
      src/main/scala/net/juliobiason/SideoutputSample.scala

6
src/main/scala/net/juliobiason/SideoutputSample.scala

@ -217,7 +217,7 @@ object SideouputSample {
// Split each metric (based on their classes) on a different sideout, which // Split each metric (based on their classes) on a different sideout, which
// we'll plug a different sink. // we'll plug a different sink.
pipeline val result = pipeline
.process(new ProcessFunction[Metric, Metric] { .process(new ProcessFunction[Metric, Metric] {
override def processElement( override def processElement(
value:Metric, value:Metric,
@ -239,7 +239,7 @@ object SideouputSample {
}) })
// collect all simple metrics // collect all simple metrics
pipeline result
.getSideOutput(outputSimple) .getSideOutput(outputSimple)
// the sink would, usually, be the JDBCOutputFormat here, but we // the sink would, usually, be the JDBCOutputFormat here, but we
// only want to print the results, so this will do, pig. // only want to print the results, so this will do, pig.
@ -249,7 +249,7 @@ object SideouputSample {
} }
}) })
pipeline result
.getSideOutput(outputComplex) .getSideOutput(outputComplex)
.addSink(new SinkFunction[Metric] { .addSink(new SinkFunction[Metric] {
def inkoke(value:Metric):Unit = { def inkoke(value:Metric):Unit = {

Loading…
Cancel
Save