Prepare your Flink Java program

SSH to node-1 to clone the git repo for this lab to your home directory.

ssh node-1.cse.cuhk.edu.hk
git clone https://github.com/CUINeo/CUHK-CSE-Lab-Flink.git
cd CUHK-CSE-Lab-Flink

Open the file src/main/java/cslab/flinklab/FlinkExample.java

Line 28: Define the stream source as our custom class CustomSource.

Lines 31-36: Do the transformation. - Line 33: flatMap(map_function); flatMap() emits 0 or more records (i.e., 1:n mapping) whereas map() emits 1 record (i.e., 1:1 mapping) the map_function this time is GetWordCount(). For each sentence, emit multiple records <word1, 1>, <word2, 1>. - Line 35: keyBy(0); //group by 1st element. - Line 36: sum(1); //aggregate each group by 2nd element.

Lines 39-40: Define a PrintSink.

package cslab.flinklab;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

public class FlinkExample {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkExample.class);

    public static void main(String[] args) throws Exception {
        // checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(params.getInt("parallelism", 1));

        // define the Source
        DataStream<String> streamSource = env.addSource(new CustomSource());

        // transform
        DataStream<Tuple2<String, Integer>> words = streamSource
                // splitting sentences to (word, 1)
                .flatMap(new GetWordCount())
                // group by words and sum their occurrences
                .keyBy(0)
                .sum(1);

        // emit result to the standard output
        SinkFunction<Tuple2<String, Integer>> printSink = new PrintSinkFunction<>();
        words.addSink(printSink).name("StdoutSink");

        // execute program
        env.execute("Flink Streaming Example");
    }

    // *************************************************************************
    // Custom FlatMap Function
    // *************************************************************************
    public static class GetWordCount implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // split the sentence into words
            String[] words = value.toLowerCase().split("\\W+");

            // emit each word with a count of 1
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

If you are interested, also have a look at the CustomSource class.

Last updated

Was this helpful?