Skip to content

Mirror Maker 2 Deployment

In this article we are presenting different type of Mirror Maker 2 deployments. Updated 4/4 on Strimzi version 0.17.

  • Using Strimzi operator to deploy on Kubernetes
  • To run MM2 on a VM or as docker image which can be adapted with your own configuration, like for example by adding prometheus JMX Exporter as java agent.

We are using the configuration to deploy from event streams on Cloud to a local Kafka cluster we deployed using Strimzi.

ES to local

Common configuration

When we need to create Kubernetes Secrets to manage APIKEY to access Event Streams, and TLS certificate to access local Kafka brokers, we need to do the following steps:

  • Create a project in OpenShift to deploy Mirror Maker cluster, for example: oc new-project <projectname>.
  • Create a secret for the API KEY of the Event Streams cluster: oc create secret generic es-api-secret --from-literal=password=<replace-with-event-streams-apikey>
  • As your vanilla Kafka source cluster may use TLS to communicate between clients and brokers, you need to use the k8s secret defined when deploying Kafka which includes the CAroot and generic client certificates. These secrets are : eda-demo-24-cluster-clients-ca-cert and eda-demo-24-cluster-cluster-ca-cert.

  • Get the host ip address from the Route resource

    oc get routes my-cluster-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'
    
  • Get the TLS CA root certificate from the broker

    oc get secrets
    oc extract secret/eda-demo-24-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
    oc extract secret/eda-demo-24-cluster-clients-ca-cert --keys=ca.crt --to=- >> ca.crt
    
  • Transform the certificates for java truststore

    keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
    
  • Create a secret from truststore file so it can be mounted as needed into consumer or producer running in the same OpenShift cluster.

oc create secret generic kafka-truststore --from-file=./truststore.jks

Deploying using Strimzi Mirror Maker operator

We assume you have an existing namespace or project to deploy Mirror Maker. You also need to get the latest (0.17-rc4 at least) Strimzi configuration from the download page.

If you have already installed Strimzi Operators, Cluster Roles, and CRDs, you do not need to do it again as those resources are defined at the kubernetes cluster level. See the provisioning note.

  • Define source and target cluster properties in mirror maker 2.0 es-to-kafka-mm2.yml descriptor file. Here is the file for the replication between Event Streams and local cluster es-to-kafka-mm2.yml. We strongly recommend to study the schema definition of this custom resource from this page.

Note

connectCluster attribute defines the cluster alias used for Kafka Connect, it must match a cluster in the list at spec.clusters. The config part can match the Kafka configuration for consumer or producer, except properties starting by ssl, sasl, security, listeners, rest, bootstarp.servers which are declared at the cluster definition level.

  alias: event-streams-wdc-as-target
    bootstrapServers: broker-3...
    tls: {}
    authentication:
      passwordSecret:
          secretName: es-api-secret  
          password: password
      username: token
      type: plain

The example above use a secret to get the Event Streams API KEY, which as create with a command like: oc create secret generic es-api-secret --from-literal=password=<replace with ES key>

  • Deploy Mirror maker 2.0 within your project.
oc apply -f es-to-kafka-mm2.yml

This commmand creates a kubernetes deployment as illustrated below, with one pod as the replicas is set to 1. If we need to add parallel processing because of the topics to replicate have multiple partitions, or there are a lot of topics to replicate, then adding pods will help to scale horizontally. The pods are in the same consumer group, so Kafka Brokers will do the partition rebalancing among those new added consumers.

Mirror maker deployment

Now with this deployment we can test consumer and producer as described in the scenario 4.

MM2 topology

In this section we want to address horizontal scalability and how to organize the MirrorMaker 2 topology for multi tenancy. The simplest approach is to use one MirrorMaker instance per familly of topics: the classification of familly of topic can be anything, from line of business, to team, to application. Suppose an application is using 1000 topic - partitions, for data replication it may make sense to have one MM2 instance for this application responsible to manage the topics replication. The configuration will define the groupId to match the application name for example.

The following diagram illustrates this kind of topology by using regular expression on the topic white list selection, there are three MirrorMaker 2 instances mirroring the different topics with name starting with topic-name-A*, topic-name-B*, topic-name-C*, respectively.

Each connect instance is a JVM workers that replicate the topic/parititions and has different group.id.

For Bi-directional replication for the same topic name, MirrorMaker 2 will use the cluster name as prefix. With MM2 we do not need to have 2 clusters but only one and bidirectional definitions. The following example is showing the configuration for a MM2 bidirectional settings, with accounts topic to be replicated on both cluster:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
...
 mirrors:
  - sourceCluster: "event-streams-wdc"
    targetCluster: "kafka-on-premise"
    ...
    topicsPattern: "accounts,orders"
  - sourceCluster: "kafka-on-premise"
    targetCluster: "event-streams-wdc"
    ...
    topicsPattern: "accounts,customers"

Capacity planning

To address capacity planning, we need to review some characteristic of the Kafka Connect framework: For each topic/partition there will be a task running. We can see in the trace that tasks are mapped to threads inside the JVM. So the parallelism will be bound by the number of CPUs the JVM runs on. The parameters max.tasks specifies the max parallel processing we can have per JVM. So for each Topic we need to assess the number of partitions to be replicated. Each task is using the consumer API and is part of the same consumer group, the partition within a group are balanced by an internal controler. With Kafka connect any changes to the topic topology triggers a partition rebalancing. In MM2 each consumer / task is assigned a partition by the controller. So the rebalancing is done internally. Still adding a broker node into the cluster will generate rebalancing.

The task processing is stateless, consume - produce wait for acknowledge, commit offet. In this case the CPU and network are key. For platform tuning activity we need to monitor operating system performance metrics. If the CPU becomes the bottleneck, we can allocate more CPU or start to scale horizontally by adding mirror maker 2 instance. If the network at the server level is the bottleneck, then adding more servers will help. Kafka will automatically balance the load among all the tasks running on all the machines. The size of the message impacts also the throughtput as with small message the throughput is CPU bounded. With 100 bytes messages or more we can observe network saturation.

The parameters to consider for sizing are the following:

Parameter Description Impact
Number of topic/ partition Each task processes one partition For pure parallel processing max.tasks is the number of CPU
Record size Size of the message in each partition in average Memory usage and Throughput: the # of records/s descrease when size increase, while MB/s throughput increases in logarithmic
Expected input throughput The producer writing to the source topic throughput Be sure the consumers inside MM2 absorb the demand
Network latency This is where positioning MM2 close to the target cluster may help improve latency

Version migration

Once the MirrorMaker cluster is up and running, it may be needed to update the underlying code when a new product version is released. Based on Kafka Connect distributed mode multiple workers JVM coordinate the topic / partition repartition among themselves via Kafka topic. If a worker process dies, the cluster is rebalanced to distribute the work fairly over the remaining workers. If a new worker starts work, a rebalance ensures it takes over some work from the existing workers.

Using the REST API it is possible to stop and restart a connector. As of now the recommendation is to start a new MirrorMaker instance with the new version and the same groupId as the existing workers you want to migrate. Then stop the existing version. As each MirrorMaker workers are part of the same group, the internal worker controller will coordinate with the other workers the 'consumer' task to partition assignment.

We have presentate a similar approach in this section, where we tested that each instance of MirrorMaker 2 could assume the replication. First we will stop the Node 1 instance, upgrade it to the latest version, then start it again. Then we’ll repeat the same procedure on Node 2. We’ll continue to watch the Consumer VM window to note that replication should not stop at any point.

We’ve now upgraded to Kafka 2.5 including the latest MirrorMaker 2. Meanwhile, replication was uninterrupted due to the second instance of MirrorMaker 2:

Now we’ll restart the Node 1 instance of MirrorMaker 2, stop the Node 2 instance, we can still see replication occurring on the upgraded Node 1 instance of MirrorMaker 2.

We upgrade Node 2’s instance of MirrorMaker 2 exactly as on Node 1, and start it again, and once again, replication is still going.

When using Strimzi, if the update applies to the MM2 Custom Resource Definition, just reapplying the CRD should be enough.

Be sure to verify the product documentation as new version may enforce to have new topics. It was the case when Kafka connect added the config topic in a recent version.

Deploying a custom MirrorMaker docker image

We want to use custom docker image when we want to add Prometheus JMX exporter as Java Agent so we can monitor MM2 with Prometheus. The proposed docker file is in this folder and may look like:

FROM strimzi/kafka:latest-kafka-2.4.0
# ...
ENV LOG_DIR=/tmp/logs
ENV EXTRA_ARGS="-javaagent:/usr/local/share/jars/jmx_prometheus_javaagent-0.12.0.jar=9400:/etc/jmx_exporter/jmx_exporter.yaml "

# ....
EXPOSE 9400

CMD /opt/kafka/bin/connect-mirror-maker.sh  /home/mm2.properties

As the mirror maker 2 is using properties file, we want to define source and target cluster and the security settings for both clusters. As the goal is to run within the same OpenShift cluster as Kafka, the broker list for the source matches the URL within the broker service:

# get the service URL
oc describe svc my-cluster-kafka-bootstrap
# URL my-cluster-kafka-bootstrap:9092

The target cluster uses the bootstrap servers from the Event Streams Credentials, and the API KEY is defined with the manager role, so mirror maker can create topic dynamically.

Properties template file can be seen here: kafka-to-es-mm2

clusters=source, target
source.bootstrap.servers=eda-demo-24-cluster-kafka-bootstrap:9092
source.ssl.endpoint.identification.algorithm=
target.bootstrap.servers=broker-3-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-1-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-0-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-5-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-2-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-4-h6s2xk6b2t77g4p1.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093
target.security.protocol=SASL_SSL
target.ssl.protocol=TLSv1.2
target.ssl.endpoint.identification.algorithm=https
target.sasl.mechanism=PLAIN
target.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
username="token" password="<Manager API KEY from Event Streams>";
# enable and configure individual replication flows
source->target.enabled=true
sync.topic.acls.enabled=false
replication.factor=3
internal.topic.replication.factor=3
refresh.topics.interval.seconds=10
refresh.groups.interval.seconds=10
source->target.topics=products
tasks.max=10

Upload the properties as a secret

oc create secret generic mm2-std-properties --from-file=es-cluster/mm2.properties

The file could be copied inside the docker image or better mounted from a secret when deployed to kubernetes.

Build and push the image to a docker registry.

docker build -t ibmcase/mm2ocp:v0.0.2  .
docker push ibmcase/mm2ocp:v0.0.2

Then using a deployment configuration like this one, we can deploy our custom mirror maker 2 with:

oc apply -f mm2-deployment.yaml
# to assess the cluster 
oc get kafkamirrormaker2
NAME          DESIRED REPLICAS
mm2-cluster   1

Define the monitoring rules

As explained in the monitoring note, we need to define the Prometheus rules within a yaml file so that Mirror Maker 2 can report metrics:

lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  - pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):"
    name: "kafka_connect_connect_worker_metrics_$1"
  - pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
    name: "kafka_connect_connect_metrics_$1_$2"   
  # Rules below match the Kafka Connect/MirrorMaker MBeans in the jconsole order
  # Worker task states
  - pattern: kafka.connect<type=connect-worker-metrics, connector=(\w+)><>(connector-destroyed-task-count|connector-failed-task-count|connector-paused-task-count|connector-running-task-count|connector-total-task-count|connector-unassigned-task-count)
    name: connect_worker_metrics_$1_$2
  # Task metrics
  - pattern: kafka.connect<type=connector-task-metrics, connector=(\w+), task=(\d+)><>(batch-size-avg|batch-size-max|offset-commit-avg-time-ms|offset-commit-failure-percentage|offset-commit-max-time-ms|offset-commit-success-percentage|running-ratio)
    name: connect_connector_task_metrics_$1_$3
    labels:
       task: "$2"
  # Source task metrics
  - pattern: kafka.connect<type=source-task-metrics, connector=(\w+), task=(\d+)><>(source-record-active-count|source-record-poll-total|source-record-write-total)
    name: connect_source_task_metrics_$1_$3
    labels:
       task: "$2"
  # Task errors
  - pattern: kafka.connect<type=task-error-metrics, connector=(\w+), task=(\d+)><>(total-record-errors|total-record-failures|total-records-skipped|total-retries)
    name: connect_task_error_metrics_$1_$3
    labels:
      task: "$2"
  # CheckpointConnector metrics 
  - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+), group=(.+), topic=(.+), partition=(\d+)><>(checkpoint-latency-ms)
    name: connect_mirror_mirrorcheckpointconnector_$6
    labels:
       source: "$1"
       target: "$2"
       group: "$3"
       topic: "$4"
       partition: "$5"
  # SourceConnector metrics
  - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=(\d+)><>(byte-rate|byte-count|record-age-ms|record-rate|record-count|replication-latency-ms)
    name: connect_mirror_mirrorsourceconnector_$4
    labels:
       target: "$1"
       topic: "$2"
       partition: "$3"

Then upload this yaml file in a secret (the following command, represents a trick to update an existing configmap)

oc create secret generic mm2-jmx-exporter --from-file=./mm2-jmx-exporter.yaml

Deploying on VM

On virtual machine, it is possible to deploy the Apache Kafka 2.4+ binary file and then use the command /opt/kafka/bin/connect-mirror-maker.sh with the good properties file as argument. We are documenting the VM installation process in this article.

Within a VM we can run multiple mirror maker instances. When needed we can add more VMs to scale horizontally.

Deploying Mirror Maker 2 on its own project

In this section we address another approach to, deploy a Kafka Connect cluster with Mirror Maker 2.0 connectors but without any local Kafka Cluster. The approach may be used with Event Streams on Cloud as backend Kafka cluster and Mirror Maker 2 for replication.

Using the Strimzi operator we need to define a Yaml file for the connector and white and black lists for the topics to replicate. Here is an example of such descriptor.

If we need to run a custom Mirror Maker 2, we have documented in the section above on how to use Dockerfile and properties file and deployment descriptor to do the deployment on kubernetes or OpenShift cluster.

Provisioning automation

For IT operation automation we can use Ansible to define a playbook to provision the Mirror Maker 2 environment. The Strimzi Ansible playbook repository containts playbook examples for creating cluster roles and service accounts and deploy operators.

The automation approach will include:

  • Deploy all cluster objects needed into a OpenShift cluster: Cluster Roles, Strimzi CRDs: Kafka, KafkaTopic, KafkaUser, Kafka connect, mirror maker(s).
  • Deploy all namespaced objects needed into an OpenShift namespace: Service Accounts, Cluster Role Bindings and Role Bindings, Cluster Operator deployment.
  • Deploy the kafka cluster if needed.
  • Deploy the different mirror maker 2 instances.

Typical errors in Mirror Maker 2 traces

  • Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found.
    • This error message is a light issue in kafka 2.4 and does not impact the replication. In Kafka 2.5 this message is for DEBUG logs.
  • Error while fetching metadata with correlation id 2314 : {source.heartbeats=UNKNOWN_TOPIC_OR_PARTITION}:
    • Those messages may come from multiple reasons. One is that the named topic is not created. In Event Streams is the target cluster the topics may need to be created via CLI or User Interface. It can also being related to the fact the consumer polls on a topic that has just been created and the leader for this topic-partition is not yet available, you are in the middle of a leadership election.
    • The advertised listener may not be set or found.
  • Exception on not being able to create Log directory: do the following: export LOG_DIR=/tmp/logs
  • ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages
  • ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

Some useful commands

  • Connect to local cluster: oc exec -ti eda-demo-24-cluster-kafka-0 bash
  • list the topics: ./kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --list
  • Get the description of the topics from one cluster:
 for t in $(./kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --list)
 do 
 ./kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --describe --topic $t
 done

Create MM2 topics manually

Here are some examples of command to create topic to the target cluster

If you want to delete the topic on your local cluster

 for t in $(/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --list)
 do 
 ./kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --delete --topic $t
 done

To create the topics manually on the target cluster:exit

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create --partitions 5 --topic mm2-offset-syncs.kafka-on-premise-cluster-source.internal

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create  --partitions 5 --replication-factor 3 --topic mirrormaker2-cluster-status

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create  --partitions 25 --replication-factor 3 --topic mirrormaker2-cluster-offsets

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create  --partitions 1 --replication-factor 3  --topic mirrormaker2-cluster-configs

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create  --partitions 1 --replication-factor 3  --topic heartbeats

/opt/kafka/bin/kafka-topics.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --create  --partitions 1 --replication-factor 1  --topic  event-streams-wdc.checkpoints.internal