# From local cluster to Event Streams
## Scenario 1: From local Kafka as source to Event Streams on Cloud as target
The goal of this test scenario is to send product definitions to the local `products` topic, start Mirror Maker 2, and see the data replicated to the `source.products` topic in the Event Streams cluster.
- Set the environment variables in the `setenv.sh` script so that the source brokers point to your local cluster and the target brokers to Event Streams. Be sure to also set the Event Streams API key:

```shell
export KAFKA_SOURCE_BROKERS=kafka1:9092,kafka2:9093,kafka3:9094
export KAFKA_TARGET_BROKERS=broker-3-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-1-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-0-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-5-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-2-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-4-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093
export KAFKA_TARGET_APIKEY="<password attribute in the Event Streams credentials>"
```
- You may need to create the topics in the target cluster yourself. This depends on whether Mirror Maker 2.0 can access the AdminClient API: an API key defined in Event Streams can have admin, write or read access, and Mirror Maker needs the admin role to create topics automatically. If your key does not have it, pre-create the replicated topic, for example with the command sketched below.
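A minimal sketch, run from the Kafka tools container used later in this scenario and using the `eventstream.properties` file defined further down (partition count and replication factor are illustrative):

```shell
# Pre-create the replicated topic on Event Streams when Mirror Maker 2
# cannot create it itself; adjust partitions and replication factor as needed.
./kafka-topics.sh --bootstrap-server $KAFKA_TARGET_BROKERS \
  --command-config /home/eventstream.properties \
  --create --topic source.products --partitions 1 --replication-factor 3
```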
- Send some product data to this topic. For that we use a Python Docker image, built from `python-kafka/Dockerfile-python`. The command to build this image (if you change the image name, be sure to use the new name in the following commands) is:

```shell
docker build -f Dockerfile-python -t jbcodeforce/python37 .
```
Once the image is built, start the Python environment with the following commands:

```shell
source ./setenv.sh
docker run -ti -v $(pwd):/home --rm -e KAFKA_BROKERS=$KAFKA_SOURCE_BROKERS --network kafkanet jbcodeforce/python37 bash
```
In this isolated Python container's bash shell, run the following command to send the first five products:

```shell
$ echo $KAFKA_BROKERS
kafka1:9092,kafka2:9093,kafka3:9094
$ python SendProductToKafka.py ./data/products.json
[KafkaProducer] - {'bootstrap.servers': 'kafka1:9092,kafka2:9093,kafka3:9094', 'group.id': 'ProductsProducer'}
{'product_id': 'P01', 'description': 'Carrots', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P02', 'description': 'Banana', 'target_temperature': 6, 'target_humidity_level': 0.6, 'content_type': 2}
{'product_id': 'P03', 'description': 'Salad', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P04', 'description': 'Avocado', 'target_temperature': 6, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P05', 'description': 'Tomato', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 2}
[KafkaProducer] - Message delivered to products [0]
[KafkaProducer] - Message delivered to products [0]
[KafkaProducer] - Message delivered to products [0]
[KafkaProducer] - Message delivered to products [0]
[KafkaProducer] - Message delivered to products [0]
```
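For reference, here is a minimal sketch of what a producer script like `SendProductToKafka.py` could look like, using the confluent-kafka client. This is an illustration written for this note, not the actual script from the repository; the data file is assumed to be a JSON array of product dictionaries:

```python
# Hypothetical sketch of a product producer; the real SendProductToKafka.py may differ.
import json
import os
import sys

from confluent_kafka import Producer

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure.
    if err is not None:
        print('[KafkaProducer] - Delivery failed: {}'.format(err))
    else:
        print('[KafkaProducer] - Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

options = {'bootstrap.servers': os.environ['KAFKA_BROKERS']}
print('[KafkaProducer] - ' + str(options))
producer = Producer(options)

with open(sys.argv[1]) as f:
    products = json.load(f)   # assumes a JSON array of products

for product in products:
    print(product)
    producer.produce('products', key=product['product_id'],
                     value=json.dumps(product), callback=delivery_report)

producer.flush()
```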
- To validate that the data are in the source topic, use the Kafka console consumer. Here are the basic commands:

```shell
docker run -ti -v $(pwd):/home --network kafkanet strimzi/kafka:latest-kafka-2.4.0 bash
$ cd bin
$ ./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic products --from-beginning
```
- Define the Event Streams cluster properties file used by the different Kafka tool commands. Set the password attribute of the `sasl.jaas.config` to match the Event Streams API key. The `eventstream.properties` file looks like:

```properties
bootstrap.servers=broker-3-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-1-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-0-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-5-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-2-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093,broker-4-qnprtqnp7hnkssdz.kafka.svc01.us-east.eventstreams.cloud.ibm.com:9093
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password=....;
```
- Restart the `kafka-console-consumer` with the bootstrap URL of Event Streams and the replicated topic name `source.products`. Use the previously created properties file for the authentication settings, so the command looks like:

```shell
source /home/setenv.sh
./kafka-console-consumer.sh --bootstrap-server $KAFKA_TARGET_BROKERS --consumer.config /home/eventstream.properties --topic source.products --from-beginning
```
- Now we are ready to start Mirror Maker 2.0 close to the local cluster (your laptop), using yet another Docker image:

```shell
docker run -ti -v $(pwd):/home --network kafkanet strimzi/kafka:latest-kafka-2.4.0 bash
$ /home/local-cluster/launchMM2.sh
```
The `launchMM2.sh` script updates a template properties file with the values of the environment variables and then runs `/opt/kafka/bin/connect-mirror-maker.sh mm2.properties` with the updated file.
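The generated `mm2.properties` is not reproduced in this note; a minimal sketch of what it could contain for this scenario (cluster aliases and values are illustrative, the template in the repository may differ):

```properties
# Hypothetical mm2.properties sketch: one local source cluster, Event Streams as target.
clusters=source, target
source.bootstrap.servers=kafka1:9092,kafka2:9093,kafka3:9094
target.bootstrap.servers=<KAFKA_TARGET_BROKERS>
# Security settings for the Event Streams side
target.security.protocol=SASL_SSL
target.ssl.protocol=TLSv1.2
target.sasl.mechanism=PLAIN
target.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<KAFKA_TARGET_APIKEY>";
# Replicate the products topic; the source alias becomes the replicated topic prefix (source.products)
source->target.enabled=true
source->target.topics=products
```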
The trace includes a lot of messages showing the different Kafka Connect consumers, producers, workers and tasks. The logs can be found in the `/tmp/logs` folder within the Docker container. The table below lists some of the elements of this configuration:
| Name | Description |
| --- | --- |
| Worker clientId=connect-2, groupId=target-mm2 | Herder for target cluster topics but reading the source topic |
| Producer clientId=producer-1 | Producer to the target cluster |
| Consumer clientId=consumer-target-mm2-1, groupId=target-mm2 | Subscribed to 25 partition(s): mm2-offsets.target.internal-0 to 24 |
| Consumer clientId=consumer-target-mm2-2, groupId=target-mm2 | Subscribed to 5 partition(s): mm2-status.target.internal-0 to 4 |
| Consumer clientId=consumer-target-mm2-3, groupId=target-mm2 | Subscribed to partition(s): mm2-configs.target.internal-0 |
| Worker clientId=connect-2, groupId=target-mm2: Starting connectors and tasks using config offset 6 | This trace shows Mirror Maker will start consuming messages from offset 6. A previous run already committed the offset for this client id, which illustrates a Mirror Maker restart |
| Starting connector MirrorHeartbeatConnector and starting task MirrorHeartbeatConnector-0 | |
| Starting connector MirrorCheckpointConnector | |
| Starting connector MirrorSourceConnector | |
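Another way to see what Mirror Maker 2 has set up is to list the topics on the target cluster: besides `source.products`, you should see the mm2-* internal topics listed in the table above. For example, from the same Kafka tools container:

```shell
# List the topics on the Event Streams target cluster, including the
# internal topics created by Mirror Maker 2 (requires sufficient access rights).
./kafka-topics.sh --bootstrap-server $KAFKA_TARGET_BROKERS \
  --command-config /home/eventstream.properties --list
```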
As expected, in the consumer console we can see the 5 product messages arriving on the `source.products` topic once the replication completes:

```shell
{'bootstrap.servers': 'kafka1:9092,kafka2:9093,kafka3:9094', 'group.id': 'ProductsProducer'}
{'product_id': 'P01', 'description': 'Carrots', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P02', 'description': 'Banana', 'target_temperature': 6, 'target_humidity_level': 0.6, 'content_type': 2}
{'product_id': 'P03', 'description': 'Salad', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P04', 'description': 'Avocado', 'target_temperature': 6, 'target_humidity_level': 0.4, 'content_type': 1}
{'product_id': 'P05', 'description': 'Tomato', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 2}
```
## Scenario 2: Run a Mirror Maker 2 cluster close to the target cluster
This scenario is similar to scenario 1, but Mirror Maker 2.0 now runs within an OpenShift cluster in the same data center as the Event Streams cluster, so closer to the target cluster:

We have created an Event Streams cluster in the Washington DC data center, and we have the Strimzi operators deployed in an OpenShift cluster in the same data center (see this note to provision such an environment).

Producers run locally on the same OpenShift cluster, where a vanilla Kafka 2.4 is running, or can run remotely using the Kafka brokers exposed through an OpenShift route. The black rectangles in the figure above represent those producers.
The goal is to replicate the `products` topic on the left to the `source.products` topic on the right.
What needs to be done:
- Get an OpenShift cluster in the same data center as the Event Streams service (see this product introduction). It is used to deploy Mirror Maker 2, but for our test we also use it as the source cluster for the replication.
- Get an API key with the manager role for the Event Streams cluster and define a Kubernetes secret from it, as sketched below.
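The secret name and key below are illustrative, not taken from the repository:

```shell
# Hypothetical example: store the Event Streams API key in a Kubernetes secret
# that the Mirror Maker 2 deployment can reference.
oc create secret generic es-api-secret \
  --from-literal=password=<EVENT_STREAMS_APIKEY>
```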
- Run Mirror Maker 2 with the configuration defined in the `kafka-to-es-mm2.yml` file. See the next section for starting Mirror Maker 2, or read some Mirror Maker 2 deployment details in this provisioning note.
- Start a consumer on the `source.products` topic.
- Run a producer to the source topic named `products`.
### Run Mirror Maker 2
- Create the `mm2-offset-syncs.kafka-on-premise-cluster.internal` topic in the target cluster.
- Apply the Mirror Maker 2 configuration:

```shell
oc apply -f local-cluster/kafka-to-es-mm2.yml
```

A new pod is created, or, if you already have a Mirror Maker 2 deployed, the new configuration is added to your cluster.
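For reference, here is a minimal sketch of what such a `KafkaMirrorMaker2` resource could look like, assuming Strimzi's `v1alpha1` API, the secret sketched earlier, and illustrative names; the actual `kafka-to-es-mm2.yml` in the repository may differ:

```yaml
# Hypothetical KafkaMirrorMaker2 sketch: replicate the products topic from the
# local Strimzi cluster to Event Streams. Names and secret references are illustrative.
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-es
spec:
  version: 2.4.0
  replicas: 1
  connectCluster: "event-streams"    # the target cluster hosting the Connect workers
  clusters:
    - alias: "source"                # the alias is the prefix of replicated topic names; adapt it to your naming
      bootstrapServers: eda-demo-24-cluster-kafka-bootstrap:9092
    - alias: "event-streams"
      bootstrapServers: <KAFKA_TARGET_BROKERS>
      tls:
        trustedCertificates: []      # trust the public CA used by Event Streams
      authentication:
        type: plain
        username: token
        passwordSecret:
          secretName: es-api-secret
          password: password
  mirrors:
    - sourceCluster: "source"
      targetCluster: "event-streams"
      topicsPattern: "products"
      sourceConnector:
        config:
          replication.factor: 3
```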
### Run the consumer
To validate that the replication works, we connect a consumer to the `source.products` topic on Event Streams. For that we define a target cluster properties file (`eventstream.properties`) like:

```properties
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="am_...";
```
- Start the consumer on the `source.products` topic in Event Streams on the cloud. We use the `setenv.sh` script to export the needed environment variables:

```shell
docker run -ti -v $(pwd):/home strimzi/kafka:latest-kafka-2.4.0 bash
bash-4.2$ source /home/setenv.sh
bash-4.2$ ./bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_TARGET_BROKERS --consumer.config /home/mirror-maker-2/eventstream.properties --topic source.products --from-beginning
```
### Produce records to the local cluster
- Start a producer to send product records to the source Kafka cluster. If you have done scenario 1, the first product definitions may already be in the target cluster, so we send a second batch of products using a second data file:

```shell
# Under the mirror-maker-2 folder
export KAFKA_BROKERS="eda-demo-24-cluster-kafka-bootstrap-jb-kafka-strimzi.gse-eda-demos-fa9ee67c9ab6a7791435450358e564cc-0001.us-east.containers.appdomain.cloud:443"
export KAFKA_CERT="/home/ca.crt"
docker run -ti -v $(pwd):/home --rm -e KAFKA_CERT=$KAFKA_CERT -e KAFKA_BROKERS=$KAFKA_BROKERS strimzi/kafka:latest-kafka-2.4.0 bash -c "/opt/kafka/bin/kafka-console-producer.sh --broker-list $KAFKA_BROKERS --producer.config /home/kafka-strimzi.properties --topic products"
```
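The `kafka-strimzi.properties` file referenced above is not reproduced in this note. One possible shape, assuming the route's CA certificate (`ca.crt`) has first been imported into a Java truststore (for example with `keytool -importcert`), could be:

```properties
# Hypothetical client properties for the TLS route exposed by the source Strimzi cluster;
# adapt the truststore location and password to your environment.
security.protocol=SSL
ssl.truststore.location=/home/truststore.jks
ssl.truststore.password=<truststore-password>
```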
As an alternative, you can run the producer as a pod inside the source cluster and then send the products one by one from the console:

```shell
oc run kafka-producer -ti --image=strimzi/kafka:latest-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list eda-demo-24-cluster-kafka-bootstrap:9092 --topic products
If you don't see a command prompt, try pressing enter.
>{'product_id': 'P01', 'description': 'Carrots', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
>{'product_id': 'P02', 'description': 'Banana', 'target_temperature': 6, 'target_humidity_level': 0.6, 'content_type': 2}
>{'product_id': 'P03', 'description': 'Salad', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 1}
>{'product_id': 'P04', 'description': 'Avocado', 'target_temperature': 6, 'target_humidity_level': 0.4, 'content_type': 1}
>{'product_id': 'P05', 'description': 'Tomato', 'target_temperature': 4, 'target_humidity_level': 0.4, 'content_type': 2}
```
Note
There are other ways to send records, for example using a Kafka HTTP bridge and `curl` POST commands, as sketched below.
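For example, assuming a Strimzi Kafka Bridge were deployed and exposed at a hypothetical `http://my-bridge-route` address, a product record could be posted like this:

```shell
# Hypothetical example: post one product record through a Strimzi Kafka HTTP bridge.
# The bridge URL and the record content are illustrative.
curl -X POST http://my-bridge-route/topics/products \
  -H 'Content-Type: application/vnd.kafka.json.v2+json' \
  -d '{"records":[{"key":"P06","value":{"product_id":"P06","description":"Apple","target_temperature":4,"target_humidity_level":0.4,"content_type":1}}]}'
```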
- To validate that the source `products` topic has records, start a consumer pod on OpenShift within the source Kafka cluster using the strimzi/kafka image:

```shell
oc run kafka-consumer -ti --image=strimzi/kafka:latest-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server eda-demo-24-cluster-kafka-bootstrap:9092 --topic products --from-beginning
```