User Defined Functions¶
User-defined functions (UDFs) are extensions to Flink SQL and the Table API for frequently used logic and custom integrations. They can be written in Java or in Python with PyFlink.
If an operation cannot be expressed directly using Flink's standard SQL syntax or built-in functions (e.g., integrating a third-party library, implementing proprietary business logic, or performing complex machine learning inference), a UDF provides the necessary capability to execute that custom code within the stream or batch job.
Developers can leverage any existing libraries, such as geospatial or math libraries, to implement the UDF.
Four Types of UDF¶
| UDF Type | Description | Input to Output Mapping | Example Use Case |
|---|---|---|---|
| Scalar Function | Maps a set of scalar input values to a single, new scalar output value. | 1 row -> 1 row | Formatting a string, calculating an encryption key. |
| Table Function | Maps a set of scalar input values to one or more rows (a new table). | 1 row -> N rows | Splitting a single column into multiple rows. |
| Aggregate Function | Maps the values of multiple input rows to a single scalar aggregate value. | N rows -> 1 row | Calculating a custom weighted average or variance. |
| Table Aggregate Function | Maps the values of multiple input rows to multiple output rows. | N rows -> M rows | Calculating a running "top-N" list for each group. |
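The "N rows -> M rows" shape of a table aggregate function can be sketched in plain Java, without any Flink dependency. The class below is a hypothetical stand-in that keeps the two largest values seen for one group. In a real Flink UDF the class would extend TableAggregateFunction, and emitValue would push rows to a collector instead of returning a list; the names Top2Sketch and Accumulator are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a "top 2" table aggregate; not tied to the Flink API.
class Top2Sketch {
    // Intermediate state kept per group.
    static class Accumulator {
        Integer first;  // largest value seen so far, null until set
        Integer second; // second largest value seen so far, null until set
    }

    Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called once per input row to update the state.
    void accumulate(Accumulator acc, int value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Emits up to two rows of (value, rank) from the accumulated state.
    List<int[]> emitValue(Accumulator acc) {
        List<int[]> rows = new ArrayList<>();
        if (acc.first != null) {
            rows.add(new int[] {acc.first, 1});
        }
        if (acc.second != null) {
            rows.add(new int[] {acc.second, 2});
        }
        return rows;
    }
}
```

Feeding several input rows into accumulate and then calling emitValue yields at most two output rows, which is the many-to-many mapping described in the table.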
Implementation approach¶
For developers, the steps are:
- Develop a class that extends org.apache.flink.table.functions.ScalarFunction or org.apache.flink.table.functions.TableFunction
- Implement one or more eval methods
- Add a constructor with no parameters, and more constructors if needed
- Prefer specifying the parameter types and the function return type, especially for TableFunction
- Build an uber jar
- Deploy to Confluent Cloud, or into the lib folder of the CP Flink Application, or into the lib folder of the OSS Flink distribution.
See this repository for a set of reusable UDFs implemented as solutions to generic problems raised by our customers.
See also the Confluent documentation on UDF and a Confluent git repo with some sample UDFs.
Extending base APIs¶
Scalar function¶
A scalar function maps its input values to a single output value.
The product documentation has all the details.
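As a minimal sketch of the kind of logic a scalar UDF wraps, the plain-Java class below computes a great-circle distance with the haversine formula. It has no Flink dependency: in a real UDF the class would extend ScalarFunction and the eval method would be public. The class name GeoDistanceSketch, the parameter order, and the mean Earth radius of 6371 km are assumptions for illustration, and this is not necessarily how the GEO_DISTANCE function mentioned later on this page is implemented.

```java
// Plain-Java sketch of scalar UDF logic: several scalar inputs, one scalar output.
class GeoDistanceSketch {
    // Mean Earth radius in kilometers (assumed value for this sketch).
    static final double EARTH_RADIUS_KM = 6371.0;

    // Mirrors the shape of a ScalarFunction eval method.
    // Inputs are latitudes and longitudes in decimal degrees; output is in kilometers.
    double eval(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }
}
```

Calling `new GeoDistanceSketch().eval(48.8566, 2.3522, 51.5074, -0.1278)` returns the approximate distance in kilometers between central Paris and central London under these assumptions.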
Use AsyncScalarFunction when interacting with external systems. Use thread pools, initialized in the constructor, to manage connection multiplexing.
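The asynchronous pattern can be sketched in plain Java as follows. The eval method receives a CompletableFuture to complete, which mirrors the shape of an AsyncScalarFunction eval method, and the work runs on a thread pool created in the constructor. The class name AsyncLookupSketch, the pool size of 4, and the lookup method (a placeholder for a call to an external system) are hypothetical; a real UDF would extend AsyncScalarFunction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java sketch of an asynchronous scalar lookup; not tied to the Flink API.
class AsyncLookupSketch {
    private final ExecutorService pool;

    AsyncLookupSketch() {
        // Daemon threads so that a forgotten close() does not keep the JVM alive.
        pool = Executors.newFixedThreadPool(4, runnable -> {
            Thread t = new Thread(runnable);
            t.setDaemon(true);
            return t;
        });
    }

    // The caller supplies the future; the function completes it when the lookup finishes.
    void eval(CompletableFuture<String> result, String key) {
        pool.submit(() -> {
            try {
                result.complete(lookup(key));
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
    }

    // Hypothetical placeholder for a request to an external system.
    String lookup(String key) {
        return "value-for-" + key;
    }

    void close() {
        pool.shutdown();
    }
}
```

Completing the future from a pool thread keeps the calling thread free while the external request is in flight, which is the reason for preferring the asynchronous variant here.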
Table function¶
A table function returns an arbitrary number of rows (or structured types) as output. A single scalar value can also be emitted; the runtime implicitly wraps it into a row.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        // ... (body elided in the source)
    }
}
See details in the product documentation.
AsyncTableFunction should be used to generate n rows when integrating with external systems.
In SQL, call the function with LATERAL TABLE(<TableFunction>), combined either with JOIN or with LEFT JOIN and an ON TRUE join condition.
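The "1 row -> N rows" behavior of a table function can be sketched in plain Java, without Flink on the classpath. The class below is a hypothetical stand-in for a function with a ROW<word STRING, length INT> output: its collect method simply records rows in a list, whereas a real TableFunction inherits collect from the base class. Splitting on whitespace is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a table function: one input value, zero or more output rows.
class SplitFunctionSketch {
    // Stand-in for the rows a real TableFunction would hand to the runtime.
    final List<Object[]> emitted = new ArrayList<>();

    // Stand-in for TableFunction.collect(...).
    void collect(Object[] row) {
        emitted.add(row);
    }

    // Emits one (word, length) row per whitespace-separated word.
    void eval(String str) {
        if (str == null) {
            return; // no rows for a null input
        }
        for (String word : str.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                collect(new Object[] {word, word.length()});
            }
        }
    }
}
```

An input such as "hello flink udf" produces three rows, while an empty or null input produces none; with LEFT JOIN ... ON TRUE the outer row would still be kept in that case.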
Aggregate Function¶
A user-defined aggregate function maps scalar values of multiple rows to a new scalar value, using an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.
See code example
The accumulate() method is called for each input row to update the accumulator.
For more detail, see the product documentation.
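The accumulator lifecycle described above can be sketched in plain Java with a weighted average. The method names createAccumulator, accumulate, and getValue follow the contract of Flink's AggregateFunction, but the class below does not extend it and is only an illustration; WeightedAvgSketch, its Accumulator fields, and the merge helper are assumptions.

```java
// Plain-Java sketch of an aggregate function: many input rows, one output value.
class WeightedAvgSketch {
    // Intermediate state: running weighted sum and running total weight.
    static class Accumulator {
        double weightedSum = 0.0;
        double totalWeight = 0.0;
    }

    Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called once per input row to fold the row into the accumulator.
    void accumulate(Accumulator acc, double value, double weight) {
        acc.weightedSum += value * weight;
        acc.totalWeight += weight;
    }

    // Combines a partial accumulator into another one.
    void merge(Accumulator target, Accumulator other) {
        target.weightedSum += other.weightedSum;
        target.totalWeight += other.totalWeight;
    }

    // Computes the final result; null when no weight has been accumulated.
    Double getValue(Accumulator acc) {
        if (acc.totalWeight == 0.0) {
            return null;
        }
        return acc.weightedSum / acc.totalWeight;
    }
}
```

Accumulating the rows (10, weight 1) and (20, weight 3) gives (10 + 60) / 4 = 17.5 from getValue.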
Deploying to Confluent Cloud¶
- Get the FlinkDeveloper RBAC role to be able to manage workspaces and artifacts
- Use the Confluent CLI to upload the jar file. Example from GEO_DISTANCE:

      confluent environment list
      # then in your environment
      confluent flink artifact create geo_distance --artifact-file target/geo-distance-udf-1.0-0.jar --cloud aws --region us-west-2 --environment env-nk...

      +--------------------+--------------+
      | ID                 | cfa-nx6wjz   |
      | Name               | geo_distance |
      | Version            | ver-nxnnnd   |
      | Cloud              | aws          |
      | Region             | us-west-2    |
      | Environment        | env-nknqp3   |
      | Content Format     | JAR          |
      | Description        |              |
      | Documentation Link |              |
      +--------------------+--------------+

  The artifact is also visible in the Artifacts menu.
- UDFs are registered inside a Flink database
- Use the function to compute the distance between Paris and London:
