SpringBoot分布式Netty集群,通过Redis发布/订阅广播

一、前言

        之前做用springboot+websocket做双向通讯时,websocket的session存在无法序列化导致集群不能通过共享session来实现,后来采取了记录需要推送客户端ip,然后用http去请求web接口这个不友好的方式。当然需求只需要做扫码登录,这种方式影响也不会有什么影响。但集群问题一直没解决就在心里埋下了个种子。

二、正文

        用netty搭建websocket集群服务,因为netty中的channel是在本地的需要整合rabbitmq或者redis等发布/订阅模式来实现消息发送(或者Channel共享,具体还没考究母鸡能不能实现)。这里用redis做广播(如果用redis需要考虑消息丢失和ack等情况,这里只做演示)

三、流程图 

        根据流程图,用户端client1想给client2发送消息,如果是单机的话就很简单,直接拿到client2的Channel发送Message就行了。但如果是server集群的话,就需要广播消息,让每个集群节点都收到内容,Channel不在本地的话就忽略。

            SpringBoot分布式Netty集群,通过Redis发布/订阅广播

四、环境搭建

1、pom文件需要的依赖

       
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
        
        
            org.redisson
            redisson
            2.13.0
        
        
        
            io.netty
            netty-all
            4.1.29.Final
        
         
        
            org.projectlombok
            lombok
            true
        

2、RedisConfig配置文件,注意:要与MessageListener实现类统一序列化器

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.net.UnknownHostException;

/**
 * @Description
 * @Author kele
 * @Data 2023/8/31 16:34
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        // 将template 泛型设置为 
        RedisTemplate template = new RedisTemplate();
        // 连接工厂,不必修改
        template.setConnectionFactory(redisConnectionFactory);
        /*
         * 序列化设置
         */
        // key、hash的key 采用 String序列化方式
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        // value、hash的value 采用 Jackson 序列化方式
        template.setValueSerializer(RedisSerializer.json());
        template.setHashValueSerializer(RedisSerializer.json());
        template.afterPropertiesSet();

        return template;
    }
}

3.定义redis订阅监听类配置

RedisMessageListenerConfiguration .java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.util.Arrays;
import java.util.List;

/**
 * @Description
 * @Author kele
 * @Data 2023/8/31 16:07
 */
@Configuration
public class RedisMessageListenerConfiguration {

    @Autowired
    private LifeRedisMessageListener lifeRedisMessageListener;
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * 配置订阅关系
     */
    @Bean
    public RedisMessageListenerContainer container() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        //订阅频道
        List topicList = Arrays.asList(new PatternTopic("life.*"),new PatternTopic("*.life"));
        container.addMessageListener(lifeRedisMessageListener, topicList);
        return container;
    }
}

LifeRedisMessageListener .java

import com.alibaba.fastjson.JSONObject;
import com.na.integration.socket.websocket.WebSocketHandler;
import com.na.model.dto.NettyRedisConnectionDto;
import com.na.common.utils.JSONUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;

/**
 * @Description
 * @Author kele
 * @Data 2023/8/31 16:07
 */
@Component
@Slf4j
public class LifeRedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        //需要在加载bean的时候配置相同的redis序列化器,否则会乱码
        RedisSerializer keySerializer = redisTemplate.getKeySerializer();
        RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
        log.info("----------Life接收到发布者消息----------");
        log.info("|频道:{}", keySerializer.deserialize(message.getChannel()));
        log.info("|当前监听器绑定的pattern:{}", new String(pattern));
        log.info("|消息内容:{}", valueSerializer.deserialize(message.getBody()));
        log.info("---------------------------------");
        //反序列化
        JSONObject jsonObject = JSONUtils.toJsonObj((String) valueSerializer.deserialize(message.getBody()));
        NettyRedisConnectionDto dto = new NettyRedisConnectionDto();
        Long id = Long.valueOf(jsonObject.get("id").toString());
        dto.setId(id);
        dto.setChannel((Channel) jsonObject.get("key"));
        dto.setSendMessage(jsonObject.get("sendMessage").toString());
        dto.setSendId(Long.valueOf(jsonObject.get("sendId").toString()));

        /**
         * 发送内容
         */
        //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
        Map channelMap = WebSocketHandler.getChannelMap();
        //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
        Map clientMap = WebSocketHandler.getClientMap();
        //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
        String v = clientMap.get(dto.getSendId());
        Channel channel = null;
        try {
            channel = channelMap.get(v);
        } catch (NullPointerException e) {
            log.error("消息id:" + id + "通道不在本地");
            return;
        }
        Channel finalChannel = channel;
        channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(
                Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "来自id为"
                        + dto.getId() + ",发送的内容message=" + dto.getSendMessage())));
//        redisTemplate.opsForValue().get("");
    }

4、消息接收对象

import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;

@Data
@Accessors(chain = true)
public class MessageRequest implements Serializable {

    private Long unionId;

    private Integer current = 1;

    private Integer size = 10;
}

5、websocket通道初始化器

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @Description websocket通道初始化器
 **/
@Component
public class WebsocketChannelInitializer extends ChannelInitializer {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Value("${websocket.url}")
    private String websocketUrl;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {

        //获取pipeline通道
        ChannelPipeline pipeline = socketChannel.pipeline();
        //因为基于http协议,使用http的编码和解码器
        pipeline.addLast(new HttpServerCodec());
        //是以块方式写,添加ChunkedWriteHandler处理器
        pipeline.addLast(new ChunkedWriteHandler());
        /*
          说明
          1\. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
          2\. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
        */
        pipeline.addLast(new HttpObjectAggregator(8192));
        /* 说明
          1\. 对应websocket ,它的数据是以 帧(frame) 形式传递
          2\. 可以看到WebSocketFrame 下面有六个子类
          3\. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
          4\. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
          5\. 是通过一个 状态码 101
        */
        pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
        //自定义的handler ,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

6、定义Handler处理器

import com.alibaba.fastjson.JSON;
import com.na.enums.ResultCode;
import com.na.exceptions.RRException;
import com.na.utils.RedisLockUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @Description websocket处理器
 **/
@Slf4j
@Component
@ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
public class WebSocketHandler extends SimpleChannelInboundHandler {
    @Autowired
    private RedisLockUtil redisLockUtil;

    //通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
    private static Map channelMap = new ConcurrentHashMap();
    //任务map,存储future,用于停止队列任务
    private static Map futureMap = new ConcurrentHashMap();
    //存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
    private static Map clientMap = new ConcurrentHashMap();
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 客户端发送给服务端的消息
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        try {
            //接受客户端发送的消息
            MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);

            //每个channel都有id,asLongText是全局channel唯一id
            String key = ctx.channel().id().asLongText();
            //存储channel的id和用户的主键
            clientMap.put(messageRequest.getUnionId(), key);
            log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");

            if (!channelMap.containsKey(key)) {
                //使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
                Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
                //存储客户端和服务的通信的Chanel
                channelMap.put(key, ctx.channel());
                //存储每个channel中的future,保证每个channel中有一个定时任务在执行
                futureMap.put(key, future);
            } else {
                //每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
                ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
            }

        } catch (Exception e) {

            log.error("websocket服务器推送消息发生错误:", e);

        }
    }

    /**
     * 注册时执行
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        log.info("--channelRegistered注册时执行--" + ctx.channel().id().toString());
    }

    /**
     * 客户端连接时候的操作
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
    }

    /**
     * 离线时执行
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        log.info("--channelUnregistered离线时执行--" + ctx.channel().id().toString());
    }


    /**
     * 客户端掉线时的操作
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        String key = ctx.channel().id().asLongText();
        //移除通信过的channel
        channelMap.remove(key);
        //移除和用户绑定的channel
        clientMap.remove(key);
        //关闭掉线客户端的future
        Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
            future.cancel(true);
            futureMap.remove(key);
        });
        log.info("一个客户端移除......" + ctx.channel().remoteAddress());
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        log.info("--channelReadComplete从客户端收到新的数据--");
        ctx.flush();
    }


    /**
     * 发生异常时执行的操作
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String key = ctx.channel().id().asLongText();
        //移除通信过的channel
        channelMap.remove(key);
        //移除和用户绑定的channel
        clientMap.remove(key);
        //移除定时任务
        Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
            future.cancel(true);
            futureMap.remove(key);
        });
        //关闭长连接
        ctx.close();
        log.info("异常发生 " + cause.getMessage());
    }

    public static Map getChannelMap() {
        return channelMap;
    }

    public static Map getFutureMap() {
        return futureMap;
    }

    public static Map getClientMap() {
        return clientMap;
    }

    /**
     * redission 防止重复在线,多个实例的本地缓存是否存在同一个id和与幂等性,这样会导致想要接收方混乱的bug
     * @param key
     * @param v
     */
    private void addChannelMap(String key, Channel v) {
        try {
            //定义keykey的锁
            redisLockUtil.lock(key, key, 10000, 3, 5000);
            WebSocketHandler.channelMap.put(key, v);
        } finally {
            redisLockUtil.unlock(key.toString(), key.toString());
        }
    }

    private void addFutureMap(String key, Future v) {
        try {
            //定义keykey的锁
            redisLockUtil.lock(key, key, 10000, 3, 5000);
            WebSocketHandler.futureMap.put(key, v);
        } finally {
            redisLockUtil.unlock(key.toString(), key.toString());
        }
    }

    private void addClientMap(Long key, String v) {
        try {
            //定义keykey的锁
            redisLockUtil.lock(key.toString(), key.toString(), 10000, 3, 5000);
            WebSocketHandler.clientMap.put(key, v);
        } finally {
            redisLockUtil.unlock(key.toString(), key.toString());
        }
    }

    public static void sendMessage(String key, String message) {
        if (StringUtils.isEmpty(key)) {
            throw new RRException(ResultCode.PARAM_NOT_NULL);
        }
        Channel channel = channelMap.get(key);
        if (channel == null) {
            throw new RRException(ResultCode.ID_IS_NULL);
        }
        try {
            channel.writeAndFlush(message);
        } catch (Exception e) {
            throw new RRException(ResultCode.SOME_USERS_SEND_FAIL);
        }
    }
}

7、websocket初始化器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * @Description websocket初始化器
 **/
@Slf4j
@Component
public class WebsocketInitialization {

    @Resource
    private WebsocketChannelInitializer websocketChannelInitializer;

    @Value("${websocket.port}")
    private Integer port;

    @Async
    public void init() throws InterruptedException {
        //bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //workerGroup工作线程组,主要负责网络IO读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //启动辅助类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //bootstrap绑定两个线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //设置通道为NioChannel
            serverBootstrap.channel(NioServerSocketChannel.class);
            //可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            //设置自定义的通道初始化器,用于入站操作
            serverBootstrap.childHandler(websocketChannelInitializer);
            //启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //异步
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

8、自定义消息发送类

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;

/**
 * @Description 向客户端发送消息
 **/
@Slf4j
public class WebsocketRunnable implements Runnable {

    private ChannelHandlerContext channelHandlerContext;

    private MessageRequest messageRequest;

    public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
        this.channelHandlerContext = channelHandlerContext;
        this.messageRequest = messageRequest;
    }

    @Override
    public void run() {
        try {
            log.info(Thread.currentThread().getName()+"--"+LocalDateTime.now().toString());
            channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
        } catch (Exception e) {
            log.error("websocket服务器推送消息发生错误:",e);
        }
    }
}

9、如果使用redission来加锁添加配置,不使用(跳过)

RedissonConfiguration.java配置类
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Configuration
public class RedissonConfiguration {

    @Autowired
    private RedisProperties redisProperties;
    /**
     * 初始化RedissonClient客户端
     * 注意:
     * 此实例集群为3节点,各节点1主1从
     * 集群模式,集群节点的地址须使用“redis://”前缀,否则将会报错。
     *
     * @return {@link RedissonClient}
     */
    @Bean
    public RedissonClient getRedissonClient() {
        Config config = new Config();
        if (redisProperties.getCluster() != null) {
            //集群模式配置
            List nodes = redisProperties.getCluster().getNodes();

            List clusterNodes = new ArrayList();
            for (int i = 0; i < nodes.size(); i++) {
                clusterNodes.add("redis://" + nodes.get(i));
            }
            ClusterServersConfig clusterServersConfig = config.useClusterServers()
                    .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));

            if (!StringUtils.isEmpty(redisProperties.getPassword())) {
                clusterServersConfig.setPassword(redisProperties.getPassword());
            }
        } else {
            //单节点配置
            String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
            SingleServerConfig serverConfig = config.useSingleServer();
            serverConfig.setAddress(address);
            if (!StringUtils.isEmpty(redisProperties.getPassword())) {
                serverConfig.setPassword(redisProperties.getPassword());
            }
            serverConfig.setDatabase(redisProperties.getDatabase());
        }
        //看门狗的锁续期时间,默认30000ms,这里配置成15000ms
//        config.setLockWatchdogTimeout(15000);
        config.setLockWatchdogTimeout(15000);
        return Redisson.create(config);
    }
}
RedisLockUtil.java
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Component
//@Order(value = 2)
public class RedisLockUtil {
    private final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);

    @Autowired
    private RedissonClient redissonClient;

   /* @Autowired
    public RedisLockUtil(@Qualifier("customRedisson") RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }*/

    /**
     * 源码
     * 1.固定有效期的锁:超过有效期leaseTime后,自动释放锁。
     *
     *     public void lock(long leaseTime, TimeUnit unit) {
     *         try {
     *             this.lockInterruptibly(leaseTime, unit);
     *         } catch (InterruptedException var5) {
     *             Thread.currentThread().interrupt();
     *         }
     *     }
     * 2.没有有效期的锁:默认30秒,然后采用Watchdog进行续期,直到业务逻辑执行完毕。
     *
     *     public void lock() {
     *         try {
     *             this.lockInterruptibly();
     *         } catch (InterruptedException var2) {
     *             Thread.currentThread().interrupt();
     *         }
     *     }
     * ————————————————
     */



    /**
     * 加锁
     * @param key 锁的 key
     * @param value  value ( key + value 必须保证唯一)
     * @param expire key 的过期时间,单位 ms
     * @param retryTimes 重试次数,即加锁失败之后的重试次数
     * @param retryInterval 重试时间间隔,单位 ms
     * @return 加锁 true 成功
     */
    public RLock lock(String key, String value, long expire, int retryTimes, long retryInterval) {
        logger.info("locking... redisK = {}", key);
        RLock fairLock = redissonClient.getFairLock(key + ":" +  value);
        try {
            boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);//是否加锁成功
            if (tryLock) {
                logger.info("locked... redisK = {}", key);
                return fairLock;
            } else {
                //重试获取锁
                logger.info("retry to acquire lock: [redisK = {}]", key);
                int count = 0;
                while(count < retryTimes) {
                    try {
                        Thread.sleep(retryInterval);
                        tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
                        if(tryLock) {
                            logger.info("locked... redisK = {}", key);
                            return fairLock;
                        }
                        logger.warn("{} times try to acquire lock", count + 1);
                        count++;
                    } catch (Exception e) {
                        logger.error("acquire redis occurred an exception", e);
                        break;
                    }
                }

                logger.info("fail to acquire lock {}", key);
            }
        } catch (Throwable e1) {
            logger.error("acquire redis occurred an exception", e1);
        }

        return fairLock;
    }

    /**
     * 加锁
     * @param key 锁的 key
     * @param value  value ( key + value 必须保证唯一)
     * @param expire key 的过期时间,单位 ms
     * @param retryTimes 重试次数,即加锁失败之后的重试次数
     * @param retryInterval 重试时间间隔,单位 ms
     * @return 加锁 true 成功
     */
    public boolean lock2(String key, String value, long expire, int retryTimes, long retryInterval) {
        logger.info("locking... redisK = {}", key);
        RLock fairLock = redissonClient.getFairLock(key + ":" +  value);
        try {
            boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
            if (tryLock) {
                logger.info("locked... redisK = {}", key);
                return true;
            } else {
                //重试获取锁
                logger.info("retry to acquire lock: [redisK = {}]", key);
                int count = 0;
                while(count < retryTimes) {
                    try {
                        Thread.sleep(retryInterval);
                        tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
                        if(tryLock) {
                            logger.info("locked... redisK = {}", key);
                            return true;
                        }
                        logger.warn("{} times try to acquire lock", count + 1);
                        count++;
                    } catch (Exception e) {
                        logger.error("acquire redis occurred an exception", e);
                        break;
                    }
                }

                logger.info("fail to acquire lock {}", key);
                return false;
            }
        } catch (Throwable e1) {
            logger.error("acquire redis occurred an exception", e1);
            return false;
        }
    }


    /**
     * 加锁
     * @param key 锁的 key
     * @param value  value ( key + value 必须保证唯一)
     * @param expire key 的过期时间,单位 ms
     * @return 加锁 true 成功
     */
    public boolean lockCheck(String key, String value, long expire) {
        logger.info("locking... redisK = {}", key);
        RLock fairLock = redissonClient.getFairLock(key + ":" +  value);
        boolean tryLock = false;
        try {
            tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
        } catch (Throwable e1) {
            logger.error("acquire redis occurred an exception", e1);
        }
        return tryLock;
    }



    /**
     * 加锁
     * @param key 锁的 key
     * @param value  value ( key + value 必须保证唯一)
     * @param expire key 的过期时间,单位 ms
     * @return 加锁 true 成功
     */
    public boolean lockDog(String key, String value, long expire) {
        logger.info("locking... redisK = {}", key);
        RLock fairLock = redissonClient.getFairLock(key + ":" +  value);
        boolean tryLock = false;
        try {
            fairLock.tryLock(0,  TimeUnit.MILLISECONDS);
        } catch (Throwable e1) {
            logger.error("acquire redis occurred an exception", e1);
        }
        return tryLock;
    }
    /**
     * 释放KEY
     * @return 释放锁 true 成功
     */
    public boolean unlock(String key, String value) {
        RLock fairLock = redissonClient.getFairLock(key + ":" +  value);
        try {
            //如果这里抛异常,后续锁无法释放
            if (fairLock.isLocked()) {
                fairLock.unlock();
                logger.info("release lock success");

                return true;
            }
        } catch (Throwable e) {
            logger.error("release lock occurred an exception", e);
        }finally {
            fairLock.unlock();
        }

        return false;
    }
}

 10、定义controller,自定义发送消息

import com.na.integration.socket.websocket.WebSocketHandler;
import com.na.model.dto.NettyRedisConnectionDto;
import com.na.model.vo.NettyRedisConnectionVo;
import com.na.common.utils.JSONUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

@RequestMapping("ws")
@RestController
public class WebsocketController {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 群发消息
     *
     * @param idList 要把消息发送给其他用户的主键
     */
    @RequestMapping("hello1")
    private Map hello(List idList) {
        //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
        Map channelMap = WebSocketHandler.getChannelMap();
        //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
        Map clientMap = WebSocketHandler.getClientMap();
        //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
        idList.stream().forEach(id -> {
            String v = clientMap.get(id);
            Channel channel = channelMap.get(v);
            channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy")));
        });
        redisTemplate.convertAndSend("life.all", "hello publish/subscribe");
        return clientMap;
    }


    /**
     * 向redis 发布/订阅模式发送消息 可采用广播消息集群监听
     * 需要考虑 接收方是否在线,不在线的情况是缓存还是延迟推送
     * 需要考虑是否重复在线,多个实例的本地缓存是否存在同一个id,这样会导致想要接收方混乱的bug
     */
    @RequestMapping("sendMessage")
    private Map sendMessage(@RequestBody NettyRedisConnectionVo vo) {
        //获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
        Map channelMap = WebSocketHandler.getChannelMap();
        //获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
        Map clientMap = WebSocketHandler.getClientMap();
        //解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
        String v = clientMap.get(vo.getSendId());
        Channel channel = null;
        if (v != null) {
            channel = channelMap.get(v);
            Channel finalChannel = channel;
            //需要发送的 与redis监听定义不同的内容方便测试分辨
            channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + ",message=" + vo.getSendMessage())));
        } else {
            channel = (Channel) redisTemplate.opsForValue().get("id");
            //封装序列化
            NettyRedisConnectionDto dto = new NettyRedisConnectionDto()
                    .setId(vo.getId())
                    .setSendMessage(vo.getSendMessage())
                    .setSendId(vo.getSendId())
                    .setChannel(channel);
            redisTemplate.convertAndSend("life.all", JSONUtils.bean2JSONObject(dto));
        }
        return clientMap;
    }

}

11、启动类配置(与springBoot启动类区分)

也可以启动方法代码写在springboot启动类里

WebsocketApplication.java
import com.na.integration.socket.websocket.WebsocketInitialization;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Slf4j
@Component
public class WebsocketApplication {

    @Resource
    private WebsocketInitialization websocketInitialization;

    @PostConstruct
    public void start() {
        try {
            log.info(Thread.currentThread().getName() + ":websocket启动中......");
            websocketInitialization.init();
            log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
        } catch (Exception e) {
            log.error("websocket发生错误:",e);
        }
    }
}

12、yml配置,其他配置根据自己实际情况来

websocket:
  port: 7000 #端口
  url: /msg	#访问url

五、集群测试

1、启动两个springboot server实例端口分别18088、18089,websocket端口分别是7000、7001,用测试工具创建ws协议请求,

注意:

1、unionId需要保持全局唯一。

2、websocket端口和springboot端口不一样

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

这里可以看到用户client1已经连接进来了

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

2、如果要是用群发或者指定用户的话,就需要用到广播模式。

指定unionId用户发送请求

http://localhost:18088/ws/sendMessage

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

3 这里可以看到unionId=2的Channel实例在18089服务上从而发送成功,而18088的实例没有unionId=2的Channel就忽略。

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

7001客户端也接收到了服务器发来的消息 

SpringBoot分布式Netty集群,通过Redis发布/订阅广播

部分代码参照:微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送—-netty是yyds_netty 集群_码学弟的博客-CSDN博客 

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/a410b70ebb.html