Apache Kafka without SpringBoot

Apache Kafka without SpringBoot..

As a follow up to my earlier blog on Apache kafka with SpringBoot my friends and colleagues wanted me to put a small sample of program where someone want to still integrate the Spring application to Kafka but without SprintBoot.

So it is… a small sample code.

Its as easy as adding spring kafka jars to project classpath and writing a small piece of code similar to the one we wrote for SpringBoot.

For code explanation, please refer my example for “Kafka with SpringBoot”

Libraries / Pre-requisite

  • Spring 1.5 core libraries
  • kafka-clients-2.0.1.jar
  • kafka-streams-2.0.1.jar
  • spring-kafka-2.2.9.RELEASE.jar

That’s what all you need to let your spring program to talk to a Kafka-Server to publish/subscribe and consume messages.

import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MyKafkaConsumer {


    @Override
    public List<StaffResponseKafkaData> consumeMessageFromKafkaServer(String serverName) {
        List<StaffResponseKafkaData> listResponseData = new ArrayList<>();
        final Gson gson = new Gson();

        consumerRecords(gson, listResponseData, serverName, "groupName", "topicName");

        return listResponseData;
    }

    /**
     * consumerRecords by server, group, topin and add convert to data
     * @param gson lib to convert String
     * @param listResponseData listResponseData
     * @param server server
     * @param groupName groupName
     * @param topicName topicName
     */
    private void consumerRecords(Gson gson, List<MyObject> listResponseData, String server, String groupName, String topicName) {
        ConsumerFactory<String, Object> consumerFactory = getConsumerFactoryInstance(server, groupName);
        Consumer<String, Object> consumer = consumerFactory.createConsumer();
        consumer.subscribe(Collections.singletonList(topicName));

        int duration = 01;
        ConsumerRecords<String, Object> consumerRecords = consumer.poll(Duration.ofMinutes(duration));

        for (ConsumerRecord<String, Object> consumerRecord : consumerRecords) {
            MyObject responseValue = gson.fromJson(consumerRecord.value().toString(), StaffResponseKafkaData.class);
            listResponseData.add(responseValue);
        }

        consumer.close();
    }

    /**
     * Gets the consumer factory instance by server and group
     * @param serverName serverName
     * @param groupName groupName
     * @return the consumer factory instance
     */
    private ConsumerFactory<String, Object> getConsumerFactoryInstance(String serverName, String groupName) {
        Map<String, Object> configs = new java.util.HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverName);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
        
        return new DefaultKafkaConsumerFactory<>(configs);
    }
}
5200cookie-checkApache Kafka without SpringBoot

Leave a Reply

Your email address will not be published.