The Admin API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
In this tutorial we will walk through getting-started examples of how to use the Kafka Admin API.
Example
Start the Kafka server as described here.
How to list Kafka configuration?
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Prints the configuration of every broker in the cluster reachable through
 * {@code localhost:9092}.
 *
 * <p>For each node reported by {@code describeCluster()} a header line is
 * printed, followed by one {@code name= value} line per configuration entry
 * of that broker.
 */
public class ListingConfigs {

    /**
     * Connects to the cluster, walks its nodes and dumps each broker's config.
     *
     * @param args ignored
     * @throws ExecutionException   if a describe request fails on the broker side
     * @throws InterruptedException if the thread is interrupted while waiting for a result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources: the client must be closed to release its
        // network thread and sockets once we are done.
        try (AdminClient admin = AdminClient.create(config)) {
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("-- node: " + node.id() + " --");

                // Query the config of the node being iterated, not a fixed broker id;
                // otherwise every node would show broker 0's settings.
                ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
                DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));

                dcr.all().get().forEach((resource, brokerConfig) ->
                        brokerConfig.entries().forEach(configEntry ->
                                System.out.println(configEntry.name() + "= " + configEntry.value())));
            }
        }
    }
}
-- node: 0 -- advertised.host.name= null log.cleaner.min.compaction.lag.ms= 0 metric.reporters= quota.producer.default= 9223372036854775807 offsets.topic.num.partitions= 50 log.flush.interval.messages= 9223372036854775807 controller.socket.timeout.ms= 30000 auto.create.topics.enable= true log.flush.interval.ms= null principal.builder.class= null replica.socket.receive.buffer.bytes= 65536 min.insync.replicas= 1 replica.fetch.wait.max.ms= 500 num.recovery.threads.per.data.dir= 1 ssl.keystore.type= JKS password.encoder.iterations= 4096 sasl.mechanism.inter.broker.protocol= GSSAPI default.replication.factor= 1 ssl.truststore.password= null log.preallocate= false sasl.kerberos.principal.to.local.rules= DEFAULT fetch.purgatory.purge.interval.requests= 1000 ssl.endpoint.identification.algorithm= https replica.socket.timeout.ms= 30000 message.max.bytes= 1000012 transactional.id.expiration.ms= 604800000 transaction.state.log.replication.factor= 1 control.plane.listener.name= null num.io.threads= 8 sasl.login.refresh.buffer.seconds= 300 connections.max.reauth.ms= 0 connection.failed.authentication.delay.ms= 100 offsets.commit.required.acks= -1 log.flush.offset.checkpoint.interval.ms= 60000 delete.topic.enable= true quota.window.size.seconds= 1 ssl.truststore.type= JKS offsets.commit.timeout.ms= 5000 quota.window.num= 11 zookeeper.connect= localhost:2181 authorizer.class.name= password.encoder.secret= null log.cleaner.max.compaction.lag.ms= 9223372036854775807 num.replica.fetchers= 1 alter.log.dirs.replication.quota.window.size.seconds= 1 log.retention.ms= null alter.log.dirs.replication.quota.window.num= 11 log.roll.jitter.hours= 0 password.encoder.old.secret= null log.cleaner.enable= true offsets.load.buffer.size= 5242880 log.cleaner.delete.retention.ms= 86400000 ssl.client.auth= none controlled.shutdown.max.retries= 3 offsets.topic.replication.factor= 1 queued.max.requests= 500 transaction.state.log.min.isr= 1 log.cleaner.threads= 1 ssl.secure.random.implementation= null 
sasl.kerberos.service.name= null sasl.kerberos.ticket.renew.jitter= 0.05 socket.request.max.bytes= 104857600 ssl.trustmanager.algorithm= PKIX zookeeper.session.timeout.ms= 6000 log.retention.bytes= -1 sasl.jaas.config= null log.message.timestamp.type= CreateTime sasl.kerberos.min.time.before.relogin= 60000 zookeeper.set.acl= false connections.max.idle.ms= 600000 offsets.retention.minutes= 10080 max.connections= 2147483647 delegation.token.expiry.time.ms= 86400000 transaction.state.log.num.partitions= 50 replica.fetch.backoff.ms= 1000 inter.broker.protocol.version= 2.4-IV1 kafka.metrics.reporters= listener.security.protocol.map= PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.retention.hours= 168 num.partitions= 1 client.quota.callback.class= null broker.id.generation.enable= true listeners= null ssl.provider= null ssl.enabled.protocols= TLSv1.2,TLSv1.1,TLSv1 inter.broker.listener.name= null delete.records.purgatory.purge.interval.requests= 1 log.roll.ms= null alter.config.policy.class.name= null delegation.token.expiry.check.interval.ms= 3600000 ssl.cipher.suites= zookeeper.max.in.flight.requests= 10 log.flush.scheduler.interval.ms= 9223372036854775807 log.index.size.max.bytes= 10485760 ssl.keymanager.algorithm= SunX509 sasl.login.callback.handler.class= null security.inter.broker.protocol= PLAINTEXT replica.fetch.max.bytes= 1048576 sasl.server.callback.handler.class= null advertised.port= null log.cleaner.dedupe.buffer.size= 134217728 replica.high.watermark.checkpoint.interval.ms= 5000 replication.quota.window.size.seconds= 1 log.cleaner.io.buffer.size= 524288 sasl.kerberos.ticket.renew.window.factor= 0.8 create.topic.policy.class.name= null zookeeper.connection.timeout.ms= 6000 metrics.recording.level= INFO password.encoder.cipher.algorithm= AES/CBC/PKCS5Padding controlled.shutdown.retry.backoff.ms= 5000 security.providers= null log.roll.hours= 168 log.cleanup.policy= delete log.flush.start.offset.checkpoint.interval.ms= 60000 
ssl.principal.mapping.rules= DEFAULT host.name= replica.selector.class= null log.roll.jitter.ms= null transaction.state.log.segment.bytes= 104857600 max.connections.per.ip= 2147483647 offsets.topic.segment.bytes= 104857600 background.threads= 10 quota.consumer.default= 9223372036854775807 request.timeout.ms= 30000 group.initial.rebalance.delay.ms= 0 log.message.format.version= 2.4-IV1 sasl.login.class= null log.index.interval.bytes= 4096 log.dir= /tmp/kafka-logs log.segment.bytes= 1073741824 log.cleaner.backoff.ms= 15000 offset.metadata.max.bytes= 4096 ssl.truststore.location= null replica.fetch.response.max.bytes= 10485760 group.max.session.timeout.ms= 1800000 ssl.keystore.password= null port= 9092 zookeeper.sync.time.ms= 2000 log.retention.minutes= null log.segment.delete.delay.ms= 60000 log.dirs= /tmp/kafka-logs controlled.shutdown.enable= true compression.type= producer max.connections.per.ip.overrides= log.message.timestamp.difference.max.ms= 9223372036854775807 sasl.login.refresh.min.period.seconds= 60 password.encoder.key.length= 128 sasl.login.refresh.window.factor= 0.8 kafka.metrics.polling.interval.secs= 10 transaction.abort.timed.out.transaction.cleanup.interval.ms= 60000 sasl.kerberos.kinit.cmd= /usr/bin/kinit log.cleaner.io.max.bytes.per.second= 1.7976931348623157E308 auto.leader.rebalance.enable= true leader.imbalance.check.interval.seconds= 300 log.cleaner.min.cleanable.ratio= 0.5 replica.lag.time.max.ms= 10000 max.incremental.fetch.session.cache.slots= 1000 delegation.token.master.key= null num.network.threads= 3 ssl.key.password= null reserved.broker.max.id= 1000 sasl.client.callback.handler.class= null metrics.num.samples= 2 transaction.remove.expired.transaction.cleanup.interval.ms= 3600000 socket.send.buffer.bytes= 102400 log.message.downconversion.enable= true ssl.protocol= TLS password.encoder.keyfactory.algorithm= null transaction.state.log.load.buffer.size= 5242880 socket.receive.buffer.bytes= 102400 ssl.keystore.location= null 
replica.fetch.min.bytes= 1 broker.rack= null unclean.leader.election.enable= false num.replica.alter.log.dirs.threads= null sasl.enabled.mechanisms= GSSAPI group.min.session.timeout.ms= 6000 offsets.retention.check.interval.ms= 600000 log.cleaner.io.buffer.load.factor= 0.9 transaction.max.timeout.ms= 900000 producer.purgatory.purge.interval.requests= 1000 metrics.sample.window.ms= 30000 group.max.size= 2147483647 broker.id= 0 offsets.topic.compression.codec= 0 delegation.token.max.lifetime.ms= 604800000 replication.quota.window.num= 11 log.retention.check.interval.ms= 300000 advertised.listeners= null leader.imbalance.per.broker.percentage= 10 sasl.login.refresh.window.jitter= 0.05 queued.max.request.bytes= -1
How to list all topics?
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicListing;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Prints every topic listing returned by the cluster reachable through
 * {@code localhost:9092}.
 */
public class ListingTopics {

    /**
     * Connects to the cluster and prints one line per {@link TopicListing}.
     *
     * @param args ignored
     * @throws ExecutionException   if the list-topics request fails
     * @throws InterruptedException if the thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources: close the client so its network thread and
        // sockets are released when the listing is finished.
        try (AdminClient admin = AdminClient.create(config)) {
            for (TopicListing topicListing : admin.listTopics().listings().get()) {
                System.out.println(topicListing);
            }
        }
    }
}
(name=test-topic-1, internal=false) (name=test-topic-2, internal=false)
How to create topics?
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CreateTopic {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(config);
//creating new topic
System.out.println("-- creating --");
NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
admin.createTopics(Collections.singleton(newTopic));
//listing
System.out.println("-- listing --");
admin.listTopics().names().get().forEach(System.out::println);
}
}
-- creating -- -- listing -- my-new-topic test-topic-1 test-topic-2
Example Project
Dependencies and Technologies Used:
- kafka-clients 2.4.1 Apache Kafka
- JDK 8
- Maven 3.5.4
|