Apache Kafka Introductory Tutorial – Set up your first Kafka Producer and Consumer

Prerequisite

A basic understanding of Apache Kafka. Please go through the introduction first, should you need it.

Installation

Download Kafka from here.

Untar the downloaded archive:

tar -xzf kafka_2.11-2.1.0.tgz

Run The Server

cd kafka_2.11-2.1.0

Start ZooKeeper first:

bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper will be listening on port 2181. You can verify this in the log:

[2018-12-25 14:32:23,634] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Start the Kafka server in another terminal tab:

bin/kafka-server-start.sh config/server.properties 

The Kafka server will be waiting for connections on port 9092.

[2018-12-25 14:35:40,373] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)    

The Kafka config values are printed in the logs; a portion is shown here:

[2018-12-25 14:35:39,962] INFO KafkaConfig values:

advertised.host.name = null

advertised.listeners = null

advertised.port = null

...

Creating a Topic

In another terminal tab, run the following command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myFirstTopic

You should see the following message after the topic is created:

Created topic "myFirstTopic".
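The --partitions flag controls how many partitions the topic's records are spread across, and a keyed record is assigned to a partition by hashing its key. A rough sketch of that assignment is below; note that Kafka's default partitioner actually uses murmur2, so the stdlib hash here is only an illustrative stand-in, and the class name is made up for this example:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified stand-in for Kafka's default partitioner: hash the key's bytes
// and map the hash onto one of the topic's partitions. (Real Kafka uses
// murmur2; Arrays.hashCode is used here purely for illustration.)
public class PartitionSketch {

    public static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // Mask the sign bit so the result of % is non-negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With --partitions 1, every key maps to partition 0.
        System.out.println(partitionFor("student-42", 1)); // prints 0
    }
}
```

Because the assignment depends only on the key and the partition count, all records with the same key land on the same partition, which is what gives Kafka per-key ordering.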

Creating Console Producer

Run the following command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFirstTopic

You should now see a prompt where you can type messages.

Write some messages there; each line you enter is sent to the topic as a record.


Creating Kafka Console Consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic --from-beginning

Run the above command in a new tab, and you should see all the messages that were produced earlier.
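The --from-beginning flag tells the console consumer to read from the start of the topic instead of only new messages. In the Java client, the equivalent knob is the auto.offset.reset property, which applies when a consumer group has no committed offsets yet. A minimal sketch of the relevant configuration (the property names and values are real Kafka client settings; the class itself is illustrative):

```java
import java.util.Properties;

// auto.offset.reset decides where a consumer group with no committed
// offsets starts reading: "earliest" mirrors --from-beginning, while the
// default "latest" only picks up records produced after the consumer joins.
public class ConsumerOffsetConfig {

    public static Properties fromBeginningProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest"); // default is "latest"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fromBeginningProps("demo").getProperty("auto.offset.reset"));
    }
}
```

Note that once the group has committed offsets, it resumes from those offsets and auto.offset.reset no longer applies.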


Kafka Java Producer

We have seen how to produce and consume records using the Kafka console producer and consumer. We will now do the same using the Kafka Java client.

To demonstrate Kafka in Java, we will create a Dropwizard project.

We will follow these steps:

  1. Create a Dropwizard project
  2. Add the Kafka client dependency in pom.xml
  3. Create an API to post data; let’s name it /bulk-light
  4. /bulk-light will put data on a Kafka topic, which will then be consumed by a Kafka consumer in the same project

Set Up

Set up a Maven project and add the Kafka Java client with the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>

Kafka Producer

package com.wp.npe.client;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerAPI {

    public static final String TOPIC = "bulk";

    private final Properties props = new Properties();
    private final Producer<String, String> producer;

    public KafkaProducerAPI() {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");            // wait for acknowledgement from all in-sync replicas
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // A KafkaProducer is thread-safe and should be created once and
        // reused, rather than created per message.
        producer = new KafkaProducer<>(props);
    }

    public void send(String key, String value) {
        // send() is asynchronous; it returns before the broker acknowledges.
        producer.send(new ProducerRecord<>(TOPIC, key, value));
        System.out.println("Message sent successfully");
    }

    public void close() {
        producer.close();
    }
}

Kafka Consumer

package com.wp.npe.client;

import com.google.inject.Inject;
import com.wp.npe.db.dao.StudentDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAPI implements Runnable {

    private final Properties props = new Properties();
    private final KafkaConsumer<String, String> consumer;
    private final StudentDao studentDao;

    @Inject
    public KafkaConsumerAPI(StudentDao studentDao) {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");      // offsets committed automatically
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(KafkaProducerAPI.TOPIC));
        this.studentDao = studentDao;
    }

    public void run() {
        System.out.println("------------------- Consumer -------------------");
        while (true) {
            // Block for up to 100 ms waiting for a batch of records.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer is working : offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
                processInputAsync(record.value());
            }
        }
    }

    private void processInputAsync(String value) {
        // Implemented in the demo project: parses the payload and persists
        // it via studentDao.
    }
}
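The run() method above follows Kafka's standard poll loop: block briefly for a batch of records, process each one, then poll again. The same shape is sketched below with an in-memory queue standing in for the broker, so it runs without Kafka (the queue and the drain method are illustrative, not Kafka APIs):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// The poll-process loop from KafkaConsumerAPI.run(), with an in-memory
// queue standing in for the broker so the pattern is runnable without Kafka.
public class PollLoopSketch {

    public static int drain(Queue<String> queue) {
        int processed = 0;
        String record;
        // The real consumer loops forever on consumer.poll(...); here we
        // stop once the stand-in queue is empty.
        while ((record = queue.poll()) != null) {
            System.out.println("Consumer is working : value = " + record);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("{\"name\":\"alice\"}");
        queue.add("{\"name\":\"bob\"}");
        System.out.println(drain(queue)); // prints 2 after the two records
    }
}
```

The important part of the pattern is that each poll returns a batch, not a single message, so per-record work happens inside the inner loop while the outer loop keeps the consumer alive in its group.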

Initialising Kafka Consumer

package com.wp.npe;

import com.hubspot.dropwizard.guicier.GuiceBundle;
import com.wp.npe.client.KafkaConsumerAPI;
import com.wp.npe.db.dao.StudentDao;
import io.dropwizard.Application;
import io.dropwizard.db.PooledDataSourceFactory;
import io.dropwizard.hibernate.HibernateBundle;
import io.dropwizard.hibernate.ScanningHibernateBundle;
import io.dropwizard.hibernate.UnitOfWorkAwareProxyFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;

public class NpeTutorial extends Application<NpeConfiguration> {

    private GuiceBundle<NpeConfiguration> guiceBundle;

    private static HibernateBundle<NpeConfiguration> hibernateBundle =
            new ScanningHibernateBundle<NpeConfiguration>("com.wp") {
                public PooledDataSourceFactory getDataSourceFactory(NpeConfiguration sessionConfiguration) {
                    return sessionConfiguration.getDataSourceFactory();
                }
            };

    public static void main(final String[] args) throws Exception {
        // run(args) starts the Dropwizard application, then returns.
        new NpeTutorial().run(args);
        // Once Hibernate is up, start the Kafka consumer on its own thread.
        StudentDao dao = new StudentDao(hibernateBundle.getSessionFactory());
        KafkaConsumerAPI kafkaConsumerAPI = new UnitOfWorkAwareProxyFactory(hibernateBundle)
                .create(KafkaConsumerAPI.class, StudentDao.class, dao);
        Thread th = new Thread(kafkaConsumerAPI);
        th.start();
    }

    @Override
    public String getName() {
        return "npe-tutorials";
    }

    @Override
    public void initialize(final Bootstrap<NpeConfiguration> bootstrap) {
        guiceBundle = GuiceBundle.defaultBuilder(NpeConfiguration.class)
                .modules(new NpeModule(hibernateBundle))
                .enableGuiceEnforcer(false)
                .build();
        bootstrap.addBundle(hibernateBundle);
        bootstrap.addBundle(guiceBundle);
    }

    @Override
    public void run(final NpeConfiguration configuration,
                    final Environment environment) {
        System.out.println("NpeConfiguration : " + configuration.toString());
        // TODO: implement application
    }
}

Download the demo project from GitHub

Please download the demo project using the link below and run it locally to understand it better. I will write another blog post to explain the demo project.

https://github.com/rohitsingh20122992/kafka-demo


Further Reading:

Kafka Official Documentation

Dropwizard Introduction

https://www.cloudkarafka.com/blog/2016-11-30-part1-kafka-for-beginners-what-is-apache-kafka.html

