Following examples shows how to commit offset asynchronously with a callback.
KafkaConsumer defines following method:
public void commitAsync(OffsetCommitCallback callback)
Where OffsetCommitCallback is an functional interface defined as:
package org.apache.kafka.clients.consumer;
....
public interface OffsetCommitCallback {
void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
}
The method onComplete() is a callback method the user can implement to provide asynchronous handling of commit request completion. This method will be called when the commit request sent to the server has been acknowledged. The parameter 'offsets' is the map of the offsets and associated metadata that this callback applies to. The parameter 'exception' is the exception thrown during processing of the request, or null if the commit completed successfully
Example
Run Kafka server as described here.
Example Config
package com.logicbig.example;
import java.util.Properties;
public class ExampleConfig {
public static final String BROKERS = "localhost:9092";
public static Properties getProducerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKERS);
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", BROKERS);
props.setProperty("group.id", "testGroup");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
Creating Example Topic
Let's create a topic with one partition:
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic("example-topic-2020-5-28b", 1);
}
private static void createTopic(String topicName, int numPartitions) throws Exception {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
AdminClient admin = AdminClient.create(config);
//checking if topic already exists
boolean alreadyExists = admin.listTopics().names().get().stream()
.anyMatch(existingTopicName -> existingTopicName.equals(topicName));
if (alreadyExists) {
System.out.printf("topic already exits: %s%n", topicName);
} else {
//creating new topic
System.out.printf("creating topic: %s%n", topicName);
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
admin.createTopics(Collections.singleton(newTopic)).all().get();
}
//describing
System.out.println("-- describing topic --");
admin.describeTopics(Collections.singleton(topicName)).all().get()
.forEach((topic, desc) -> {
System.out.println("Topic: " + topic);
System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
desc.partitions()
.stream()
.map(p -> Integer.toString(p.partition()))
.collect(Collectors.joining(",")));
});
admin.close();
}
}
creating topic: example-topic-2020-5-28b
-- describing topic --
Topic: example-topic-2020-5-28b
Using commitAsync(OffsetCommitCallback callback)
package com.logicbig.example;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class CommitAsyncCallbackExample {
private static String TOPIC_NAME = "example-topic-2020-5-28b";
private static KafkaConsumer<String, String> consumer;
private static TopicPartition topicPartition;
public static void main(String[] args) throws Exception {
Properties consumerProps = ExampleConfig.getConsumerProps();
consumer = new KafkaConsumer<>(consumerProps);
topicPartition = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Collections.singleton(topicPartition));
printOffsets("before consumer loop", consumer, topicPartition);
sendMessages();
startConsumer();
}
private static void startConsumer() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
record.key(), record.value(), record.partition(), record.offset());
}
if (records.isEmpty()) {
System.out.println("-- terminating consumer --");
break;
}
printOffsets("before commitAsync() call", consumer, topicPartition);
consumer.commitAsync((offsets, exception) -> {
System.out.printf("Callback, offset: %s, exception %s%n", offsets, exception);
});
printOffsets("after commitAsync() call", consumer, topicPartition);
}
printOffsets("after consumer loop", consumer, topicPartition);
}
private static void printOffsets(String message, KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
Map<TopicPartition, OffsetAndMetadata> committed = consumer
.committed(new HashSet<>(Arrays.asList(topicPartition)));
OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
long position = consumer.position(topicPartition);
System.out
.printf("Offset info %s, Committed: %s, current position %s%n", message,
offsetAndMetadata == null ? null : offsetAndMetadata
.offset(), position);
}
private static void sendMessages() {
Properties producerProps = ExampleConfig.getProducerProps();
KafkaProducer producer = new KafkaProducer<>(producerProps);
for (int i = 0; i < 4; i++) {
String value = "message-" + i;
System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
producer.send(new ProducerRecord<>(TOPIC_NAME, value));
}
producer.flush();
producer.close();
}
}
Offset info before consumer loop, Committed: null, current position 0
Sending message topic: example-topic-2020-5-28b, value: message-0
Sending message topic: example-topic-2020-5-28b, value: message-1
Sending message topic: example-topic-2020-5-28b, value: message-2
Sending message topic: example-topic-2020-5-28b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Offset info before commitAsync() call, Committed: null, current position 4
Offset info after commitAsync() call, Committed: 4, current position 4
Callback, offset: {example-topic-2020-5-28b-0=OffsetAndMetadata{offset=4, leaderEpoch=0, metadata=''}}, exception null
-- terminating consumer --
Offset info after consumer loop, Committed: 4, current position 4
Running one more time:
Offset info before consumer loop, Committed: 4, current position 4
Sending message topic: example-topic-2020-5-28b, value: message-0
Sending message topic: example-topic-2020-5-28b, value: message-1
Sending message topic: example-topic-2020-5-28b, value: message-2
Sending message topic: example-topic-2020-5-28b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Offset info before commitAsync() call, Committed: 4, current position 8
Offset info after commitAsync() call, Committed: 8, current position 8
Callback, offset: {example-topic-2020-5-28b-0=OffsetAndMetadata{offset=8, leaderEpoch=0, metadata=''}}, exception null
-- terminating consumer --
Offset info after consumer loop, Committed: 8, current position 8
Example ProjectDependencies and Technologies Used: - kafka_2.12 2.5.0 Apache Kafka
- JDK 8
- Maven 3.5.4
|
|