Solution Design¶
Simple domain model for client¶
We can use a simple client model to define a life insurance client who can have different beneficiaries or transfer the life insurance policy to other persons. The model can be summarized as follows:
A life insurance policy can be transferred to a family member or someone else; therefore the model stores not only information about the client to whom the policy belongs, but also information about any related people and their relationship to the client.
Client information is stored in a Person object, but the client is also represented as a Client. Other people related to the client, to whom the policy may be transferred or who may receive the policy benefit upon the client's death, are also Persons.
The Client Category is used to classify clients for marketing purposes, based on demographics and financial details.
The remaining two classes describe the nature of the relationship between clients and other people. Relation types are stored in ClientRelationType.
The ClientRelated instances store references to the client (client_id), the related person (person_id), the nature of that relation (client_relation_type_id), any additional details (details), and a flag indicating whether the relation is currently active (is_active).
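As a rough illustration only, the ClientRelated entity could look like the following minimal sketch; the field names come from the description above, while the types and Java naming are assumptions:

```java
// Minimal sketch of the ClientRelated entity described above.
// Field names follow the model description; types are assumptions.
public class ClientRelated {
    public String clientId;              // client_id: the client who owns the policy
    public String personId;              // person_id: the related person
    public String clientRelationTypeId;  // client_relation_type_id: nature of the relation
    public String details;               // optional additional details
    public boolean isActive;             // is_active: whether the relation is currently active
}
```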
The Java classes for this model are in the lf-tx-simulator project.
In the future, this model can be extended with the life insurance offer and product.
The model is inspired by a Vertabelo blog post.
Transaction Simulator¶
The code is under the lf-tx-simulator folder and uses the JMS API to interact with IBM MQ.
The main class is MQProducer.java; it exposes a send(client) method that takes the domain class and prepares a transaction event message. The following code extract illustrates this:
// This is more of a demo trick; a better implementation should be considered.
TransactionEvent tx = new TransactionEvent();
// Mark the event as a client creation or an update
if (newClient) {
    tx.type = TransactionEvent.TX_CLIENT_CREATED;
} else {
    tx.type = TransactionEvent.TX_CLIENT_UPDATED;
}
tx.payload = client;
tx.txid = client.id;
tx.timestamp = new Date().getTime();
// Serialize the event to JSON (parser is assumed to be a Jackson ObjectMapper)
String msg = parser.writeValueAsString(tx);
TextMessage message = jmsContext.createTextMessage(msg);
// The correlation ID will be used as the Kafka record key by the source connector
message.setJMSCorrelationID(client.id);
producer.send(destination, message);
The transaction event adds metadata and supports a generic payload. We may replace it with CloudEvents in the future.
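From the fields used in the extract above, the TransactionEvent class is roughly shaped as in the sketch below; the constant values and field types are assumptions, not the project's exact code:

```java
// Rough shape of TransactionEvent, inferred from the producer extract above.
// Constant values and field types are assumptions.
public class TransactionEvent {
    public static final String TX_CLIENT_CREATED = "ClientCreated";
    public static final String TX_CLIENT_UPDATED = "ClientUpdated";

    public String type;      // creation or update marker
    public String txid;      // transaction id, set to the client id
    public long timestamp;   // event creation time in epoch milliseconds
    public Object payload;   // generic payload; the simulator sends the Client object
}
```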
The code uses the JMSCorrelationID to carry the value that the Kafka source connector will use as the record key. We detail that in the next section.
The rest of this application exposes a REST resource to control the simulation and to send new client data or updates to an existing client. These APIs should be enough to demonstrate the requirements.
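As an illustration only, such a resource could look like the following JAX-RS sketch; the class name, paths, and injection wiring are assumptions, not the project's actual code:

```java
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

// Hypothetical REST resource driving the simulator; see lf-tx-simulator for the real APIs.
@Path("/clients")
public class ClientSimulatorResource {

    @Inject
    MQProducer producer;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Response createClient(Client client) {
        // The producer builds and sends the transaction event message to MQ
        producer.send(client);
        return Response.accepted().build();
    }

    @PUT
    @Consumes(MediaType.APPLICATION_JSON)
    public Response updateClient(Client client) {
        producer.send(client);
        return Response.accepted().build();
    }
}
```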
The deployment descriptors are in the environments/apps folder.
The MQ source connector¶
The declaration of the Kafka Connect cluster is in the environments/service folder, and the MQ source connector is defined in kafka-mq-src-connector.yaml.
The interesting part of the connector configuration is the use of JMS and of the JMSCorrelationID as the source for the Kafka record key:
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
mq.connection.mode: client
mq.message.body.jms: true
mq.record.builder.key.header: JMSCorrelationID
The Client event stream processing¶
The code is in the client-event-processing folder and implements the green component in the figure below:
The streaming algorithm is quite simple:
- Get continuous updates of the category reference data: updates should be rare, but the process will pick up any changes to this data. The categories are kept as a table in memory and persisted in Kafka so that only the last update per record key is retained. The table below illustrates the mock data used:

  | Key | Value |
  | --- | ----- |
  | 1 | "category_name": "Personal" |
  | 2 | "category_name": "VIP" |
  | 3 | "category_name": "Employee" |
  | 4 | "category_name": "Business" |

  The topology uses a GlobalKTable, so even if the input stream is partitioned, each processing instance has a single, complete copy of the table (a sketch follows).
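A minimal sketch of that part of the topology, assuming a lf-client-categories topic name, a ClientCategory value class with a category_name field, and a matching JSON Serde (all assumptions):

```java
// Materialize the category reference data as a GlobalKTable:
// every streams instance gets a full, continuously updated copy.
GlobalKTable<String, ClientCategory> categories = builder.globalTable(
        "lf-client-categories",                          // hypothetical topic name
        Consumed.with(Serdes.String(), categorySerde));  // assumed JSON Serde
```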
- Process the transaction events as a stream, validate the data, and send any transaction in error to a dead letter queue. A transaction payload looks like:
{ "id": "101012", "code": "C02", "insuredPerson": { "id": 2, "code": "P02", "first_name": "julie", "last_name": "thesimmer", "address": "10 market street, CA, San Franciso", "phone": "650-650-650", "mobile": "", "email": "jswimmer@email.com" }, "client_category_id": 1 }
The transaction stream processing starts with the following statements:
KStream<String, TransactionEvent> transactions = builder.stream(transactionsInputStreamName,
        Consumed.with(Serdes.String(), transactionEventSerder));
Map<String, KStream<String, TransactionEvent>> branches = transactions
        .split(Named.as("A-"))
        .branch((k, v) -> transactionNotValid(v), Branched.as("error"))
        .defaultBranch(Branched.as("good-data"));
The processing is configured for exactly-once delivery, so each transaction event is processed once even across failures and restarts. The sketch below illustrates the dead letter routing and the exactly-once setting.
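A minimal sketch, assuming a lf-transactions-dlq dead letter topic name (the branch key "A-error" combines the split prefix with the branch name):

```java
// Route invalid transactions to the dead letter topic (name is hypothetical).
branches.get("A-error")
        .to("lf-transactions-dlq", Produced.with(Serdes.String(), transactionEventSerder));

// Enable exactly-once processing in the Kafka Streams configuration
// (EXACTLY_ONCE_V2 requires Kafka Streams 2.8+; older versions use EXACTLY_ONCE).
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```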
- Transform the hierarchical model of the input transaction into a flat model, the ClientOutput class:
public class ClientOutput implements JSONSerdeCompatible {
    public String client_id;
    public String client_code;
    public Integer client_category_id;
    public String client_category_name;
    public String first_name;
    public String last_name;
    public String address;
    public String phone;
    public String mobile;
    public String email;
}
Data transformation is simple (see the sketch below):
- Enrich with the category name by doing a join with the categories table.
- Route to different targets based on the category name content.
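A minimal sketch of these two steps, assuming the categories GlobalKTable from step 1, a hypothetical toClientOutput mapping helper, illustrative topic names, and a clientOutputSerde (all assumptions):

```java
// Flatten each valid transaction, then enrich it with the category name
// by joining against the categories GlobalKTable on client_category_id.
KStream<String, ClientOutput> enriched = branches.get("A-good-data")
        .mapValues(tx -> toClientOutput(tx))  // hypothetical hierarchical-to-flat mapper
        .join(categories,
              (key, out) -> String.valueOf(out.client_category_id),  // lookup key in the table
              (out, category) -> {
                  out.client_category_name = category.category_name;
                  return out;
              });

// Route to different target topics based on the category name content.
enriched.to((key, out, recordContext) ->
                "VIP".equals(out.client_category_name) ? "lf-vip-clients" : "lf-clients",
        Produced.with(Serdes.String(), clientOutputSerde));
```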
The deployment descriptors are in the environments/apps/client-event-processing folder.