Close

Kafka - Understanding Topic Partitions

[Last Updated: Apr 29, 2020]

For each topic, Kafka maintains a partitioned storage (log) that looks like this:

Each partition is an ordered, immutable sequence of records.

The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

Producer can assign a partition id while sending a record (message) to the broker.

When a message arrives at Kafka server, the partition for the topic is selected and the record is placed at the end of the sequence, assigning the next sequential offset .

Normally a consumer will advance its offset linearly as it reads records.

Example

Run Kafka server as described here.

In this example we will create two topics with partition count 1 and 2.

Creating Topics using Admin API

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        createTopic("example-topic-1", 1);
        createTopic("example-topic-2", 2);
    }

    private static void createTopic(String topicName, int numPartitions) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);
        AdminClient admin = AdminClient.create(config);

        //checking if topic already exists
        boolean alreadyExists = admin.listTopics().names().get().stream()
                                     .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
        if (alreadyExists) {
            System.out.printf("topic already exits: %s%n", topicName);
        } else {
            //creating new topic
            System.out.printf("creating topic: %s%n", topicName);
            NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        }

        //describing
        System.out.println("-- describing topic --");
        admin.describeTopics(Collections.singleton(topicName)).all().get()
             .forEach((topic, desc) -> {
                 System.out.println("Topic: " + topic);
                 System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                         desc.partitions()
                             .stream()
                             .map(p -> Integer.toString(p.partition()))
                             .collect(Collectors.joining(",")));
             });
    }
}
creating topic: example-topic-1
-- describing topic --
Topic: example-topic-1
Partitions: 1, partition ids: 0
creating topic: example-topic-2
-- describing topic --
Topic: example-topic-2
Partitions: 2, partition ids: 0,1

A helper class

This utility class to create consumers and producers:

package com.logicbig.example;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Arrays;
import java.util.Properties;

public class ExampleHelper {
    public static final String BROKERS = "localhost:9092";

    public static KafkaProducer createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKERS);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static KafkaConsumer<String, String> createConsumer(String topicName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        props.setProperty("group.id", "testGroup");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        return consumer;
    }
}

Sending messages to the topic with 1 partition

public class PartitionExample1 {
    private static int PARTITION_COUNT = 1;
    private static String TOPIC_NAME = "example-topic-1";
    private static int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(PartitionExample1::startConsumer);
        executorService.execute(PartitionExample1::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
        int numMsgReceived = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                numMsgReceived++;
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            consumer.commitSync();
            if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
                break;
            }
        }
    }

    private static void sendMessages() {
        KafkaProducer producer = ExampleHelper.createProducer();
        for (int i = 0; i < MSG_COUNT; i++) {
            for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
                String value = "message-" + i;
                String key = Integer.toString(i);
                System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                        TOPIC_NAME, key, value, partitionId);
                producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
            }
        }
    }
}
Sending message topic: example-topic-1, key: 0, value: message - 0, partition: 0
Sending message topic: example-topic-1, key: 1, value: message - 1, partition: 0
Sending message topic: example-topic-1, key: 2, value: message - 2, partition: 0
Sending message topic: example-topic-1, key: 3, value: message - 3, partition: 0
consumed: key = 0, value = message - 0, partition id= 0, offset = 0
consumed: key = 1, value = message - 1, partition id= 0, offset = 1
consumed: key = 2, value = message - 2, partition id= 0, offset = 2
consumed: key = 3, value = message - 3, partition id= 0, offset = 3

Running above class one more time:

Sending message topic: example-topic-1, key: 0, value: message - 0, partition: 0
Sending message topic: example-topic-1, key: 1, value: message - 1, partition: 0
Sending message topic: example-topic-1, key: 2, value: message - 2, partition: 0
Sending message topic: example-topic-1, key: 3, value: message - 3, partition: 0
consumed: key = 0, value = message - 0, partition id= 0, offset = 4
consumed: key = 1, value = message - 1, partition id= 0, offset = 5
consumed: key = 2, value = message - 2, partition id= 0, offset = 6
consumed: key = 3, value = message - 3, partition id= 0, offset = 7

As seen above, message offset at consumer has increased linearly second time.

Sending messages to the topic with 2 partition

public class PartitionExample2 {
    private static int PARTITION_COUNT = 2;
    private static String TOPIC_NAME = "example-topic-2";
    private static int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(PartitionExample2::startConsumer);
        executorService.execute(PartitionExample2::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
        int numMsgReceived = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                numMsgReceived++;
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            consumer.commitSync();
            if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
                break;
            }
        }
    }

    private static void sendMessages() {
        KafkaProducer producer = ExampleHelper.createProducer();
        for (int i = 0; i < MSG_COUNT; i++) {
            for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
                String value = "message-" + i;
                String key = Integer.toString(i);
                System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                        TOPIC_NAME, key, value, partitionId);
                producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
            }
        }
    }
}
Sending message topic: example-topic-2, key: 0, value: message - 0, partition: 0
Sending message topic: example-topic-2, key: 0, value: message - 0, partition: 1
Sending message topic: example-topic-2, key: 1, value: message - 1, partition: 0
Sending message topic: example-topic-2, key: 1, value: message - 1, partition: 1
Sending message topic: example-topic-2, key: 2, value: message - 2, partition: 0
Sending message topic: example-topic-2, key: 2, value: message - 2, partition: 1
Sending message topic: example-topic-2, key: 3, value: message - 3, partition: 0
Sending message topic: example-topic-2, key: 3, value: message - 3, partition: 1
consumed: key = 0, value = message - 0, partition id= 0, offset = 0
consumed: key = 1, value = message - 1, partition id= 0, offset = 1
consumed: key = 2, value = message - 2, partition id= 0, offset = 2
consumed: key = 3, value = message - 3, partition id= 0, offset = 3
consumed: key = 0, value = message - 0, partition id= 1, offset = 0
consumed: key = 1, value = message - 1, partition id= 1, offset = 1
consumed: key = 2, value = message - 2, partition id= 1, offset = 2
consumed: key = 3, value = message - 3, partition id= 1, offset = 3

Running above class one more:

Sending message topic: example-topic-2, key: 0, value: message - 0, partition: 0
Sending message topic: example-topic-2, key: 0, value: message - 0, partition: 1
Sending message topic: example-topic-2, key: 1, value: message - 1, partition: 0
Sending message topic: example-topic-2, key: 1, value: message - 1, partition: 1
Sending message topic: example-topic-2, key: 2, value: message - 2, partition: 0
Sending message topic: example-topic-2, key: 2, value: message - 2, partition: 1
Sending message topic: example-topic-2, key: 3, value: message - 3, partition: 0
Sending message topic: example-topic-2, key: 3, value: message - 3, partition: 1
consumed: key = 0, value = message - 0, partition id= 0, offset = 4
consumed: key = 1, value = message - 1, partition id= 0, offset = 5
consumed: key = 2, value = message - 2, partition id= 0, offset = 6
consumed: key = 3, value = message - 3, partition id= 0, offset = 7
consumed: key = 0, value = message - 0, partition id= 1, offset = 4
consumed: key = 1, value = message - 1, partition id= 1, offset = 5
consumed: key = 2, value = message - 2, partition id= 1, offset = 6
consumed: key = 3, value = message - 3, partition id= 1, offset = 7

Not specifying partition from Producer

If we don't explicitly specify a partition on KafkaProducer side, one partition is automatically picked for the topic.

public class PartitionExample3 {
    private static String TOPIC_NAME = "example-topic-2";
    private static int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(PartitionExample3::startConsumer);
        executorService.execute(PartitionExample3::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
        int numMsgReceived = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                numMsgReceived++;
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            consumer.commitSync();
            if (numMsgReceived == MSG_COUNT ) {
                break;
            }
        }
    }

    private static void sendMessages() {
        KafkaProducer producer = ExampleHelper.createProducer();
        for (int i = 0; i < MSG_COUNT; i++) {
                String value = "message-" + i;
                String key = Integer.toString(i);
                System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                        TOPIC_NAME, key, value, "Not specified");
                producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
        }
    }
}
Sending message topic: example-topic-2, key: 0, value: message-0, partition id: Not specified
Partitioned assigned: 0
Sending message topic: example-topic-2, key: 1, value: message-1, partition id: Not specified
Partitioned assigned: 1
Sending message topic: example-topic-2, key: 2, value: message-2, partition id: Not specified
Partitioned assigned: 0
Sending message topic: example-topic-2, key: 3, value: message-3, partition id: Not specified
Partitioned assigned: 1
consumed: key = 0, value = message-0, partition id= 0, offset = 8
consumed: key = 2, value = message-2, partition id= 0, offset = 9
consumed: key = 1, value = message-1, partition id= 1, offset = 8
consumed: key = 3, value = message-3, partition id= 1, offset = 9

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - Understanding Topic Partitions Select All Download
  • kafka-understanding-partitions
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • TopicCreator.java

    See Also