What is Kafka?
Apache Kafka is a distributed streaming platform. The streams of records can be published and subscribed to registered topics.
Installing and Running Kafka
Follow the instructions here to install and run Kafka server. By default Kafka server runs at localhost:9092
Example
pom.xml<project .....> <modelVersion>4.0.0</modelVersion>
<groupId>com.logicbig.example</groupId> <artifactId>kafka-getting-started</artifactId> <version>1.0-SNAPSHOT</version>
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>8</source> <target>8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
The Producer
package com.logicbig.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ExampleProducer {
private final KafkaProducer<String, String> producer;
ExampleProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public void sendMessage(String topic, String key, String value) {
System.out.printf("Sending message topic: %s, key: %s, value: %s%n", topic, key, value);
producer.send(new ProducerRecord<>(topic, key, value));
}
}
The Consumer
package com.logicbig.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.function.BiFunction;
public class ExampleConsumer {
public ExampleConsumer(String topic, BiFunction<String, String, Boolean> biFunction) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
out: while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
for (ConsumerRecord<String, String> record : records){
if(biFunction.apply(record.key(), record.value())){
break out;
}
}
}
}
}
Main class
package com.logicbig.example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class KafkaExampleMain {
public static void main(String[] args) throws InterruptedException {
final String topic = "my-test-topic";
int count = 5;
AtomicInteger atomicInteger = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(2);
//start consumer
executorService.execute(() -> {
new ExampleConsumer(topic, (key, value) -> {
System.out.printf("consumed: key = %s, value = %s%n", key, value);
return atomicInteger.incrementAndGet() == count;
});
});
//start producer
executorService.execute(() -> {
ExampleProducer exampleProducer = new ExampleProducer();
for (int i = 0; i < count; i++) {
exampleProducer.sendMessage(topic, Integer.toString(i), "message - " + i);
}
});
executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.MINUTES);
}
}
Sending message topic: my-test-topic, key: 0, value: message - 0 Sending message topic: my-test-topic, key: 1, value: message - 1 Sending message topic: my-test-topic, key: 2, value: message - 2 Sending message topic: my-test-topic, key: 3, value: message - 3 Sending message topic: my-test-topic, key: 4, value: message - 4 consumed: key = 0, value = message - 0 consumed: key = 1, value = message - 1 consumed: key = 2, value = message - 2 consumed: key = 3, value = message - 3 consumed: key = 4, value = message - 4
Example ProjectDependencies and Technologies Used: - kafka-clients 2.4.1 Apache Kafka
- JDK 8
- Maven 3.5.4
|
|