Close

Kafka - Manually Assign Partition To A Consumer

[Last Updated: Apr 6, 2020]

A specific topic partition can be manually assigned to a consumer by calling KafkaConsumer#assign():

public void assign(java.util.Collection<TopicPartition> partitions)

Note that KafkaConsumer#assign() and KafkaConsumer#subscribe() cannot be used together.

Example

Run the Kafka server as described here.

Creating a Topic with multiple partitions

/**
 * Creates the example topic (unless it already exists) and prints its partition layout.
 */
public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic", 3);
  }

  /**
   * Creates {@code topicName} with {@code numPartitions} partitions and a replication factor
   * of 1 when no topic with that name exists yet, then describes the topic on stdout.
   *
   * @param topicName     name of the topic to create and describe
   * @param numPartitions partition count used when the topic has to be created
   * @throws Exception if one of the blocking admin requests fails or is interrupted
   */
  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);

      // try-with-resources: the client is released even when a blocking get() below throws
      try (AdminClient admin = AdminClient.create(config)) {
          //checking if topic already exists
          boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
          if (alreadyExists) {
              System.out.printf("topic already exists: %s%n", topicName);
          } else {
              //creating new topic
              System.out.printf("creating topic: %s%n", topicName);
              NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
              // all().get() blocks until the broker has acknowledged the creation
              admin.createTopics(Collections.singleton(newTopic)).all().get();
          }

          //describing
          System.out.println("-- describing topic --");
          admin.describeTopics(Collections.singleton(topicName)).all().get()
               .forEach((topic, desc) -> {
                   System.out.println("Topic: " + topic);
                   System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions()
                               .stream()
                               .map(p -> Integer.toString(p.partition()))
                               .collect(Collectors.joining(",")));
               });
      }
  }
}
creating topic: example-topic
-- describing topic --
Topic: example-topic
Partitions: 3, partition ids: 0,1,2

Helper class

/**
 * Shared client configuration for the examples: broker address plus ready-made
 * producer and consumer property sets using String (de)serializers.
 */
public class ExampleHelper {
  public static final String BROKERS = "localhost:9092";

  private static final String STRING_SERIALIZER =
          "org.apache.kafka.common.serialization.StringSerializer";
  private static final String STRING_DESERIALIZER =
          "org.apache.kafka.common.serialization.StringDeserializer";

  /**
   * Builds a fresh producer configuration.
   *
   * @return properties with bootstrap servers, {@code acks=all} and String key/value serializers
   */
  public static Properties getProducerProps() {
      Properties producerConfig = newConnectionProps();
      producerConfig.setProperty("acks", "all");
      producerConfig.setProperty("key.serializer", STRING_SERIALIZER);
      producerConfig.setProperty("value.serializer", STRING_SERIALIZER);
      return producerConfig;
  }

  /**
   * Builds a fresh consumer configuration for group {@code testGroup} with auto-commit disabled.
   *
   * @param topicName accepted for callers' convenience; not used when building the properties
   * @return properties with bootstrap servers, group id, auto-commit flag and String deserializers
   */
  public static Properties getConsumerProps(String topicName) {
      Properties consumerConfig = newConnectionProps();
      consumerConfig.setProperty("group.id", "testGroup");
      consumerConfig.setProperty("enable.auto.commit", "false");
      consumerConfig.setProperty("key.deserializer", STRING_DESERIALIZER);
      consumerConfig.setProperty("value.deserializer", STRING_DESERIALIZER);
      return consumerConfig;
  }

  // Starts every configuration with the broker address both client kinds need.
  private static Properties newConnectionProps() {
      Properties connection = new Properties();
      connection.setProperty("bootstrap.servers", BROKERS);
      return connection;
  }
}

Using assign()

/**
 * Demonstrates manual partition assignment: a producer writes {@code MSG_COUNT} records to
 * every partition of the topic while a consumer, assigned to partition 1 only, reads back
 * just the records of that partition.
 */
public class PartitionAssignmentExample {
  private static final int PARTITION_COUNT = 3;
  private static final String TOPIC_NAME = "example-topic";
  private static final int MSG_COUNT = 4;

  /**
   * Runs the consumer and the producer concurrently and waits (up to 10 minutes) for both.
   *
   * @throws Exception if the wait for task completion is interrupted
   */
  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(PartitionAssignmentExample::startConsumer);
      executorService.execute(PartitionAssignmentExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  /**
   * Polls partition 1 of the topic until at least {@code MSG_COUNT} records were consumed,
   * committing offsets synchronously after each poll.
   */
  private static void startConsumer() {
      Properties consumerProps = ExampleHelper.getConsumerProps(TOPIC_NAME);
      // try-with-resources so the consumer is closed on normal exit and on failure
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          //don't call consumer#subscribe()
          //assigning partition-id=1
          consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 1)));
          int numMsgReceived = 0;
          // '<' rather than an equality test: a single poll may carry the count past MSG_COUNT,
          // in which case an '==' check would never become true
          while (numMsgReceived < MSG_COUNT) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
              for (ConsumerRecord<String, String> record : records) {
                  numMsgReceived++;
                  System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                          record.key(), record.value(), record.partition(), record.offset());
              }
              consumer.commitSync();
          }
      }
  }

  /**
   * Sends {@code MSG_COUNT} records to each of the {@code PARTITION_COUNT} partitions,
   * addressing the target partition explicitly in every record.
   */
  private static void sendMessages() {
      Properties producerProps = ExampleHelper.getProducerProps();
      // send() is asynchronous; closing the producer at the end of the block waits for
      // the buffered records to be sent and releases the producer's resources
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          for (int i = 0; i < MSG_COUNT; i++) {
              for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
                  String value = "message-" + i;
                  String key = Integer.toString(i);
                  System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                          TOPIC_NAME, key, value, partitionId);
                  producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
              }
          }
      }
  }
}
Sending message topic: example-topic, key: 0, value: message-0, partition id: 0
Sending message topic: example-topic, key: 0, value: message-0, partition id: 1
Sending message topic: example-topic, key: 0, value: message-0, partition id: 2
Sending message topic: example-topic, key: 1, value: message-1, partition id: 0
Sending message topic: example-topic, key: 1, value: message-1, partition id: 1
Sending message topic: example-topic, key: 1, value: message-1, partition id: 2
Sending message topic: example-topic, key: 2, value: message-2, partition id: 0
Sending message topic: example-topic, key: 2, value: message-2, partition id: 1
Sending message topic: example-topic, key: 2, value: message-2, partition id: 2
Sending message topic: example-topic, key: 3, value: message-3, partition id: 0
Sending message topic: example-topic, key: 3, value: message-3, partition id: 1
Sending message topic: example-topic, key: 3, value: message-3, partition id: 2
consumed: key = 0, value = message-0, partition id= 1, offset = 4
consumed: key = 1, value = message-1, partition id= 1, offset = 5
consumed: key = 2, value = message-2, partition id= 1, offset = 6
consumed: key = 3, value = message-3, partition id= 1, offset = 7

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Assigning Partition To A Consumer Select All Download
  • kafka-consumer-partitions-assignment
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • PartitionAssignmentExample.java

    See Also