From a86ea4ebff46d05bde50f8bc0884371d6378fc2a Mon Sep 17 00:00:00 2001 From: Julio Biason Date: Thu, 2 Aug 2018 16:40:52 -0300 Subject: [PATCH] added the basic SocketWindowWordCount from flink examples --- .../flink/SocketWindowWordCount.java | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 src/main/java/net/juliobiason/flink/SocketWindowWordCount.java diff --git a/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java new file mode 100644 index 0000000..62eb578 --- /dev/null +++ b/src/main/java/net/juliobiason/flink/SocketWindowWordCount.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.juliobiason.flink; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +/** + * Implements a streaming windowed version of the "WordCount" program. + * + *

This program connects to a server socket and reads strings from the socket. + * The easiest way to try this out is to open a text server (at port 12345) + * using the netcat tool via + *

+ * nc -l 12345
+ * 
+ * and run this example with the hostname and the port as arguments. + */ +@SuppressWarnings("serial") +public class SocketWindowWordCount { + + public static void main(String[] args) throws Exception { + + // the host and the port to connect to + final String hostname; + final int port; + try { + final ParameterTool params = ParameterTool.fromArgs(args); + hostname = params.has("hostname") ? params.get("hostname") : "localhost"; + port = params.getInt("port"); + } catch (Exception e) { + System.err.println("No port specified. Please run 'SocketWindowWordCount " + + "--hostname --port ', where hostname (localhost by default) " + + "and port is the address of the text server"); + System.err.println("To start a simple text server, run 'netcat -l ' and " + + "type the input text into the command line"); + return; + } + + // get the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data by connecting to the socket + DataStream text = env.socketTextStream(hostname, port, "\n"); + + // parse the data, group it, window it, and aggregate the counts + DataStream windowCounts = text + + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(String value, Collector out) { + for (String word : value.split("\\s")) { + out.collect(new WordWithCount(word, 1L)); + } + } + }) + + .keyBy("word") + .timeWindow(Time.seconds(5)) + + .reduce(new ReduceFunction() { + @Override + public WordWithCount reduce(WordWithCount a, WordWithCount b) { + return new WordWithCount(a.word, a.count + b.count); + } + }); + + // print the results with a single thread, rather than in parallel + windowCounts.print().setParallelism(1); + + env.execute("Socket Window WordCount"); + } + + // ------------------------------------------------------------------------ + + /** + * Data type for words with count. + */ + public static class WordWithCount { + + public String word; + public long count; + + public WordWithCount() {} + + public WordWithCount(String word, long count) { + this.word = word; + this.count = count; + } + + @Override + public String toString() { + return word + " : " + count; + } + } +}