Following example shows how to commit offset synchronously.
KafkaConsumer defines following method:
public void commitSync()
This is a synchronous commit and will block until one of following things happens:
(1) the commit succeeds, or (2) an unrecoverable error is encountered (in which case it is thrown to the caller), or the (3) timeout specified by default.api.timeout.ms expires (in which case a TimeoutException is thrown to the caller).
commitSync with time out duration
public void commitSync(Duration timeout)
commitSync with specified offset
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
Above method commits the specified offsets for the specified list of topics and partitions. See corresponding commitAsync with specified offset tutorial.
commitSync with specified offset and timeout duration
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)
Example
Run Kafka server as described here
Example Config
package com.logicbig.example;
import java.util.Properties;
public class ExampleConfig {
public static final String BROKERS = "localhost:9092";
public static Properties getProducerProps() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKERS);
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", BROKERS);
props.setProperty("group.id", "testGroup");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
Creating example topic
Let's create a topic with one partition.
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic("example-topic-2020-5-29", 1);
}
private static void createTopic(String topicName, int numPartitions) throws Exception {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
AdminClient admin = AdminClient.create(config);
//checking if topic already exists
boolean alreadyExists = admin.listTopics().names().get().stream()
.anyMatch(existingTopicName -> existingTopicName.equals(topicName));
if (alreadyExists) {
System.out.printf("topic already exits: %s%n", topicName);
} else {
//creating new topic
System.out.printf("creating topic: %s%n", topicName);
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
admin.createTopics(Collections.singleton(newTopic)).all().get();
}
//describing
System.out.println("-- describing topic --");
admin.describeTopics(Collections.singleton(topicName)).all().get()
.forEach((topic, desc) -> {
System.out.println("Topic: " + topic);
System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
desc.partitions()
.stream()
.map(p -> Integer.toString(p.partition()))
.collect(Collectors.joining(",")));
});
admin.close();
}
}
creating topic: example-topic-2020-5-29
-- describing topic --
Topic: example-topic-2020-5-29
Partitions: 1, partition ids: 0
Using commitSync()
package com.logicbig.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class CommitSyncExample {
private static String TOPIC_NAME = "example-topic-2020-5-29";
private static KafkaConsumer<String, String> consumer;
private static TopicPartition topicPartition;
public static void main(String[] args) throws Exception {
Properties consumerProps = ExampleConfig.getConsumerProps();
consumer = new KafkaConsumer<>(consumerProps);
topicPartition = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Collections.singleton(topicPartition));
printOffsets("before consumer loop", consumer, topicPartition);
sendMessages();
startConsumer();
}
private static void startConsumer() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
record.key(), record.value(), record.partition(), record.offset());
}
if (records.isEmpty()) {
System.out.println("-- terminating consumer --");
break;
}
printOffsets("before commitAsync() call", consumer, topicPartition);
consumer.commitSync();
printOffsets("after commitAsync() call", consumer, topicPartition);
}
printOffsets("after consumer loop", consumer, topicPartition);
}
private static void printOffsets(String message, KafkaConsumer<String, String> consumer,
TopicPartition topicPartition) {
Map<TopicPartition, OffsetAndMetadata> committed = consumer
.committed(new HashSet<>(Arrays.asList(topicPartition)));
OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
long position = consumer.position(topicPartition);
System.out
.printf("Offset info %s, Committed: %s, current position %s%n", message, offsetAndMetadata == null ? null :
offsetAndMetadata
.offset(), position);
}
private static void sendMessages() {
Properties producerProps = ExampleConfig.getProducerProps();
KafkaProducer producer = new KafkaProducer<>(producerProps);
for (int i = 0; i < 4; i++) {
String value = "message-" + i;
System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
producer.send(new ProducerRecord<>(TOPIC_NAME, value));
}
producer.flush();
producer.close();
}
}
Offset info before consumer loop, Committed: null, current position 0
Sending message topic: example-topic-2020-5-29, value: message-0
Sending message topic: example-topic-2020-5-29, value: message-1
Sending message topic: example-topic-2020-5-29, value: message-2
Sending message topic: example-topic-2020-5-29, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Offset info before commitAsync() call, Committed: null, current position 4
Offset info after commitAsync() call, Committed: 4, current position 4
-- terminating consumer --
Offset info after consumer loop, Committed: 4, current position 4
Running above class one more time:
Offset info before consumer loop, Committed: 4, current position 4
Sending message topic: example-topic-2020-5-29, value: message-0
Sending message topic: example-topic-2020-5-29, value: message-1
Sending message topic: example-topic-2020-5-29, value: message-2
Sending message topic: example-topic-2020-5-29, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Offset info before commitAsync() call, Committed: 4, current position 8
Offset info after commitAsync() call, Committed: 8, current position 8
-- terminating consumer --
Offset info after consumer loop, Committed: 8, current position 8
Example ProjectDependencies and Technologies Used: - kafka_2.12 2.5.0 Apache Kafka
- JDK 8
- Maven 3.5.4
|
|