Apache Kafka Introductory Tutorial – Set up your first Kafka Producer and Consumer


Prerequisite: a basic understanding of Apache Kafka. If you need a refresher, go through the introduction first.


Download Kafka. This tutorial uses the kafka_2.11-2.1.0 release (Kafka 2.1.0 built for Scala 2.11).

Extract the downloaded archive:

tar -xzf kafka_2.11-2.1.0.tgz

Run The Server

cd kafka_2.11-2.1.0

Start ZooKeeper first:

bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper listens on port 2181. You can confirm this in the startup log:

[2018-12-25 14:32:23,634] INFO binding to port (org.apache.zookeeper.server.NIOServerCnxnFactory)

Start the Kafka Server in another tab.

bin/kafka-server-start.sh config/server.properties 

The Kafka server waits for connections on port 9092:

[2018-12-25 14:35:40,373] INFO Awaiting socket connections on (kafka.network.Acceptor)    
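Besides reading the logs, you can confirm that both ports accept connections by opening a TCP connection to each. The following is a minimal sketch that uses only the JDK; the class and method names are illustrative and not part of Kafka. A successful connection only shows that something is listening on the port, not that it is ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper (not part of Kafka): reports whether anything
// accepts TCP connections on the given host and port.
final class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the tutorial: ZooKeeper on 2181, Kafka on 9092.
        System.out.println("ZooKeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

Run it after starting both servers; each line should report true.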

The Kafka config values are also printed in the log; a portion is shown here:

[2018-12-25 14:35:39,962] INFO KafkaConfig values:

advertised.host.name = null

advertised.listeners = null

advertised.port = null


Creating a Topic

In another command tab, run the following command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myFirstTopic

You should see the following message after creating the topic:

Created topic "myFirstTopic".

Creating Console Producer

Run the following command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFirstTopic

You should see a > prompt. Type some messages there, pressing Enter after each one; every line is sent as a separate message.

Creating Kafka Console Consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic --from-beginning

Run the above command in a new tab and you should see all the messages that were produced earlier.
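The --from-beginning flag makes the console consumer start at the earliest available offset when its group has no committed position. With the Java client, the corresponding setting is auto.offset.reset=earliest. Here is a small sketch of such a configuration, built with plain java.util.Properties; the class name and the group id passed in main are hypothetical.

```java
import java.util.Properties;

// Illustrative helper: consumer settings that mirror --from-beginning.
final class FromBeginningConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only applies when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "replay-demo" is a made-up group id; a fresh group reads from the start.
        build("localhost:9092", "replay-demo").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor.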

Kafka Java Producer

We have seen how to produce and consume records using the Kafka console producer and consumer. We will now do the same with the Kafka Java client.

To demonstrate Kafka in Java, we will create a Dropwizard project.

The steps are:

  1. Create a Dropwizard project
  2. Add the Kafka client dependency to pom.xml
  3. Create an API to post data; let’s name it /bulk-light
  4. /bulk-light will put the data on a Kafka topic, from which a Kafka consumer in the same project will read it


Set Up

Set up a Maven project. Use the following dependency to add the Kafka Java client:
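A typical declaration for the Kafka Java client is shown below. The coordinates org.apache.kafka:kafka-clients are the official ones; the version is an assumption, chosen to match the 2.1.0 broker used earlier, so adjust it to your setup.

```xml
<!-- Kafka Java client; version assumed to match the 2.1.0 broker above -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
```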


Kafka Producer

package com.wp.npe.client;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerAPI {

    public static final String TOPIC = "bulk";

    Properties props = new Properties();

    public KafkaProducerAPI() {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all"); // wait until all in-sync replicas acknowledge
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // Serializers are mandatory; keys and values are plain strings here.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public void send(String key, String value) {
        // Closing the producer flushes the pending record before returning.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(TOPIC, key, value));
        }
        System.out.println("Message sent successfully");
    }
}
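The producer settings above are hard-coded in the constructor. One alternative is to keep them as properties text (for example in a file) and parse them, which also makes it easy to see the key.serializer and value.serializer entries that KafkaProducer requires. This is only a sketch using java.util.Properties; the class name and the DEFAULTS constant are illustrative. Kafka accepts numeric settings given as strings, so the parsed values can be passed to the producer as they are.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper: parses producer settings from properties-format text.
final class ProducerSettings {

    // Same values as in the tutorial, plus the mandatory string serializers.
    static final String DEFAULTS = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "acks=all",
            "retries=0",
            "batch.size=16384",
            "linger.ms=1",
            "buffer.memory=33554432",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader does not fail in practice; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        parse(DEFAULTS).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The result of ProducerSettings.parse(...) can be handed to new KafkaProducer<>(props) in place of the hand-built Properties.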

Kafka Consumer

package com.wp.npe.client;

import com.wp.npe.db.dao.StudentDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAPI implements Runnable {

    Properties props = new Properties();
    KafkaConsumer<String, String> consumer;
    StudentDao studentDao;

    public KafkaConsumerAPI(StudentDao studentDao) {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers are mandatory; they mirror the producer's string serializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.studentDao = studentDao;
    }

    public void run() {
        // Subscribe to the topic the producer writes to before polling.
        consumer.subscribe(Arrays.asList(KafkaProducerAPI.TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer is working : offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
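The run() loop above uses while (true), so the consumer thread never exits on its own. A common way to make such a loop stoppable is to guard it with a flag that another thread can clear. The sketch below shows the pattern with a stand-in for the poll call so that it runs without a broker; StoppablePoller and its parameters are hypothetical names. With the real client, calling consumer.wakeup() from another thread is the usual way to interrupt a blocked poll.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative stand-in for the consumer loop: polls until stop() is called.
final class StoppablePoller implements Runnable {

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Supplier<List<String>> poll;  // stand-in for consumer.poll(...)
    private final Consumer<String> handler;     // what to do with each record

    StoppablePoller(Supplier<List<String>> poll, Consumer<String> handler) {
        this.poll = poll;
        this.handler = handler;
    }

    @Override
    public void run() {
        // The flag is re-checked after every batch, so stop() takes effect
        // once the current batch has been handled.
        while (running.get()) {
            for (String record : poll.get()) {
                handler.accept(record);
            }
        }
    }

    void stop() {
        running.set(false);
    }
}
```

A shutdown hook or a Dropwizard managed object could call stop() when the application shuts down.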

Initialising Kafka Consumer

package com.wp.npe;

import com.hubspot.dropwizard.guicier.GuiceBundle;
import com.wp.npe.client.KafkaConsumerAPI;
import com.wp.npe.db.dao.StudentDao;
import io.dropwizard.Application;
import io.dropwizard.db.PooledDataSourceFactory;
import io.dropwizard.hibernate.HibernateBundle;
import io.dropwizard.hibernate.ScanningHibernateBundle;
import io.dropwizard.hibernate.UnitOfWorkAwareProxyFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;

public class NpeTutorial extends Application<NpeConfiguration> {

    private GuiceBundle<NpeConfiguration> guiceBundle;

    private static HibernateBundle<NpeConfiguration> hibernateBundle =
            new ScanningHibernateBundle<NpeConfiguration>("com.wp") {
                @Override
                public PooledDataSourceFactory getDataSourceFactory(NpeConfiguration sessionConfiguration) {
                    return sessionConfiguration.getDataSourceFactory();
                }
            };

    public static void main(final String[] args) throws Exception {
        new NpeTutorial().run(args);

        // Once the application is up, wrap the consumer in a proxy that
        // honours @UnitOfWork and run it on its own thread.
        StudentDao dao = new StudentDao(hibernateBundle.getSessionFactory());
        KafkaConsumerAPI kafkaConsumerAPI = new UnitOfWorkAwareProxyFactory(hibernateBundle)
                .create(KafkaConsumerAPI.class, StudentDao.class, dao);
        Thread th = new Thread(kafkaConsumerAPI);
        th.start();
    }

    @Override
    public String getName() {
        return "npe-tutorials";
    }

    @Override
    public void initialize(final Bootstrap<NpeConfiguration> bootstrap) {
        guiceBundle = GuiceBundle.defaultBuilder(NpeConfiguration.class)
                .modules(new NpeModule(hibernateBundle))
                .build();
        // Register both bundles so Dropwizard initialises Hibernate and Guice.
        bootstrap.addBundle(hibernateBundle);
        bootstrap.addBundle(guiceBundle);
    }

    @Override
    public void run(final NpeConfiguration configuration,
                    final Environment environment) {
        System.out.println("NpeConfiguration : " + configuration.toString());
        // TODO: implement application
    }
}

Download the demo project from GitHub

Please download the demo project using the link below. Run it locally to understand it better. I will write another blog post to explain the demo project.


Further Reading:

Kafka Official Documentation

Dropwizard Introduction




