【kafka+Kraft模式集群+SASL安全认证】
kafka+Kraft模式下的集群配置SASL安全认证
- kafka+Kraft模式下的集群配置
-
- 配置kafka服务的配置文件
- 创建一个kafka sasl认证的服务配置
- 配置kafka服务的启动脚本
- 启动kafka集群
-
- 生成kafka集群uuid
- 格式化所有存储目录
- 启动kafka服务器
- 停止运行
- 查看是否启动
- kafka可视化客户端配置
- springboot项目配置文件
kafka+Kraft模式下的集群配置
准备3个kafka,我这里用的kafka版本为:kafka_2.13-3.6.0,下载后解压:
tar zxvf kafka_2.13-3.6.0.tgz
更改解压后的文件名称:
cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3
分别得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3
配置kafka服务的配置文件
copy一份config/kraft/server.properties配置文件,修改名称 server-sasl.properties
cp config/kraft/server.properties config/kraft/server-sasl.properties
进入各个config/kraft/server-sasl.properties中做配置:
kafka_2.13-3.6.0-1:
node.id=1 controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093 listeners=SASL_PLAINTEXT://:29091,CONTROLLER://:19091 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN inter.broker.listener.name=SASL_PLAINTEXT advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29091 controller.listener.names=CONTROLLER log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-1/kraft-combined-logs
kafka_2.13-3.6.0-2:
node.id=2 controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093 listeners=SASL_PLAINTEXT://:29092,CONTROLLER://:19092 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN inter.broker.listener.name=SASL_PLAINTEXT advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29092 controller.listener.names=CONTROLLER log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-2/kraft-combined-logs
kafka_2.13-3.6.0-3:
node.id=3 controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093 listeners=SASL_PLAINTEXT://:29093,CONTROLLER://:19093 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN inter.broker.listener.name=SASL_PLAINTEXT advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29093 controller.listener.names=CONTROLLER log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-3/kraft-combined-logs
创建一个kafka sasl认证的服务配置
可以在/data/kafka-cluster目录下新建一个kafka_server_jaas.conf全局配置文件,然后认证信息写好:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="admin"
password="admin"
user_kafka="admin";
};
配置kafka服务的启动脚本
copy一份kafka-server-start.sh ,修改名称 kafka-server-start-sasl.sh启动脚本修改名称,引入加密文件; 注意路径
cp kafka-server-start.sh kafka-server-start-sasl.sh
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka-cluster/global_config/kafka_server_jaas.conf"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3修改部分为:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka-cluster/global_config/kafka_server_jaas.conf"
fi
启动kafka集群
生成kafka集群uuid
我们需要在启动服务器之前创建kafka集群id。执行下列命令,并记下运行生成的uuid,只需要在其中一个kafka中执行一次:
./bin/kafka-storage.sh random-uuid
格式化所有存储目录
接下来我们需要在每个kafka格式化所有存储目录
./bin/kafka-storage.sh format -t 你的UUID -c ./config/kraft/server-sasl.properties
启动kafka服务器
可以使用以下命令在守护程序模式下启动每个kafka服务器
./bin/kafka-server-start-sasl.sh -daemon ./config/kraft/server-sasl.properties
停止运行
./bin/kafka-server-stop ./config/kraft/server-sasl.properties
查看是否启动
jps
kafka可视化客户端配置
新建链接,Properties配置:

选择Security,这里一定要对应配置文件的类型:

选择Advanced:

选择JAAS Config:

到这里需要配置的已经完毕,可以点击测试或者直接连接了
springboot项目配置文件
在kafka配置文件下面加上该配置:
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username='你的userName' password='你的password';
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/32775250fa.html
