SpringBoot分布式Netty集群,通过Redis发布/订阅广播
一、前言
之前做用springboot+websocket做双向通讯时,websocket的session存在无法序列化导致集群不能通过共享session来实现,后来采取了记录需要推送客户端ip,然后用http去请求web接口这个不友好的方式。当然需求只需要做扫码登录,这种方式影响也不会有什么影响。但集群问题一直没解决就在心里埋下了个种子。
二、正文
用netty搭建websocket集群服务,因为netty中的channel是在本地的需要整合rabbitmq或者redis等发布/订阅模式来实现消息发送(或者Channel共享,具体还没考究母鸡能不能实现)。这里用redis做广播(如果用redis需要考虑消息丢失和ack等情况,这里只做演示)
三、流程图
根据流程图,用户端client1想给client2发送消息,如果是单机的话就很简单,直接拿到client2的Channel发送Message就行了。但如果是server集群的话,就需要广播消息,让每个集群节点都收到内容,Channel不在本地的话就忽略。

四、环境搭建
1、pom文件需要的依赖
org.springframework.boot
spring-boot-starter-data-redis
org.redisson
redisson
2.13.0
io.netty
netty-all
4.1.29.Final
org.projectlombok
lombok
true
2、RedisConfig配置文件,注意:要与MessageListener实现类统一序列化器
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.net.UnknownHostException;
/**
* @Description
* @Author kele
* @Data 2023/8/31 16:34
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 将template 泛型设置为
RedisTemplate template = new RedisTemplate();
// 连接工厂,不必修改
template.setConnectionFactory(redisConnectionFactory);
/*
* 序列化设置
*/
// key、hash的key 采用 String序列化方式
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// value、hash的value 采用 Jackson 序列化方式
template.setValueSerializer(RedisSerializer.json());
template.setHashValueSerializer(RedisSerializer.json());
template.afterPropertiesSet();
return template;
}
}
3.定义redis订阅监听类配置
RedisMessageListenerConfiguration .java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.util.Arrays;
import java.util.List;
/**
* @Description
* @Author kele
* @Data 2023/8/31 16:07
*/
@Configuration
public class RedisMessageListenerConfiguration {
@Autowired
private LifeRedisMessageListener lifeRedisMessageListener;
@Autowired
private RedisConnectionFactory redisConnectionFactory;
/**
* 配置订阅关系
*/
@Bean
public RedisMessageListenerContainer container() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
//订阅频道
List topicList = Arrays.asList(new PatternTopic("life.*"),new PatternTopic("*.life"));
container.addMessageListener(lifeRedisMessageListener, topicList);
return container;
}
}
LifeRedisMessageListener .java
import com.alibaba.fastjson.JSONObject;
import com.na.integration.socket.websocket.WebSocketHandler;
import com.na.model.dto.NettyRedisConnectionDto;
import com.na.common.utils.JSONUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
/**
* @Description
* @Author kele
* @Data 2023/8/31 16:07
*/
@Component
@Slf4j
public class LifeRedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
//需要在加载bean的时候配置相同的redis序列化器,否则会乱码
RedisSerializer keySerializer = redisTemplate.getKeySerializer();
RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
log.info("----------Life接收到发布者消息----------");
log.info("|频道:{}", keySerializer.deserialize(message.getChannel()));
log.info("|当前监听器绑定的pattern:{}", new String(pattern));
log.info("|消息内容:{}", valueSerializer.deserialize(message.getBody()));
log.info("---------------------------------");
//反序列化
JSONObject jsonObject = JSONUtils.toJsonObj((String) valueSerializer.deserialize(message.getBody()));
NettyRedisConnectionDto dto = new NettyRedisConnectionDto();
Long id = Long.valueOf(jsonObject.get("id").toString());
dto.setId(id);
dto.setChannel((Channel) jsonObject.get("key"));
dto.setSendMessage(jsonObject.get("sendMessage").toString());
dto.setSendId(Long.valueOf(jsonObject.get("sendId").toString()));
/**
* 发送内容
*/
//获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
Map channelMap = WebSocketHandler.getChannelMap();
//获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
Map clientMap = WebSocketHandler.getClientMap();
//解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
String v = clientMap.get(dto.getSendId());
Channel channel = null;
try {
channel = channelMap.get(v);
} catch (NullPointerException e) {
log.error("消息id:" + id + "通道不在本地");
return;
}
Channel finalChannel = channel;
channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(
Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "来自id为"
+ dto.getId() + ",发送的内容message=" + dto.getSendMessage())));
// redisTemplate.opsForValue().get("");
}
4、消息接收对象
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
@Data
@Accessors(chain = true)
public class MessageRequest implements Serializable {
private Long unionId;
private Integer current = 1;
private Integer size = 10;
}
5、websocket通道初始化器
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @Description websocket通道初始化器
**/
@Component
public class WebsocketChannelInitializer extends ChannelInitializer {
@Autowired
private WebSocketHandler webSocketHandler;
@Value("${websocket.url}")
private String websocketUrl;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取pipeline通道
ChannelPipeline pipeline = socketChannel.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明
1\. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
2\. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/* 说明
1\. 对应websocket ,它的数据是以 帧(frame) 形式传递
2\. 可以看到WebSocketFrame 下面有六个子类
3\. 浏览器请求时 ws://localhost:7000/msg 表示请求的uri
4\. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
5\. 是通过一个 状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler(websocketUrl));
//自定义的handler ,处理业务逻辑
pipeline.addLast(webSocketHandler);
}
}
6、定义Handler处理器
import com.alibaba.fastjson.JSON;
import com.na.enums.ResultCode;
import com.na.exceptions.RRException;
import com.na.utils.RedisLockUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @Description websocket处理器
**/
@Slf4j
@Component
@ChannelHandler.Sharable//保证处理器,在整个生命周期中就是以单例的形式存在,方便统计客户端的在线数量
public class WebSocketHandler extends SimpleChannelInboundHandler {
@Autowired
private RedisLockUtil redisLockUtil;
//通道map,存储channel,用于群发消息,以及统计客户端的在线数量,解决问题问题三,如果是集群环境使用redis的hash数据类型存储即可
private static Map channelMap = new ConcurrentHashMap();
//任务map,存储future,用于停止队列任务
private static Map futureMap = new ConcurrentHashMap();
//存储channel的id和用户主键的映射,客户端保证用户主键传入的是唯一值,解决问题四,如果是集群中需要换成redis的hash数据类型存储即可
private static Map clientMap = new ConcurrentHashMap();
@Resource
private RedisTemplate redisTemplate;
/**
* 客户端发送给服务端的消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
try {
//接受客户端发送的消息
MessageRequest messageRequest = JSON.parseObject(msg.text(), MessageRequest.class);
//每个channel都有id,asLongText是全局channel唯一id
String key = ctx.channel().id().asLongText();
//存储channel的id和用户的主键
clientMap.put(messageRequest.getUnionId(), key);
log.info("接受客户端的消息......" + ctx.channel().remoteAddress() + "-参数[" + messageRequest.getUnionId() + "]");
if (!channelMap.containsKey(key)) {
//使用channel中的任务队列,做周期循环推送客户端消息,解决问题二和问题五
Future future = ctx.channel().eventLoop().scheduleAtFixedRate(new WebsocketRunnable(ctx, messageRequest), 0, 10, TimeUnit.SECONDS);
//存储客户端和服务的通信的Chanel
channelMap.put(key, ctx.channel());
//存储每个channel中的future,保证每个channel中有一个定时任务在执行
futureMap.put(key, future);
} else {
//每次客户端和服务的主动通信,和服务端周期向客户端推送消息互不影响 解决问题一
ctx.channel().writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy"));
}
} catch (Exception e) {
log.error("websocket服务器推送消息发生错误:", e);
}
}
/**
* 注册时执行
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
log.info("--channelRegistered注册时执行--" + ctx.channel().id().toString());
}
/**
* 客户端连接时候的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("一个客户端连接......" + ctx.channel().remoteAddress() + Thread.currentThread().getName());
}
/**
* 离线时执行
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
log.info("--channelUnregistered离线时执行--" + ctx.channel().id().toString());
}
/**
* 客户端掉线时的操作
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信过的channel
channelMap.remove(key);
//移除和用户绑定的channel
clientMap.remove(key);
//关闭掉线客户端的future
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
log.info("一个客户端移除......" + ctx.channel().remoteAddress());
ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
}
/**
* 从客户端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
log.info("--channelReadComplete从客户端收到新的数据--");
ctx.flush();
}
/**
* 发生异常时执行的操作
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String key = ctx.channel().id().asLongText();
//移除通信过的channel
channelMap.remove(key);
//移除和用户绑定的channel
clientMap.remove(key);
//移除定时任务
Optional.ofNullable(futureMap.get(key)).ifPresent(future -> {
future.cancel(true);
futureMap.remove(key);
});
//关闭长连接
ctx.close();
log.info("异常发生 " + cause.getMessage());
}
public static Map getChannelMap() {
return channelMap;
}
public static Map getFutureMap() {
return futureMap;
}
public static Map getClientMap() {
return clientMap;
}
/**
* redission 防止重复在线,多个实例的本地缓存是否存在同一个id和与幂等性,这样会导致想要接收方混乱的bug
* @param key
* @param v
*/
private void addChannelMap(String key, Channel v) {
try {
//定义keykey的锁
redisLockUtil.lock(key, key, 10000, 3, 5000);
WebSocketHandler.channelMap.put(key, v);
} finally {
redisLockUtil.unlock(key.toString(), key.toString());
}
}
private void addFutureMap(String key, Future v) {
try {
//定义keykey的锁
redisLockUtil.lock(key, key, 10000, 3, 5000);
WebSocketHandler.futureMap.put(key, v);
} finally {
redisLockUtil.unlock(key.toString(), key.toString());
}
}
private void addClientMap(Long key, String v) {
try {
//定义keykey的锁
redisLockUtil.lock(key.toString(), key.toString(), 10000, 3, 5000);
WebSocketHandler.clientMap.put(key, v);
} finally {
redisLockUtil.unlock(key.toString(), key.toString());
}
}
public static void sendMessage(String key, String message) {
if (StringUtils.isEmpty(key)) {
throw new RRException(ResultCode.PARAM_NOT_NULL);
}
Channel channel = channelMap.get(key);
if (channel == null) {
throw new RRException(ResultCode.ID_IS_NULL);
}
try {
channel.writeAndFlush(message);
} catch (Exception e) {
throw new RRException(ResultCode.SOME_USERS_SEND_FAIL);
}
}
}
7、websocket初始化器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description websocket初始化器
**/
@Slf4j
@Component
public class WebsocketInitialization {
@Resource
private WebsocketChannelInitializer websocketChannelInitializer;
@Value("${websocket.port}")
private Integer port;
@Async
public void init() throws InterruptedException {
//bossGroup连接线程组,主要负责接受客户端连接,一般一个线程足矣
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//workerGroup工作线程组,主要负责网络IO读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//启动辅助类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//bootstrap绑定两个线程组
serverBootstrap.group(bossGroup, workerGroup);
//设置通道为NioChannel
serverBootstrap.channel(NioServerSocketChannel.class);
//可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//设置自定义的通道初始化器,用于入站操作
serverBootstrap.childHandler(websocketChannelInitializer);
//启动服务器,本质是Java程序发起系统调用,然后内核底层起了一个处于监听状态的服务,生成一个文件描述符FD
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//异步
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
8、自定义消息发送类
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/**
* @Description 向客户端发送消息
**/
@Slf4j
public class WebsocketRunnable implements Runnable {
private ChannelHandlerContext channelHandlerContext;
private MessageRequest messageRequest;
public WebsocketRunnable(ChannelHandlerContext channelHandlerContext,MessageRequest messageRequest) {
this.channelHandlerContext = channelHandlerContext;
this.messageRequest = messageRequest;
}
@Override
public void run() {
try {
log.info(Thread.currentThread().getName()+"--"+LocalDateTime.now().toString());
channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
} catch (Exception e) {
log.error("websocket服务器推送消息发生错误:",e);
}
}
}
9、如果使用redission来加锁添加配置,不使用(跳过)
RedissonConfiguration.java配置类
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Configuration
public class RedissonConfiguration {
@Autowired
private RedisProperties redisProperties;
/**
* 初始化RedissonClient客户端
* 注意:
* 此实例集群为3节点,各节点1主1从
* 集群模式,集群节点的地址须使用“redis://”前缀,否则将会报错。
*
* @return {@link RedissonClient}
*/
@Bean
public RedissonClient getRedissonClient() {
Config config = new Config();
if (redisProperties.getCluster() != null) {
//集群模式配置
List nodes = redisProperties.getCluster().getNodes();
List clusterNodes = new ArrayList();
for (int i = 0; i < nodes.size(); i++) {
clusterNodes.add("redis://" + nodes.get(i));
}
ClusterServersConfig clusterServersConfig = config.useClusterServers()
.addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
if (!StringUtils.isEmpty(redisProperties.getPassword())) {
clusterServersConfig.setPassword(redisProperties.getPassword());
}
} else {
//单节点配置
String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(address);
if (!StringUtils.isEmpty(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
serverConfig.setDatabase(redisProperties.getDatabase());
}
//看门狗的锁续期时间,默认30000ms,这里配置成15000ms
// config.setLockWatchdogTimeout(15000);
config.setLockWatchdogTimeout(15000);
return Redisson.create(config);
}
}
RedisLockUtil.java
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
//@Order(value = 2)
public class RedisLockUtil {
private final Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
@Autowired
private RedissonClient redissonClient;
/* @Autowired
public RedisLockUtil(@Qualifier("customRedisson") RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}*/
/**
* 源码
* 1.固定有效期的锁:超过有效期leaseTime后,自动释放锁。
*
* public void lock(long leaseTime, TimeUnit unit) {
* try {
* this.lockInterruptibly(leaseTime, unit);
* } catch (InterruptedException var5) {
* Thread.currentThread().interrupt();
* }
* }
* 2.没有有效期的锁:默认30秒,然后采用Watchdog进行续期,直到业务逻辑执行完毕。
*
* public void lock() {
* try {
* this.lockInterruptibly();
* } catch (InterruptedException var2) {
* Thread.currentThread().interrupt();
* }
* }
* ————————————————
*/
/**
* 加锁
* @param key 锁的 key
* @param value value ( key + value 必须保证唯一)
* @param expire key 的过期时间,单位 ms
* @param retryTimes 重试次数,即加锁失败之后的重试次数
* @param retryInterval 重试时间间隔,单位 ms
* @return 加锁 true 成功
*/
public RLock lock(String key, String value, long expire, int retryTimes, long retryInterval) {
logger.info("locking... redisK = {}", key);
RLock fairLock = redissonClient.getFairLock(key + ":" + value);
try {
boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);//是否加锁成功
if (tryLock) {
logger.info("locked... redisK = {}", key);
return fairLock;
} else {
//重试获取锁
logger.info("retry to acquire lock: [redisK = {}]", key);
int count = 0;
while(count < retryTimes) {
try {
Thread.sleep(retryInterval);
tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
if(tryLock) {
logger.info("locked... redisK = {}", key);
return fairLock;
}
logger.warn("{} times try to acquire lock", count + 1);
count++;
} catch (Exception e) {
logger.error("acquire redis occurred an exception", e);
break;
}
}
logger.info("fail to acquire lock {}", key);
}
} catch (Throwable e1) {
logger.error("acquire redis occurred an exception", e1);
}
return fairLock;
}
/**
* 加锁
* @param key 锁的 key
* @param value value ( key + value 必须保证唯一)
* @param expire key 的过期时间,单位 ms
* @param retryTimes 重试次数,即加锁失败之后的重试次数
* @param retryInterval 重试时间间隔,单位 ms
* @return 加锁 true 成功
*/
public boolean lock2(String key, String value, long expire, int retryTimes, long retryInterval) {
logger.info("locking... redisK = {}", key);
RLock fairLock = redissonClient.getFairLock(key + ":" + value);
try {
boolean tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
if (tryLock) {
logger.info("locked... redisK = {}", key);
return true;
} else {
//重试获取锁
logger.info("retry to acquire lock: [redisK = {}]", key);
int count = 0;
while(count < retryTimes) {
try {
Thread.sleep(retryInterval);
tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
if(tryLock) {
logger.info("locked... redisK = {}", key);
return true;
}
logger.warn("{} times try to acquire lock", count + 1);
count++;
} catch (Exception e) {
logger.error("acquire redis occurred an exception", e);
break;
}
}
logger.info("fail to acquire lock {}", key);
return false;
}
} catch (Throwable e1) {
logger.error("acquire redis occurred an exception", e1);
return false;
}
}
/**
* 加锁
* @param key 锁的 key
* @param value value ( key + value 必须保证唯一)
* @param expire key 的过期时间,单位 ms
* @return 加锁 true 成功
*/
public boolean lockCheck(String key, String value, long expire) {
logger.info("locking... redisK = {}", key);
RLock fairLock = redissonClient.getFairLock(key + ":" + value);
boolean tryLock = false;
try {
tryLock = fairLock.tryLock(0, expire, TimeUnit.MILLISECONDS);
} catch (Throwable e1) {
logger.error("acquire redis occurred an exception", e1);
}
return tryLock;
}
/**
* 加锁
* @param key 锁的 key
* @param value value ( key + value 必须保证唯一)
* @param expire key 的过期时间,单位 ms
* @return 加锁 true 成功
*/
public boolean lockDog(String key, String value, long expire) {
logger.info("locking... redisK = {}", key);
RLock fairLock = redissonClient.getFairLock(key + ":" + value);
boolean tryLock = false;
try {
fairLock.tryLock(0, TimeUnit.MILLISECONDS);
} catch (Throwable e1) {
logger.error("acquire redis occurred an exception", e1);
}
return tryLock;
}
/**
* 释放KEY
* @return 释放锁 true 成功
*/
public boolean unlock(String key, String value) {
RLock fairLock = redissonClient.getFairLock(key + ":" + value);
try {
//如果这里抛异常,后续锁无法释放
if (fairLock.isLocked()) {
fairLock.unlock();
logger.info("release lock success");
return true;
}
} catch (Throwable e) {
logger.error("release lock occurred an exception", e);
}finally {
fairLock.unlock();
}
return false;
}
}
10、定义controller,自定义发送消息
import com.na.integration.socket.websocket.WebSocketHandler;
import com.na.model.dto.NettyRedisConnectionDto;
import com.na.model.vo.NettyRedisConnectionVo;
import com.na.common.utils.JSONUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@RequestMapping("ws")
@RestController
public class WebsocketController {
@Autowired
private RedisTemplate redisTemplate;
/**
* 群发消息
*
* @param idList 要把消息发送给其他用户的主键
*/
@RequestMapping("hello1")
private Map hello(List idList) {
//获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
Map channelMap = WebSocketHandler.getChannelMap();
//获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
Map clientMap = WebSocketHandler.getClientMap();
//解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
idList.stream().forEach(id -> {
String v = clientMap.get(id);
Channel channel = channelMap.get(v);
channel.eventLoop().execute(() -> channel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + "wdy")));
});
redisTemplate.convertAndSend("life.all", "hello publish/subscribe");
return clientMap;
}
/**
* 向redis 发布/订阅模式发送消息 可采用广播消息集群监听
* 需要考虑 接收方是否在线,不在线的情况是缓存还是延迟推送
* 需要考虑是否重复在线,多个实例的本地缓存是否存在同一个id,这样会导致想要接收方混乱的bug
*/
@RequestMapping("sendMessage")
private Map sendMessage(@RequestBody NettyRedisConnectionVo vo) {
//获取所有连接的客户端,如果是集群环境使用redis的hash数据类型存储即可
Map channelMap = WebSocketHandler.getChannelMap();
//获取与用户主键绑定的channel,如果是集群环境使用redis的hash数据类型存储即可
Map clientMap = WebSocketHandler.getClientMap();
//解决问题六,websocket集群中一个客户端向其他客户端主动发送消息,如何实现?
String v = clientMap.get(vo.getSendId());
Channel channel = null;
if (v != null) {
channel = channelMap.get(v);
Channel finalChannel = channel;
//需要发送的 与redis监听定义不同的内容方便测试分辨
channel.eventLoop().execute(() -> finalChannel.writeAndFlush(new TextWebSocketFrame(Thread.currentThread().getName() + "服务器时间" + LocalDateTime.now() + ",message=" + vo.getSendMessage())));
} else {
channel = (Channel) redisTemplate.opsForValue().get("id");
//封装序列化
NettyRedisConnectionDto dto = new NettyRedisConnectionDto()
.setId(vo.getId())
.setSendMessage(vo.getSendMessage())
.setSendId(vo.getSendId())
.setChannel(channel);
redisTemplate.convertAndSend("life.all", JSONUtils.bean2JSONObject(dto));
}
return clientMap;
}
}
11、启动类配置(与springBoot启动类区分)
也可以启动方法代码写在springboot启动类里
WebsocketApplication.java
import com.na.integration.socket.websocket.WebsocketInitialization;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Slf4j
@Component
public class WebsocketApplication {
@Resource
private WebsocketInitialization websocketInitialization;
@PostConstruct
public void start() {
try {
log.info(Thread.currentThread().getName() + ":websocket启动中......");
websocketInitialization.init();
log.info(Thread.currentThread().getName() + ":websocket启动成功!!!");
} catch (Exception e) {
log.error("websocket发生错误:",e);
}
}
}
12、yml配置,其他配置根据自己实际情况来
websocket: port: 7000 #端口 url: /msg #访问url
五、集群测试
1、启动两个springboot server实例端口分别18088、18089,websocket端口分别是7000、7001,用测试工具创建ws协议请求,
注意:
1、unionId需要保持全局唯一。
2、websocket端口和springboot端口不一样

这里可以看到用户client1已经连接进来了

2、如果要是用群发或者指定用户的话,就需要用到广播模式。
指定unionId用户发送请求
http://localhost:18088/ws/sendMessage

3 这里可以看到unionId=2的Channel实例在18089服务上从而发送成功,而18088的实例没有unionId=2的Channel就忽略。


7001客户端也接收到了服务器发来的消息

部分代码参照:微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送—-netty是yyds_netty 集群_码学弟的博客-CSDN博客
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/a410b70ebb.html
