User Defined Functions¶
User-defined functions (UDFs) are extension to Flink SQL and Table API for frequently used logic and custom integration. It can be written in Java or PyFlink.
If an operation cannot be expressed directly using Flink's standard SQL syntax or built-in functions (e.g., integrating a third-party library, implementing a proprietary business logic, or performing a complex machine learning inference), a UDF provides the necessary capability to execute that custom code within the stream or batch job.
Developers can leverage ny existing libraries like Geospatial calculation, Math computation, to implement the UDF.
Four Types of UDF¶
| UDF Type | Description | Input to Output Mapping | Example Use Case |
|---|---|---|---|
| Scalar Function | Maps a set of scalar input values to a single, new scalar output value. | 1 row -> 1 row | Formatting a string, calculating an encryption key. |
| Table Function | Maps a set of scalar input values to one or more rows (a new table). | 1 row -> N rows | Splitting a single column into multiple rows. |
| Aggregate Function | Maps the values of multiple input rows to a single scalar aggregate value. | N rows -> 1 row | Calculating a custom weighted average or variance. |
| Table Aggregate Function | Maps the values of multiple input rows to multiple output rows. | N rows -> M rows | Calculating a running "top-N" list for each group. |
Implementation approach¶
See Apache flink UDF implementation guide.
For developer the steps are:
- Develop a Class to extends a
org.apache.flink.table.functions.ScalarFunctionororg.apache.flink.table.functions.TableFunction - Implement one of the eval function
- Add constructor with empty parameters and more constructors if needed
- Prefer specifying the parameter types and function return type, specially for TableFunction
- Build a uber jar
- Deploy to Confluent Cloud or into the lib folder of CP Flink Application or into the lib folder of the OSS Flink distribution.
On Confluent Cloud, be sure to use log4j to get function logs and wrap code into try .. .catch. See log debug messages documentation.
Iterate on UDF development¶
In some case we need to iterate on the deployment of new UDF version. It is possible to deploy UDF with different version.
- Need to drop the function:
- Delete the artifacts
- Upload the new jar as new artifact
- Then recreate it with the new artifact id
Examples¶
- See this repository to get a set of reusable UDFs implemented as solution for generic problems asked by our customers.
- See also the Confluent documentation on UDF and a Confluent git repo with some sample UDFs.
Table API examples¶
Extending base APIs¶
Scalar function¶
Scalar function generates a unique value.
The product documentation has all the details.
Use AsyncScalarFunction when interacting with external systems. Use thread pools, initialized in constructor, to manage connection multiplexing.
Table function¶
Table function returns an arbitrary number of rows (or structured types) as output. Single scalar value can be emitted and will be implicitly wrapped into a row by the runtime.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
public void eval(String str) {...
See details in the product documentation.
AsyncTableFunction should be used to generate n rows when integrating with external systems.
The function is used with SQL LATERAL TABLE(<TableFunction>) with JOIN or LEFT JOIN with an ON TRUE join condition. See Lateral table section
Aggregate Function¶
Aggregate user defined function, maps scalar values of multiple rows to a new scalar value, using accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.
The accumulate() method is called for each input row to update the accumulator.
For more detail, see the product documentation.
Deeper considerations¶
- Scalar UDFs in the Table API/SQL are generally expected to be stateless
- Stateful with hashmap to keep state is risky as the map will be wiped if the Task Manager restarts. We need a distributed cache, or use KeyedProcessFunction with the ValueState or ListState. If the UDTF's output depends on internal state that changes over time, it can sometimes lead to unexpected results in complex joins or aggregations. For inner join lateral, when the logic decides not to call collect(), the entire row is filtered out of the result. This effectively acts as a stateful filter. For left join lateral, the non call to collect() will return null, and it is possible to keep the input record from the left table.
- Process Table Function may be needed to define those complex stateful processing as PTF has access to Flinkās managed state, event-time and timer services, and underlying table changelogs. When invoking a PTF, the system automatically adds implicit arguments for state and time management alongside the user-defined input arguments.
Deploying to Confluent Cloud¶
- Get FlinkDeveloper RBAC to be able to manage workspaces and artifacts
-
Use the Confluent CLI to upload the jar file. Example from GEO_DISTANCE:
confluent environment list # then in your environment confluent flink artifact create geo_distance --artifact-file target/geo-distance-udf-1.0-0.jar --cloud aws --region us-west-2 --environment env-nk...+--------------------+--------------+ | ID | cfa-nx6wjz | | Name | geo_distance | | Version | ver-nxnnnd | | Cloud | aws | | Region | us-west-2 | | Environment | env-nknqp3 | | Content Format | JAR | | Description | | | Documentation Link | | +--------------------+--------------+Also visible in the Artifacts menu

-
UDFs are registered inside a Flink database. It may take some time.
-
Use the function to compute distance between Paris and London:

Runtime explanation¶
- UDF invocations are batched (may wait up to 500ms), the runtime tries to accumulate several records before it calls them.
- In Confluent Cloud the UDFs actually run in a separate pod for security isolation, which increases the consumption.