
First Java Applications

Update
  • Created 2018
  • Updated 10/2024: Flink open source 1.20.x supports Java 17. Use jbang or sdkman (the sdk CLI) to install JDK 11 or 17.

In this chapter, I address how to develop basic Flink Java applications with the Table API, DataStream API, or SQL, in the context of three deployment targets: open source, Confluent Cloud, and Confluent Platform. The chapter also covers CI/CD practices.

Introduction

Each Flink Java application is a main function that defines the data flow to execute on one or more data streams. During deployment, the Flink CLI client "compiles" the flow into a directed acyclic graph, where each node may run in a task slot within a distributed cluster of Task Manager nodes.

The code structure follows the following standard steps:

  1. Obtain a Flink execution environment
  2. Define the data pipeline graph (as a separate method to simplify unit testing)

    1. Load/create the initial data from the stream
    2. Specify transformations on the data
    3. Specify where to put the results of the computations
  3. Trigger the program execution

Once developers have built the application jar file, they use the Flink CLI to submit the jar as a job to the Job Manager, which schedules and assigns the job to Task Managers.
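
As a minimal sketch of these three steps with the DataStream API (class and method names below are illustrative, not taken from the examples repository):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SkeletonJob {
        public static void main(String[] args) throws Exception {
            // 1. Obtain the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. Define the data pipeline graph (kept in a method to ease unit testing)
            defineWorkflow(env);
            // 3. Trigger the execution; the graph is only submitted when execute() is called
            env.execute("skeleton-job");
        }

        static void defineWorkflow(StreamExecutionEnvironment env) {
            // load/create the initial data, transform it, and declare a sink
            DataStream<String> lines = env.fromElements("a quick brown fox", "a lazy dog");
            lines.map(String::toUpperCase).print();
        }
    }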

Create a Java project with Maven

See the product documentation which can be summarized as:

  1. Create project template from quickstart shell

    curl https://flink.apache.org/q/quickstart.sh | bash -s 1.20.0
    
  2. Add the following maven dependencies into the pom.xml:

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    
  3. When using Kafka, add the Kafka connector dependency to the pom.xml (a usage sketch follows this list):

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>3.0.0-1.17</version>
      </dependency>
    
  4. Create a Java class with a main function and the following code structure (see the 01-word-count example):

    • get the Flink execution context
    • define the process flow to apply to the data stream
    • start the execution

      // Get the execution context
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // The input is a file, so read the file name from the program arguments
      ParameterTool params = ParameterTool.fromArgs(args);
      env.getConfig().setGlobalJobParameters(params);
      defineWorkflow(env);
      env.execute();
    

    The code above uses the ParameterTool class to process the program arguments. Most of the basic examples take --input filename and --output filename as Java arguments, and params exposes them as a map.

  5. Define the event structure as a POJO, in a separate Java Bean class under the event folder (see the sketch after this list).

  6. Implement the processing logic: event mapping, filtering, and so on. We will see more examples later.
  7. Package with mvn package
  8. Get access to a Flink runtime: a local installation, Docker, Kubernetes, or Confluent Cloud for Flink. Use the Flink CLI to submit the job to a local cluster; on Kubernetes, use a FlinkDeployment (KFF) or FlinkApplication (CMF) resource.
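
For step 3, here is a minimal sketch of using the Kafka connector as a source; the bootstrap servers, topic, and group id are placeholder values, not settings from the examples repository:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaReadJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // read string records from a placeholder topic, starting from the earliest offsets
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("trips")
                    .setGroupId("my-flink-app")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
               .print();
            env.execute("kafka-read-job");
        }
    }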
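
For steps 5 and 6, here is a minimal sketch of a POJO event and its mapping function. The TripEvent class and field positions are illustrative; the positions follow the CSV sample used in the unit test section below:

    // TripEvent.java (in the event folder): a Flink POJO needs a public
    // no-arg constructor and public fields (or getters/setters).
    public class TripEvent {
        public String passengerName;
        public String origin;
        public String destination;
        public int tripCount;

        public TripEvent() {}

        public TripEvent(String passengerName, String origin, String destination, int tripCount) {
            this.passengerName = passengerName;
            this.origin = origin;
            this.destination = destination;
            this.tripCount = tripCount;
        }
    }

    // TripEventMapper.java: mapping logic kept in its own class so it can be unit tested in isolation.
    import org.apache.flink.api.common.functions.MapFunction;

    public class TripEventMapper implements MapFunction<String, TripEvent> {
        @Override
        public TripEvent map(String csvLine) {
            String[] fields = csvLine.split(",");
            // e.g. "id_4214,PB7526,Sedan,Wanda,yes,Sector 19,Sector 10,5"
            return new TripEvent(fields[3], fields[5], fields[6], Integer.parseInt(fields[7]));
        }
    }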

With the open source version, we have access to many different connectors, for example to load data from a CSV file, which is convenient for local testing (see the sketch below). With Confluent Cloud, only Kafka topics can be used as source and sink.
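
As an illustration, here is a minimal sketch that reads a local text file with the open source file connector; it assumes the flink-connector-files dependency is on the classpath and reuses the wc.txt path from the submit commands further down:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileReadJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // read the file line by line; each line becomes one String record
            FileSource<String> source = FileSource
                    .forRecordStreamFormat(new TextLineInputFormat(),
                                           new Path("file:///home/my-flink/data/wc.txt"))
                    .build();
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
               .print();
            env.execute("file-read-job");
        }
    }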

Create a Quarkus Java app

  • Create a Quarkus app: quarkus create app -DprojectGroupId=jbcodeforce -DprojectArtifactId=my-flink. See the code examples under the flink-java/my-flink folder, in the jbcodeforce.p1 package.
  • Do the same steps as above for the main class.
  • Be sure to enable Quarkus uber-jar generation (quarkus.package.type=uber-jar) in application.properties so that all dependencies end up in a single jar: Flink needs all dependencies on the classpath.
  • Package the jar with mvn package
  • Every Flink application needs a reference to the execution environment (variable env in previous example).
  • Start a job manager and a task manager with the docker compose file under the deployment/docker folder, or with flink-1.19.1/bin/start-cluster.sh for a local installation.
  • To submit a job to a Session cluster, use the following commands, which rely on the flink CLI. This can be done with the flink CLI of a local installation, or inside the running JobManager container when using Docker or Kubernetes:
# One way with mounted files to task manager and job manager containers.
CNAME="jbcodeforce.p1.WordCountMain"
JMC=$(docker ps --filter name=jobmanager --format={{.ID}})
docker exec -ti $JMC flink run -d -c $CNAME /home/my-flink/target/my-flink-1.0.0-runner.jar --input file:///home/my-flink/data/wc.txt --output file:///home/my-flink/data/out.csv 

# inside the jobmanager
flink run -d -c jbcodeforce.p1.WordCountMain /home/my-flink/target/my-flink-1.0.0-runner.jar --input file:///home/my-flink/data/wc.txt --output file:///home/my-flink/data/out.csv

See the coding practice summary for more DataStream examples.

See also the official operators documentation to understand how to transform one or more DataStreams into a new DataStream; programs can combine multiple transformations into sophisticated data flow topologies.
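
As a small illustration, the following sketch combines several transformations into one topology, reusing the hypothetical TripEvent and TripEventMapper from the earlier sketch; the filter threshold and key choice are arbitrary:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class TripAnalyticsFlow {

        // Combine transformations into a small topology:
        // parse, filter, key by passenger name, and keep a running count.
        static void defineWorkflow(DataStream<String> rawLines) {
            DataStream<TripEvent> trips = rawLines.map(new TripEventMapper());
            trips.filter(trip -> trip.tripCount >= 2)              // keep frequent trips only
                 .map(trip -> new Tuple2<>(trip.passengerName, 1))
                 .returns(Types.TUPLE(Types.STRING, Types.INT))    // type hint for the lambda
                 .keyBy(t -> t.f0)                                 // partition by passenger name
                 .sum(1)                                           // running count per passenger
                 .print();
        }
    }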


TO REWORK

Unit testing

There are three types of functions to test:

  • Stateless
  • Stateful
  • Timed process

Stateless

For a stateless function, the data flow logic can be isolated in a static method within the main class, or defined in a separate class. The test instantiates the class and provides the data.

For example, testing a string-to-tuple mapping (MapTrip is a MapFunction implementation):

    @Test
    public void testMapToTuple() throws Exception {
        MapTrip mapFunction = new MapTrip();
        Tuple5<String, String, String, Boolean, Integer> t =
                mapFunction.map("id_4214,PB7526,Sedan,Wanda,yes,Sector 19,Sector 10,5");
        assertEquals("Wanda", t.f0);
        assertEquals("Sector 19", t.f1);
        assertEquals("Sector 10", t.f2);
        assertTrue(t.f3);
        assertEquals(5, t.f4);
    }

Stateful

The test needs to check whether the operator state is updated correctly and cleaned up properly, along with verifying the output of the operator. Flink provides test harness classes so that we don't have to create mock objects.
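
As an illustration, here is a minimal sketch of a keyed, stateful function and a harness-based test for it. The CountPerKey class is hypothetical, and the sketch assumes the Flink test utilities (the flink-streaming-java test-jar) and JUnit 5 are on the test classpath:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.util.Collector;
    import org.junit.jupiter.api.Test;

    import java.util.Arrays;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    public class CountPerKeyTest {

        // Hypothetical stateful function: emits the running count of events per key.
        static class CountPerKey extends KeyedProcessFunction<String, String, Long> {
            private transient ValueState<Long> count;

            @Override
            public void open(Configuration parameters) {
                count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
                long current = (count.value() == null) ? 0L : count.value();
                count.update(current + 1);
                out.collect(current + 1);
            }
        }

        @Test
        public void testRunningCountIsKeptInState() throws Exception {
            KeyedOneInputStreamOperatorTestHarness<String, String, Long> harness =
                    new KeyedOneInputStreamOperatorTestHarness<>(
                            new KeyedProcessOperator<>(new CountPerKey()),
                            value -> value,      // key selector: key by the value itself
                            Types.STRING);
            harness.open();
            harness.processElement("a", 10L);
            harness.processElement("a", 20L);
            // the output reflects the state updates: 1 for the first "a", 2 for the second
            assertEquals(Arrays.asList(1L, 2L), harness.extractOutputValues());
            harness.close();
        }
    }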


Example of a standalone-job docker-compose file

Change the --job-classname parameter of the standalone-job command within the docker-compose file:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2