Prerequisite
A basic understanding of Apache Kafka. Please go through the introduction first, should you need it.
Installation
Extract the downloaded tar file:
tar -xzf kafka_2.11-2.1.0.tgz
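If you have not downloaded Kafka yet, the 2.1.0 release is available from the Apache archive (the URL below assumes the standard archive layout):
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz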
Run The Server
cd kafka_2.11-2.1.0
Start ZooKeeper first:
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper will be listening on port 2181. You can confirm that in the following log line:
[2018-12-25 14:32:23,634] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
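If you have netcat installed, you can also verify that ZooKeeper is responding using its ruok four-letter command; a healthy server replies imok:
echo ruok | nc localhost 2181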
Start the Kafka server in another tab:
bin/kafka-server-start.sh config/server.properties
The Kafka server will be listening for connections on port 9092:
[2018-12-25 14:35:40,373] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
You can see the Kafka config values printed in the logs; a portion is shown here:
[2018-12-25 14:35:39,962] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
...
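You can also confirm that the broker has registered itself with ZooKeeper by listing the broker ids (a fresh single-broker setup should show [0]):
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids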
Creating a Topic
In another command tab, run the following command:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myFirstTopic
You should see the following message once the topic is created:
Created topic "myFirstTopic".
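You can verify the partition and replication settings of the new topic with the --describe option:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myFirstTopic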
Creating Console Producer
Run the following command:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFirstTopic
You should see a console prompt (>). Write some messages there; each line you enter is sent to the topic as a separate record.
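For example, typing two messages at the prompt:
> hello kafka
> my first message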
Creating Kafka Console Consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic --from-beginning
Run the above command in a new tab and you should see all the messages that were produced earlier:
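Assuming the two sample messages above were produced, the consumer output would be:
hello kafka
my first message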
Kafka Java Producer
We have seen how to produce and consume records using the Kafka console producer and consumer. We will now produce and consume using the Kafka Java client.
To demonstrate Kafka in Java, we will create a Dropwizard project.
We will follow these steps:
- Create a Dropwizard project
- Add the Kafka client dependency to pom.xml
- Create an API to post data; let's name it /bulk-light
- /bulk-light will put data on a Kafka topic, which will then be consumed by a Kafka consumer in the same project (a hypothetical sketch of such a resource appears after the producer code below)
Set Up
Set up a Maven project and add the Kafka Java client with the following dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
Kafka Producer
package com.wp.npe.client;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerAPI {

    public static final String TOPIC = "bulk";

    private final Properties props = new Properties();

    public KafkaProducerAPI() {
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker address
        props.put("acks", "all");                         // wait for full acknowledgement
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    }

    public void send(String key, String value) {
        // For simplicity a new producer is created per call; in production you
        // would create one KafkaProducer and reuse it across sends.
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(TOPIC, key, value));
        System.out.println("Message sent successfully");
        producer.close();
    }
}
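To wire this producer into the /bulk-light API mentioned earlier, a Dropwizard (JAX-RS) resource along the following lines would work. This is a minimal hypothetical sketch; the class name, path handling, and the fixed key are assumptions, not code taken from the demo project:

package com.wp.npe.resources;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.wp.npe.client.KafkaProducerAPI;

@Path("/bulk-light")
@Produces(MediaType.APPLICATION_JSON)
public class BulkResource {

    private final KafkaProducerAPI producer;

    @Inject
    public BulkResource(KafkaProducerAPI producer) {
        this.producer = producer;
    }

    @POST
    public Response post(String body) {
        // Hypothetical: use the raw request body as the record value.
        producer.send("bulk-key", body);
        return Response.accepted().build();
    }
}

Registering this resource with Jersey (environment.jersey().register(...)) in the application's run() method would expose the endpoint.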
Kafka Consumer
package com.wp.npe.client;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import com.google.inject.Inject;
import com.wp.npe.db.dao.StudentDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerAPI implements Runnable {

    private final Properties props = new Properties();
    private final KafkaConsumer<String, String> consumer;
    private final StudentDao studentDao;

    @Inject
    public KafkaConsumerAPI(StudentDao studentDao) {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");                    // consumer group id
        props.put("enable.auto.commit", "true");          // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(KafkaProducerAPI.TOPIC));
        this.studentDao = studentDao;
    }

    public void run() {
        System.out.println("--------- Consumer ---------");
        while (true) {
            // poll(long) is deprecated in Kafka 2.1.0; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumer is working : offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
                processInputAsync(record.value()); // defined elsewhere in the demo project
            }
        }
    }
}
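The while (true) loop above never closes the consumer. If you need a clean shutdown, the standard pattern is to call KafkaConsumer.wakeup() from another thread, which makes a blocked poll() throw WakeupException. A minimal sketch of how run() could be adapted (the closed flag and shutdown() method are assumptions added for illustration; they require java.util.concurrent.atomic.AtomicBoolean and org.apache.kafka.common.errors.WakeupException imports):

private final AtomicBoolean closed = new AtomicBoolean(false);

public void run() {
    try {
        while (!closed.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                processInputAsync(record.value());
            }
        }
    } catch (WakeupException e) {
        if (!closed.get()) throw e; // only expected during shutdown
    } finally {
        consumer.close(); // release sockets and leave the group cleanly
    }
}

// Call from another thread (e.g. a lifecycle hook) to stop the loop.
public void shutdown() {
    closed.set(true);
    consumer.wakeup(); // interrupts a blocked poll()
}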
Initialising Kafka Consumer
package com.wp.npe;

import com.hubspot.dropwizard.guicier.GuiceBundle;
import com.wp.npe.client.KafkaConsumerAPI;
import com.wp.npe.db.dao.StudentDao;
import io.dropwizard.Application;
import io.dropwizard.db.PooledDataSourceFactory;
import io.dropwizard.hibernate.HibernateBundle;
import io.dropwizard.hibernate.ScanningHibernateBundle;
import io.dropwizard.hibernate.UnitOfWorkAwareProxyFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;

public class NpeTutorial extends Application<NpeConfiguration> {

    private GuiceBundle<NpeConfiguration> guiceBundle;

    private static HibernateBundle<NpeConfiguration> hibernateBundle =
            new ScanningHibernateBundle<NpeConfiguration>("com.wp") {
                public PooledDataSourceFactory getDataSourceFactory(NpeConfiguration sessionConfiguration) {
                    return sessionConfiguration.getDataSourceFactory();
                }
            };

    public static void main(final String[] args) throws Exception {
        new NpeTutorial().run(args);
        // Start the Kafka consumer on its own thread once the application is up.
        // UnitOfWorkAwareProxyFactory lets the consumer use Hibernate sessions
        // outside the normal request (@UnitOfWork) lifecycle.
        StudentDao dao = new StudentDao(hibernateBundle.getSessionFactory());
        KafkaConsumerAPI kafkaConsumerAPI = new UnitOfWorkAwareProxyFactory(hibernateBundle)
                .create(KafkaConsumerAPI.class, StudentDao.class, dao);
        Thread th = new Thread(kafkaConsumerAPI);
        th.start();
    }

    @Override
    public String getName() {
        return "npe-tutorials";
    }

    @Override
    public void initialize(final Bootstrap<NpeConfiguration> bootstrap) {
        guiceBundle = GuiceBundle.defaultBuilder(NpeConfiguration.class)
                .modules(new NpeModule(hibernateBundle))
                .enableGuiceEnforcer(false)
                .build();
        bootstrap.addBundle(hibernateBundle);
        bootstrap.addBundle(guiceBundle);
    }

    @Override
    public void run(final NpeConfiguration configuration,
                    final Environment environment) {
        System.out.println("NpeConfiguration : " + configuration.toString());
        // TODO: implement application
    }
}
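To build and start the application (the jar and configuration file names below are assumptions; substitute whatever your Maven build produces):
mvn clean package
java -jar target/npe-tutorial-1.0.jar server config.yml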
Download the demo project from github
Please download the demo project using the link below and run it on your local machine to understand it better. I will write another blog post explaining the demo project.
https://github.com/rohitsingh20122992/kafka-demo
Further Reading:
https://www.cloudkarafka.com/blog/2016-11-30-part1-kafka-for-beginners-what-is-apache-kafka.html