Apache Kafka – How to integrate with Spring Boot Rest API?

Dear techies, very good morning to all of you. In this post, I will create a simple Spring Boot REST API to demonstrate the following important aspects of Kafka:

What is Apache Kafka and how to install / start?

I covered a very basic introduction to Kafka setup in another blog: How to setup and test apache kafka?

This blog assumes you already know the basics and terminology of Apache Kafka, so I will not go into much detail on those and will keep the focus on implementation/integration using a Spring Boot REST API.

  • Producer API
  • Message Consumer/Listener
  • Message Poller

Apache Kafka Producer API – Integrate Kafka with Rest

The Producer API allows an application to publish a stream of records to one or more Kafka topics.

The example Spring Boot REST API below provides two functions, named publishMessage and publishMessageAndCheckStatus.

The publishMessage function simply publishes the message to the Kafka topic provided as a PathVariable in the request. It does not check whether the publish succeeded or failed. Whereas,

the publishMessageAndCheckStatus function sends the record and then checks whether the result was a success or an error. Based on the response type, you can perform any action, such as logging or notifying the caller.

In the example below, the KafkaTemplate properties are pulled from your Spring Boot application's properties file (see the GitHub link above for reference). You can also set these properties by creating a factory object; I will use that approach in the last example.
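For reference, a minimal application.properties for the producer side might look like the following. The broker address and the choice of String serializers are assumptions to adjust for your environment:

```properties
# Kafka broker address (assumed local single-node setup)
spring.kafka.bootstrap-servers=localhost:9092

# Serializers for the <String, String> KafkaTemplate used below
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

With these properties present, Spring Boot auto-configures the KafkaTemplate&lt;String, String&gt; that is @Autowired in the controller.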

package com.hybriscx.demo.kafka.kafkademo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaMessageProducer{
	
	@Autowired
	KafkaTemplate<String, String> kafkaTemplate;

	public KafkaTemplate<String, String> getKafkaTemplate() {
		return kafkaTemplate;
	}

	// Fire-and-forget publish: sends the message to the topic given as a path
	// variable without checking whether the send succeeded.
	@RequestMapping(value = "/producer/publish-message/{topic}", method = RequestMethod.POST)
	public String publishMessage(@PathVariable String topic, @RequestBody String data) {
		getKafkaTemplate().send(topic, data);
		return "Published successfully to Kafka";
	}
	
	// Publish and register a callback so success/failure of the send can be handled.
	@RequestMapping(value = "/producer/publish-message-status/{topic}", method = RequestMethod.POST)
	public String publishMessageAndCheckStatus(@PathVariable String topic, @RequestBody String data) {
		
		ListenableFuture<SendResult<String, String>> future = getKafkaTemplate().send(topic, data);
				
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

		    @Override
		    public void onSuccess(SendResult<String, String> result) {
		        System.out.println("---success----"+result);
		    }

		    @Override
		    public void onFailure(Throwable ex) {
		        System.out.println("---error----"+ex);

		    }
		});
		return "here you can send required status to producer.";
	}
}

Apache Kafka Consumer – Integrate Kafka with Rest

The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

It is an API where you can either poll messages from a Kafka topic or, if you are using Kafka as a pub/sub system, keep a listener active that consumes every published event and processes the received message.

Below, I will share two simple code examples: one for polling messages and another for listening to publish events on a topic, then consuming and processing them as per your need.

Consumer with Listener: In the example below, I have created a simple service that listens to the topic “sampleTopic” and prints the messages to the console.

Below is a very simple example using the @KafkaListener annotation; the same can be achieved using a consumer factory and a listener container factory if you need more control over your program.

package com.hybriscx.demo.kafka.kafkademo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageConsumer {

	@KafkaListener(topics = {"sampleTopic"})
	public void listenMessage(String message) {
		System.out.println("Below message is received from the configured topic... Use this to send a push notification or elsewhere.");
		System.out.println(message);
	}
}
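
The consumer-factory alternative mentioned above can be sketched as a Spring @Configuration class. This is a sketch, not the post's own code: the broker address, group id, and concurrency value are assumptions to adjust for your setup.

```java
package com.hybriscx.demo.kafka.kafkademo.consumer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	// Broker address and group id are assumptions; adjust for your environment.
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sampleGroup");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	// @KafkaListener methods pick this factory up by its default bean name.
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		// e.g. run 3 consumer threads across the partitions of subscribed topics
		factory.setConcurrency(3);
		return factory;
	}
}
```

With this configuration in place, the @KafkaListener service above works unchanged, but you now control deserializers, group id, and concurrency yourself instead of relying on the properties file.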

Poll messages from a Kafka topic – Integrate Kafka with Rest

Consumer with Poller: The API below works as a poller that polls messages from a Kafka topic. If you look at it, you will see it is a simple RestController using the Kafka client library to poll messages. This can be used in your application, for example, behind a bell icon or to show message history to users.

The parameter we pass to poll() is a timeout and controls how long poll() will block if data is not available in the consumer buffer. If it is set to zero, poll() returns immediately; otherwise, it waits up to the specified duration for data to arrive from the broker.

Pay extra attention to the ConsumerFactory object created below, where I provide values such as the Kafka broker URL and the message deserializer types.

package com.hybriscx.demo.kafka.kafkademo.poller;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaMessagePoller {

	/**
	 * Consume message. This controller function polls the messages for a topic available in Kafka.
	 *
	 * @param topic the topic
	 * @return the string
	 */
	@RequestMapping(value = "/producer/consume-message/{topic}", method = { RequestMethod.GET })
	@ResponseBody
	public String consumeMessage(@PathVariable String topic) {
		
		ConsumerFactory<String, Object> consumerFactory = getConsumerFactoryInstance();

		Consumer<String, Object> consumer = consumerFactory.createConsumer();
		
		// subscribe to the topic supplied as a path variable
		consumer.subscribe(Collections.singletonList(topic));
		
		// Block for up to 5 seconds waiting for records. Note: the Duration is
		// only a blocking timeout; poll() does not fetch messages by age.
		ConsumerRecords<String, Object> consumerRecords = consumer.poll(Duration.ofSeconds(5));

		// print on console or send back as a string/json. Feel free to change controller function implementation for ResponseBody
		consumerRecords.forEach(action -> {
			System.out.println(action.value());
		});

		// release the consumer's resources once we are done polling
		consumer.close();
		return "success";
	}
	
	public ConsumerFactory<String, Object> getConsumerFactoryInstance() {
		Map<String, Object> configs = new java.util.HashMap<>();
		configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		configs.put(ConsumerConfig.GROUP_ID_CONFIG, "anyIdForGroup");
		configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
		// max.poll.interval.ms must be large enough to cover the time between
		// polls; the Kafka default is 300000 (5 minutes).
		configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
		// start from the earliest offset when the group has no committed offset yet
		configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		return new DefaultKafkaConsumerFactory<>(configs);
	}
}

Hope this article will be helpful for you in building a REST API that can connect to an Apache Kafka topic as a producer, consumer, and poller.

Thanks!

