Close

Kafka Manual Commit - CommitAsync With Callback and Specified Offset

[Last Updated: Aug 11, 2020]

The following example shows how to commit offsets asynchronously with a callback and with a specified offset.

KafkaConsumer defines the following method:

public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

Here the parameter 'offsets' is a map of offsets by partition with associated metadata, and 'callback' is invoked with the result of the asynchronous commit (see the last tutorial).

Example

Run Kafka server as described here

Example Config

package com.logicbig.example;

import java.util.Properties;

/**
 * Shared Kafka client settings used by the example classes.
 *
 * <p>Each accessor builds and returns a fresh {@link Properties} instance, so
 * callers may modify the result without affecting other callers.
 */
public class ExampleConfig {
    /** Broker address list placed under {@code bootstrap.servers} for every client. */
    public static final String BROKERS = "localhost:9092";

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Builds the producer settings: broker list, {@code acks=all} and String
     * serializers for both key and value.
     *
     * @return a new, independent {@code Properties} object
     */
    public static Properties getProducerProps() {
        final Properties producerSettings = new Properties();
        producerSettings.setProperty("bootstrap.servers", BROKERS);
        producerSettings.setProperty("acks", "all");
        producerSettings.setProperty("key.serializer", STRING_SERIALIZER);
        producerSettings.setProperty("value.serializer", STRING_SERIALIZER);
        return producerSettings;
    }

    /**
     * Builds the consumer settings: broker list, the {@code testGroup} group id,
     * {@code enable.auto.commit=false} and String deserializers for both key
     * and value.
     *
     * @return a new, independent {@code Properties} object
     */
    public static Properties getConsumerProps() {
        final Properties consumerSettings = new Properties();
        consumerSettings.setProperty("bootstrap.servers", BROKERS);
        consumerSettings.setProperty("group.id", "testGroup");
        // The examples commit offsets themselves, hence auto commit is switched off.
        consumerSettings.setProperty("enable.auto.commit", "false");
        consumerSettings.setProperty("key.deserializer", STRING_DESERIALIZER);
        consumerSettings.setProperty("value.deserializer", STRING_DESERIALIZER);
        return consumerSettings;
    }
}

Creating Example Topic

Let's create a topic with one partition

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Creates the example topic (if it does not exist yet) and prints its
 * partition layout.
 */
public class TopicCreator {
    public static void main(String[] args) throws Exception {
        createTopic("example-topic-2020-5-28c", 1);
    }

    /**
     * Creates {@code topicName} with {@code numPartitions} partitions and a
     * replication factor of 1 unless a topic with that name is already listed
     * by the broker, then prints the topic's partition ids.
     *
     * @param topicName     name of the topic to create or describe
     * @param numPartitions number of partitions used when the topic is created
     * @throws Exception if any of the admin requests fails or is interrupted
     */
    private static void createTopic(String topicName, int numPartitions) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);

        // try-with-resources guarantees the client is closed even when one of the
        // blocking get() calls below throws.
        try (AdminClient admin = AdminClient.create(config)) {
            //checking if topic already exists
            boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
            if (alreadyExists) {
                System.out.printf("topic already exists: %s%n", topicName);
            } else {
                //creating new topic
                System.out.printf("creating topic: %s%n", topicName);
                NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
                admin.createTopics(Collections.singleton(newTopic)).all().get();
            }

            //describing
            System.out.println("-- describing topic --");
            admin.describeTopics(Collections.singleton(topicName)).all().get()
                 .forEach((topic, desc) -> {
                     System.out.println("Topic: " + topic);
                     System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                             desc.partitions()
                                 .stream()
                                 .map(p -> Integer.toString(p.partition()))
                                 .collect(Collectors.joining(",")));
                 });
        }
    }
}
creating topic: example-topic-2020-5-28c
-- describing topic --
Topic: example-topic-2020-5-28c
Partitions: 1, partition ids: 0

Using commitAsync(offsets, callback)

package com.logicbig.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

/**
 * Demonstrates {@code KafkaConsumer.commitAsync(offsets, callback)}: a few
 * messages are produced to a single-partition topic, consumed, and the offset
 * of each consumed record is committed asynchronously with a callback that
 * prints the commit result.
 */
public class CommitAsyncExample {
    private static final String TOPIC_NAME = "example-topic-2020-5-28c";
    private static KafkaConsumer<String, String> consumer;
    private static TopicPartition topicPartition;

    public static void main(String[] args) throws Exception {
        Properties consumerProps = ExampleConfig.getConsumerProps();
        consumer = new KafkaConsumer<>(consumerProps);
        try {
            topicPartition = new TopicPartition(TOPIC_NAME, 0);
            consumer.assign(Collections.singleton(topicPartition));
            printOffsets("before consumer loop");
            sendMessages();
            startConsumer();
        } finally {
            // Release the consumer's network resources on both normal and failed runs.
            consumer.close();
        }
    }

    /**
     * Polls until a poll returns no records, printing every record and
     * committing its offset asynchronously, then prints the final offsets.
     */
    private static void startConsumer() {

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
                // The committed offset must be the offset of the NEXT record to consume
                // (last processed offset + 1). Committing record.offset() itself makes a
                // restarted consumer read the last processed record a second time.
                long nextOffsetToConsume = record.offset() + 1;
                consumer.commitAsync(
                        Collections.singletonMap(topicPartition, new OffsetAndMetadata(nextOffsetToConsume)),
                        (offsets, exception) -> {
                    System.out.printf("Callback, offset: %s, exception %s%n", offsets, exception);
                });
            }
            if (records.isEmpty()) {
                System.out.println("-- terminating consumer --");
                break;
            }
        }
        printOffsets("after consumer loop");
    }

    /**
     * Prints the committed offset (or {@code null} when nothing has been
     * committed yet) and the consumer's current position for the example
     * partition.
     *
     * @param message label inserted into the printed line
     */
    private static void printOffsets(String message) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer
                .committed(Collections.singleton(topicPartition));
        OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
        long position = consumer.position(topicPartition);
        System.out
                .printf("Offset info %s, Committed: %s, current position %s%n", message,
                        offsetAndMetadata == null ? null : offsetAndMetadata
                        .offset(), position);
    }

    /**
     * Sends four key-less messages ({@code message-0} .. {@code message-3}) to
     * the example topic and waits until they have been flushed.
     */
    private static void sendMessages() {
        Properties producerProps = ExampleConfig.getProducerProps();
        // try-with-resources closes the producer even if send() or flush() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < 4; i++) {
                String value = "message-" + i;
                System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
                producer.send(new ProducerRecord<>(TOPIC_NAME, value));
            }
            producer.flush();
        }
    }
}
Offset info before consumer loop, Committed: null, current position 0
Sending message topic: example-topic-2020-5-28c, value: message-0
Sending message topic: example-topic-2020-5-28c, value: message-1
Sending message topic: example-topic-2020-5-28c, value: message-2
Sending message topic: example-topic-2020-5-28c, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''}}, exception null
-- terminating consumer --
Offset info after consumer loop, Committed: 3, current position 4

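Running one more time

Note that in the output captured below the record at offset 3 (message-3 from the first run) is delivered again. A consumer resumes reading at the committed offset, so whenever the committed value is the offset of the last processed record rather than that offset + 1, the last processed record is consumed a second time.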

Offset info before consumer loop, Committed: 3, current position 3
Sending message topic: example-topic-2020-5-28c, value: message-0
Sending message topic: example-topic-2020-5-28c, value: message-1
Sending message topic: example-topic-2020-5-28c, value: message-2
Sending message topic: example-topic-2020-5-28c, value: message-3
consumed: key = null, value = message-3, partition id= 0, offset = 3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=3, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=6, leaderEpoch=null, metadata=''}}, exception null
Callback, offset: {example-topic-2020-5-28c-0=OffsetAndMetadata{offset=7, leaderEpoch=null, metadata=''}}, exception null
-- terminating consumer --
Offset info after consumer loop, Committed: 7, current position 8

Example Project

Dependencies and Technologies Used:

  • kafka_2.12 2.5.0 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka Manual Commit - CommitAsync With Callback and Specified Offset Select All Download
  • kafka-manual-commit-async-callback-specified-offset-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • CommitAsyncExample.java

    See Also