使用 Java API 获取 Kafka 的 topic 列表,以及每个 topic 的分区数量与副本数量
•
大数据
目录
1、代码
2、结果
1、代码
package com.zsh.kafkatest.topic;
import com.zsh.kafkatest.connect.KafkaConnection;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Lists every topic (internal ones included) on a Kafka cluster and prints, for each topic,
 * its partition details, partition count and replication factor.
 *
 * <p>Author: ZhaoShuHao. Created: 2023/7/21 16:58.
 */
public class GetTopicAboutDatasource {

    /** Timeout, in milliseconds, applied to each describe-topics request. */
    private static final int DESCRIBE_TIMEOUT_MS = 5000;

    /** Visual separator printed before and after each topic's details. */
    private static final String SEPARATOR = "————————————————————————————————————";

    /**
     * Connects to the hard-coded broker address, collects the details of every topic and
     * prints them to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String kafka = "192.168.140.65:9092";
        if (!allAddressesHaveHostAndPort(kafka)) {
            System.out.println("有个地址缺少IP或端口,获取topic失败");
            // The message announces a failure, so stop instead of connecting with a bad address.
            return;
        }

        List<Map<String, String>> tVos = new ArrayList<>();
        AdminClient adminClient = KafkaConnection.kafkaTestConnection(kafka);
        try {
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(true);
            // Issued inside the try block so the client is closed even if the request fails.
            Set<String> topicNames = adminClient.listTopics(options).names().get();
            for (String topicName : topicNames) {
                // Reuse the already-open client rather than opening a new connection per topic.
                Map<String, Object> topicInfo = describeTopic(adminClient, topicName);
                Map<String, String> row = new HashMap<>();
                row.put("tableName", topicName);
                row.put("issame", "0");
                row.put("Partitions", String.valueOf(topicInfo.get("Partitions")));
                row.put("PartitionSize", String.valueOf(topicInfo.get("PartitionSize")));
                row.put("ReplicationFactor", String.valueOf(topicInfo.get("ReplicationFactor")));
                tVos.add(row);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever is running this thread.
                Thread.currentThread().interrupt();
            }
            System.out.println("获取topic失败");
            System.err.println("Failed to list topics from " + kafka + ": " + e);
        } finally {
            if (adminClient != null) {
                KafkaConnection.close(adminClient);
            }
        }

        System.out.println("所有信息查询成功tVos:" + tVos);
        tVos.forEach(topicRow -> {
            System.out.println(SEPARATOR);
            System.out.println("topic主题名称:" + topicRow.get("tableName"));
            System.out.println("topic分区信息:" + topicRow.get("Partitions"));
            System.out.println("topic分区数量:" + topicRow.get("PartitionSize"));
            System.out.println("topic副本数量:" + topicRow.get("ReplicationFactor"));
            System.out.println(SEPARATOR);
        });
    }

    /**
     * Fetches the details of a single topic using a short-lived admin client.
     *
     * @param ipAndPort value used for the admin client's bootstrap-servers setting
     * @param topic     name of the topic to describe
     * @return a map with the keys {@code "Partitions"} (list of partition infos),
     *         {@code "PartitionSize"} (partition count) and {@code "ReplicationFactor"}
     *         (replica count of the first partition, or 0 when there are no partitions);
     *         the map is empty when the lookup fails
     */
    public static Map<String, Object> getTopicInfo(String ipAndPort, String topic) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ipAndPort);
        // try-with-resources: the client was previously never closed and leaked on every call.
        try (AdminClient adminClient = AdminClient.create(props)) {
            return describeTopic(adminClient, topic);
        }
    }

    /** Describes one topic through the given client; the caller keeps ownership of the client. */
    private static Map<String, Object> describeTopic(AdminClient adminClient, String topic) {
        Map<String, Object> info = new HashMap<>();
        DescribeTopicsOptions describeOptions =
                new DescribeTopicsOptions().timeoutMs(DESCRIBE_TIMEOUT_MS);
        DescribeTopicsResult describeResult =
                adminClient.describeTopics(Collections.singletonList(topic), describeOptions);
        // NOTE(review): values() is kept from the original code; newer Kafka clients may expect
        // topicNameValues() instead - confirm against the client version in use.
        KafkaFuture<TopicDescription> descriptionFuture = describeResult.values().get(topic);
        try {
            TopicDescription description = descriptionFuture.get();
            List<TopicPartitionInfo> partitions = description.partitions();
            // Guard against an empty partition list instead of failing on get(0).
            int replicationFactor = partitions.isEmpty() ? 0 : partitions.get(0).replicas().size();
            info.put("Partitions", partitions);
            info.put("PartitionSize", partitions.size());
            info.put("ReplicationFactor", replicationFactor);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while describing topic '" + topic + "': " + e);
        } catch (ExecutionException e) {
            System.err.println("Failed to describe topic '" + topic + "': " + e.getCause());
        }
        return info;
    }

    /** Returns true when every ';'-separated entry contains at least a host and a port. */
    private static boolean allAddressesHaveHostAndPort(String addresses) {
        for (String address : addresses.split(";")) {
            if (address.split(":").length < 2) {
                return false;
            }
        }
        return true;
    }
}
2、结果


本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/bba0684cdc.html
