【kafka+Kraft模式集群+SASL安全认证】

kafka+Kraft模式下的集群配置SASL安全认证

  • kafka+Kraft模式下的集群配置
    • 配置kafka服务的配置文件
    • 创建一个kafka sasl认证的服务配置
    • 配置kafka服务的启动脚本
    • 启动kafka集群
      • 生成kafka集群uuid
      • 格式化所有存储目录
      • 启动kafka服务器
      • 停止运行
      • 查看是否启动
  • kafka可视化客户端配置
  • springboot项目配置文件

kafka+Kraft模式下的集群配置

准备3个kafka,我这里用的kafka版本为:kafka_2.13-3.6.0,下载后解压:

tar zxvf kafka_2.13-3.6.0.tgz

更改解压后的文件名称:

cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3

分别得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3

配置kafka服务的配置文件

copy一份config/kraft/server.properties配置文件,修改名称 server-sasl.properties

cp config/kraft/server.properties config/kraft/server-sasl.properties

进入各个config/kraft/server-sasl.properties中做配置:

kafka_2.13-3.6.0-1:

node.id=1
controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093
listeners=SASL_PLAINTEXT://:29091,CONTROLLER://:19091
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29091
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-1/kraft-combined-logs

kafka_2.13-3.6.0-2:

node.id=2
controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093
listeners=SASL_PLAINTEXT://:29092,CONTROLLER://:19092
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29092
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-2/kraft-combined-logs

kafka_2.13-3.6.0-3:

node.id=3
controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093
listeners=SASL_PLAINTEXT://:29093,CONTROLLER://:19093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.8.122:29093
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-cluster/kafka_2.13-3.6.0-3/kraft-combined-logs

创建一个kafka sasl认证的服务配置

可以在/data/kafka-cluster目录下新建一个kafka_server_jaas.conf全局配置文件,然后认证信息写好:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="admin"
    password="admin"
    user_kafka="admin";
};

配置kafka服务的启动脚本

copy一份kafka-server-start.sh ,修改名称 kafka-server-start-sasl.sh启动脚本修改名称,引入加密文件; 注意路径

cp kafka-server-start.sh kafka-server-start-sasl.sh
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
	exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka-cluster/global_config/kafka_server_jaas.conf"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3修改部分为:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka-cluster/global_config/kafka_server_jaas.conf"
fi

启动kafka集群

生成kafka集群uuid

我们需要在启动服务器之前创建kafka集群id。执行下列命令,并记下运行生成的uuid,只需要在其中一个kafka中执行一次:

./bin/kafka-storage.sh random-uuid

格式化所有存储目录

接下来我们需要在每个kafka格式化所有存储目录

./bin/kafka-storage.sh format -t 你的UUID -c ./config/kraft/server-sasl.properties

启动kafka服务器

可以使用以下命令在守护程序模式下启动每个kafka服务器

./bin/kafka-server-start-sasl.sh -daemon ./config/kraft/server-sasl.properties

停止运行

./bin/kafka-server-stop ./config/kraft/server-sasl.properties

查看是否启动

jps

kafka可视化客户端配置

新建链接,Properties配置:

在这里插入图片描述

选择Security,这里一定要对应配置文件的类型:

在这里插入图片描述

选择Advanced:

在这里插入图片描述

选择JAAS Config:

在这里插入图片描述

到这里需要配置的已经完毕,可以点击测试或者直接连接了

springboot项目配置文件

在kafka配置文件下面加上该配置:

properties:
  security:
    protocol: SASL_PLAINTEXT
  sasl:
    mechanism: PLAIN
    jaas:
      config: org.apache.kafka.common.security.plain.PlainLoginModule required username='你的userName' password='你的password';

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/32775250fa.html