First Java Applications
Update
- Created 2018
- Updated 10/2024: open-source Flink 1.20.x supports Java 17. Use jbang or SDKMAN! (`sdk` CLI) to install JDK 11 or 17.
In this chapter, I address how to develop basic Flink Java applications with the Table API, the DataStream API, or SQL, in the context of three deployment targets: open-source Flink, Confluent Cloud, and Confluent Platform. The chapter also addresses CI/CD practices.
Introduction
Each Java Flink application is a Java `main` function that defines the data flow to execute on one or more data streams. During deployment, the Flink CLI client "compiles" the flow into a directed acyclic graph (DAG). Each node of that graph may run in a task slot within a distributed cluster of Task Manager nodes.
The code structure follows these standard steps:
- Obtain a Flink execution environment.
- Define the data pipeline graph (as a separate method to simplify unit testing):
    - Load or create the initial data from the stream.
    - Specify transformations on the data.
    - Specify where to put the results of the computations.
- Trigger the program execution.
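The steps above can be sketched with the DataStream API's `StreamExecutionEnvironment`. The DataSet-based `ExecutionEnvironment` is deprecated in recent Flink 1.x releases. This is a minimal sketch, not the author's word-count code. The class name `PipelineSkeleton`, the in-memory `fromElements` source, and the `print()` sink are illustrative assumptions. The pipeline graph lives in its own static method so a unit test can feed it any `DataStream<String>`.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Hypothetical example class: illustrates the standard steps, not a reference implementation
public class PipelineSkeleton {

    // Step 2: the pipeline graph. It splits lines into words and keeps a running count per word.
    // Kept separate from main() so tests can call it with their own input stream.
    public static DataStream<Tuple2<String, Integer>> defineWorkflow(DataStream<String> lines) {
        return lines
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            // Java lambdas lose generic types through erasure, so declare the output type
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .sum(1);
    }

    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Load the initial data (an in-memory source, for illustration only)
        DataStream<String> lines = env.fromElements("to be or not to be", "that is the question");
        // Apply the transformations and send the results to stdout
        defineWorkflow(lines).print();
        // Step 3: trigger the execution; nothing runs before this call
        env.execute("pipeline-skeleton");
    }
}
```

Because `defineWorkflow` only receives a stream, a test can build its own source and collect results with `executeAndCollect()`. It does not need to run `main`.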
Once developers build the application jar file, they use the Flink CLI to submit the jar as a job to the Job Manager. The Job Manager schedules the job and assigns its tasks to the Task Managers.
Create a Java project with Maven
See the product documentation, which can be summarized as:
- Create the project template from the quickstart shell script.
- Add the following Maven dependencies to the `pom.xml`:

```xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink-version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink-version}</version>
  <scope>provided</scope>
</dependency>
```
- When using Kafka, add the Kafka connector dependency (`flink-connector-kafka`) to the `pom.xml`.
- Create a Java class with a `main` function and the following code structure (see the 01-word-count example):
    - get the Flink execution context
    - define the process flow to apply to the data stream
    - start the execution
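```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;

// Inside main(String[] args) throws Exception:
// Get the execution context
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Use a file as input, so read the file name from the program arguments
ParameterTool params = ParameterTool.fromArgs(args);
// Register the parameters globally so operators can read them
env.getConfig().setGlobalJobParameters(params);
// Build the data flow graph (see defineWorkflow in the 01-word-count example)
defineWorkflow(env);
// Trigger the execution
env.execute();
```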
The code above uses the `ParameterTool` class to process the program arguments. Most of the basic examples therefore take `--input filename` and `--output filename` as Java arguments, and `params` holds those arguments in a map.
- Define the event structure as a POJO, in a separate Java Bean class in the `event` folder.
- Implement the processing logic: event mapping, filtering, and so on. We will see more examples later.
- Package with `mvn package`.
- Get access to a Flink compute pool: locally with a local installation, Docker, or Kubernetes, or with Confluent Cloud for Flink. Use the `flink` CLI to submit the job to a local cluster, and use a `FlinkDeployment` (KFF) or a `FlinkApplication` (CMF) otherwise.
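For the event POJO, Flink's type system recognizes a class as a POJO when three conditions hold:
- the class is public;
- it has a public no-argument constructor;
- every field is either public or reachable through getters and setters.

Otherwise Flink falls back to slower generic (Kryo) serialization. The sketch below is a hypothetical `TripEvent`, with field names guessed from the trip records used in the unit-testing section. It is not a class from the author's repository.

```java
import java.util.Objects;

// Hypothetical event class for the event folder, written to satisfy Flink's POJO rules
public class TripEvent {
    private String driver;
    private String pickup;
    private int passengers;

    // Flink needs a public no-arg constructor to instantiate the POJO when deserializing
    public TripEvent() {}

    public TripEvent(String driver, String pickup, int passengers) {
        this.driver = driver;
        this.pickup = pickup;
        this.passengers = passengers;
    }

    // Java Bean getters and setters let Flink access the private fields
    public String getDriver() { return driver; }
    public void setDriver(String driver) { this.driver = driver; }
    public String getPickup() { return pickup; }
    public void setPickup(String pickup) { this.pickup = pickup; }
    public int getPassengers() { return passengers; }
    public void setPassengers(int passengers) { this.passengers = passengers; }

    // equals/hashCode make events comparable in unit tests and usable as keys
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TripEvent)) return false;
        TripEvent other = (TripEvent) o;
        return passengers == other.passengers
            && Objects.equals(driver, other.driver)
            && Objects.equals(pickup, other.pickup);
    }

    @Override
    public int hashCode() {
        return Objects.hash(driver, pickup, passengers);
    }

    @Override
    public String toString() {
        return "TripEvent{driver=" + driver + ", pickup=" + pickup + ", passengers=" + passengers + "}";
    }
}
```

A quick way to check that the class meets the rules is `TypeInformation.of(TripEvent.class)`. It should return a `PojoTypeInfo` rather than a generic type.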
With the open-source version, many connectors are available, for example to load data from a CSV file, which is convenient for local testing. With Confluent Cloud, only Kafka topics can be used as sources and sinks.
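On a self-managed cluster, a Kafka topic is wired in through the `flink-connector-kafka` `KafkaSource` and `KafkaSink` builders. The sketch below assumes a plaintext broker. The topic names, group id, and default broker address are hypothetical placeholders. A secured cluster would need extra client properties, which are not shown.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaPipeline {

    // Source reading string values from a hypothetical "input-topic", starting at the earliest offset
    static KafkaSource<String> buildSource(String brokers) {
        return KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("input-topic")
            .setGroupId("my-flink-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    }

    // Sink writing string values to a hypothetical "output-topic" with at-least-once delivery
    static KafkaSink<String> buildSink(String brokers) {
        return KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }

    public static void main(String[] args) throws Exception {
        String brokers = args.length > 0 ? args[0] : "localhost:9092";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events =
            env.fromSource(buildSource(brokers), WatermarkStrategy.noWatermarks(), "kafka-source");
        // Placeholder transformation: drop blank records before writing them back to Kafka
        events.filter(s -> !s.isBlank()).sinkTo(buildSink(brokers));
        env.execute("kafka-pipeline");
    }
}
```

Building the source and sink in separate methods keeps connection details out of the pipeline logic. A test can then swap in an in-memory source instead.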
Create a Quarkus Java app
- Create a Quarkus app: `quarkus create app -DprojectGroupId=jbcodeforce -DprojectArtifactId=my-flink`. See the code examples under the `flink-java/my-flink` folder and the `jbcodeforce.p1` package.
- Follow the same steps as above for the main class.
- Be sure to enable Quarkus uber-jar generation (`quarkus.package.type=uber-jar`) in `application.properties` to get all the dependencies in a single jar: Flink needs all dependencies in the classpath.
- Package the jar with `mvn package`.
- Every Flink application needs a reference to the execution environment (the `env` variable in the previous example).
Submit job to Flink
- Start a Job Manager and a Task Manager with the docker compose file under the `deployment/docker` folder, or with `flink-1.19.1/bin/start-cluster.sh` for a local installation.
- To submit a job to a Session cluster, use the following `flink` CLI command. This can be done with the Flink CLI of the local installation, or inside the running `JobManager` container when using Docker or Kubernetes:
```sh
# One way, with the files mounted into the task manager and job manager containers
CNAME="jbcodeforce.p1.WordCountMain"
JMC=$(docker ps --filter name=jobmanager --format={{.ID}})
docker exec -ti $JMC flink run -d -c $CNAME /home/my-flink/target/my-flink-1.0.0-runner.jar --input file:///home/my-flink/data/wc.txt --output file:///home/my-flink/data/out.csv

# Inside the jobmanager container
flink run -d -c jbcodeforce.p1.WordCountMain /home/my-flink/target/my-flink-1.0.0-runner.jar --input file:///home/my-flink/data/wc.txt --output file:///home/my-flink/data/out.csv
```
See the coding practice summary for more DataStream examples. See also the official operators documentation to understand how to transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated data flow topologies.
TO REWORK
Unit testing
There are three types of functions to test:
- Stateless
- Stateful
- Timed process
Stateless
For stateless functions, the data flow can be isolated in a static method within the main class, or defined in a separate class. The test instantiates the class and provides the data.
For example, testing a mapping from a string to a tuple (`MapTrip` implements `MapFunction`):
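```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.flink.api.java.tuple.Tuple5;
import org.junit.jupiter.api.Test;

@Test
public void testMapToTuple() throws Exception {
    MapTrip mapFunction = new MapTrip();
    Tuple5<String, String, String, Boolean, Integer> t =
        mapFunction.map("id_4214,PB7526,Sedan,Wanda,yes,Sector 19,Sector 10,5");
    assertEquals("Wanda", t.f0);
    assertEquals("Sector 19", t.f1);
    assertEquals("Sector 10", t.f2);
    assertTrue(t.f3);
    // Cast avoids an ambiguous assertEquals overload between int and Integer
    assertEquals(5, (int) t.f4);
}
```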
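The test above relies on a `MapTrip` class that is not shown. A hypothetical implementation that satisfies the test could look like the sketch below. The column meanings in the comments are guesses from the sample record, and the real `MapTrip` may differ.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple5;

// Hypothetical MapTrip, inferred from testMapToTuple. Assumed CSV layout:
// id, plate, cab type, driver, ongoing trip (yes/no), pickup, destination, passenger count
public class MapTrip implements MapFunction<String, Tuple5<String, String, String, Boolean, Integer>> {

    @Override
    public Tuple5<String, String, String, Boolean, Integer> map(String line) {
        String[] f = line.split(",");
        return Tuple5.of(
            f[3],                          // driver name
            f[5],                          // pickup location
            f[6],                          // destination
            "yes".equalsIgnoreCase(f[4]),  // yes/no flag turned into a Boolean
            Integer.parseInt(f[7].trim())  // passenger count
        );
    }
}
```

Keeping the mapping in its own `MapFunction` class is what makes the stateless test possible. The test calls `map` directly, with no Flink cluster or environment involved.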
Stateful
The test needs to check that the operator state is updated correctly and cleaned up properly, along with the output of the operator. Flink provides test harness classes, so we don't have to create mock objects.
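Here is a minimal sketch of a harness-based test that also covers the timed-process case listed above. It assumes JUnit 5 and the Flink test utilities dependencies described in the Flink testing documentation. The exact artifacts vary by version; typically they are `flink-test-utils` plus the `flink-streaming-java` test-jar. `CountPerKey` is a hypothetical function that counts events per key. It clears its state 10 seconds of processing time after the first event for that key.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

public class CountPerKeyTest {

    // Hypothetical stateful and timed function: emits the running count per key
    public static class CountPerKey extends KeyedProcessFunction<String, String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Deprecated since Flink 1.19 in favor of open(OpenContext), still called in 1.x
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            Long current = count.value();
            if (current == null) {
                // First event for this key: schedule the state clean-up 10 s later
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10_000L);
                current = 0L;
            }
            count.update(current + 1);
            out.collect(current + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
            count.clear();
        }
    }

    @Test
    public void countsPerKeyAndClearsStateOnTimer() throws Exception {
        // The helper wraps the function in a keyed operator and opens the harness
        KeyedOneInputStreamOperatorTestHarness<String, String, Long> harness =
            ProcessFunctionTestHarnesses.forKeyedProcessFunction(new CountPerKey(), v -> v, Types.STRING);

        // Stateful behavior: the count is tracked separately for each key
        harness.setProcessingTime(0L);
        harness.processElement("a", 10L);
        harness.processElement("a", 20L);
        harness.processElement("b", 30L);
        assertEquals(List.of(1L, 2L, 1L), harness.extractOutputValues());

        // Timed behavior: advancing processing time fires the timers, which clear the state
        harness.getOutput().clear();
        harness.setProcessingTime(10_000L);
        harness.processElement("a", 40L);
        assertEquals(List.of(1L), harness.extractOutputValues());

        harness.close();
    }
}
```

The harness controls processing time directly, so the timer test runs instantly with no real waiting. Clearing the collected output between phases keeps each assertion focused on one behavior.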
Example of standalone job docker-compose file
Change the `--job-classname` parameter of the `standalone-job` command in the docker-compose file:
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2
```