搭建百万连接服务,使用netty完成websocke的推送

(一)使用websocket

  • ① 介绍

webSocket协议是基于TCP的一种新的网络协议。他的出现实现了网络和浏览器全双工通信,允许服务器主动发送信息给客户端。客户端 给 服务器发消息是半双工,服务器给客户端也发送消息就是全双工。

多客户端多语言多浏览器支持:浏览器,php,Java,ruby,nginx,python,Tomcat,erlang,.net等等。

搭建百万连接服务,使用netty完成websocke的推送

专栏
从0开始学会kubernetes
作者:IT人故事会
49.9币
162人已购
查看
  • ② websocket实现

服务端和客户端交流,通过的是websocket这种协议,内部传输的协议,通过websocket这种方式和普通的socket没有什么区别,唯一一点就是协议不同。

  • ③ websocket示例

绑定9001,这是浏览器js方式完成的,可以发送消息和接收消息,生成一个随机数当userId,在设计这个功能的时候,考虑到消息推送是否需要先登录。登录过调转到登录成功,成功之后拿到userId,在建立websocket连接的时候就可以携带userId,最终知道是那个用户。

<html>  <head>  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />  <title>Web Socket Test</title>  </head>   <body>   <script type="text/javascript"> var socket; if (!window.WebSocket) {  window.WebSocket = window.MozWebSocket; } if (window.WebSocket) {  // 随机数  var random = Math.floor(Math.random()*(10000 - 10 +1) + 10)  socket = new WebSocket("ws://127.0.0.1:9001/websocket?userId=" + random);  socket.onmessage = function(event) {  var ta = document.getElementById('responseText');  ta.value = ta.value + '\n' + event.data  };  socket.onopen = function(event) {  var ta = document.getElementById('responseText');  ta.value = "Web Socket opened!";  };  socket.onclose = function(event) {  var ta = document.getElementById('responseText');  ta.value = ta.value + "Web Socket closed";   }; } else {  alert("Your browser does not support Web Socket."); }  function send(message) {  if (!window.WebSocket) { return; }  if (socket.readyState == WebSocket.OPEN) {  socket.send(message);  } else {  alert("The socket is not open.");  } } </script>   <form onsubmit="return false;">   <input type="text" name="message" value="Hello, World!" />  <input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)" />   <h3>Output</h3>   <textarea id="responseText"></textarea>   </form>   </body> </html> 

java代码示例server端

专栏
程序员内功心法-数据结构与算法
作者:IT人故事会
39.9币
2人已购
查看
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;  public final class WebSocketServer {   static int PORT = 9000;   public static void main(String[] args) throws Exception {   EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();  try {  ServerBootstrap b = new ServerBootstrap();  b.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class)  .option(ChannelOption.SO_REUSEADDR, true)  .childHandler(new WebSocketServerInitializer())  .childOption(ChannelOption.SO_REUSEADDR, true);  b.bind(++PORT).addListener(new ChannelFutureListener() {  public void operationComplete(ChannelFuture future) throws Exception {  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("端口绑定完成:" + future.channel().localAddress());  }  });     // 端口绑定完成,启动消息随机推送(测试)  TestCenter.startTest();  System.in.read();  } finally {  bossGroup.shutdownGracefully();  workerGroup.shutdownGracefully();  }  } }  

server端的 channel类。绑定完成相关的业务逻辑处理,
1.解析http协议编解码
2.最大的解析数据包拦截
3.自己的业务处理,得到msg对象,变成了一个request的对象。

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec;  /**  */ public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {  @Override  public void initChannel(SocketChannel ch) throws Exception {  // 职责链, 数据处理流程  ChannelPipeline pipeline = ch.pipeline();  pipeline.addLast(new HttpServerCodec()); //  pipeline.addLast(new HttpObjectAggregator(65536));  pipeline.addLast(new WebSocketServerHandler());  pipeline.addLast(new NewConnectHandler());  } }  

TestCenter 的类

import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;  import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;  // 正常情况是,后台系统通过接口请求,把数据丢到对应的MQ队列,再由推送服务器读取 public class TestCenter {  // 此处假设一个用户一台设备,否则用户的通道应该是多个。  // TODO 还应该有一个定时任务,用于检测失效的连接(类似缓存中的LRU算法,长时间不使用,就拿出来检测一下是否断开了);  static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>();   // 保存信息  public static void saveConnection(String userId, Channel channel) {  userInfos.put(userId, channel);  }   // 退出的时候移除掉  public static void removeConnection(Object userId) {  if (userId != null) {  userInfos.remove(userId.toString());  }  } 

server handler 服务

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.util.AttributeKey;  import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger;  // 新连接建立了 public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> {  @Override  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {  // 解析请求,判断token,拿到用户ID。  Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();  // String token = parameters.get("token").get(0); 不是所有人都能连接,比如需要登录之后,发放一个推送的token  String userId = parameters.get("userId").get(0);  ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId  TestCenter.saveConnection(userId, ctx.channel()); // 保存连接   // 结束  } } 

解析 handler,http解码失败,如果不是websocket协议升级的请求,现在开发的是一个websocket服务器,对于一切不符合我要求的请求,可以不管理它,这跟开发springmvc,要接触的东西,不是url,http协议底层,请求的头部协议,如果不符合就返回400,如果是就返回当前处理我可以接收。正常开始通信了,握手,返回响应的状态码。先处理websocket的握手,后处理websocket的消息。 握手就是你同意给我交往了,

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil;  import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder;  import static io.netty.handler.codec.http.HttpMethod.*; import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpVersion.*;  /**  * Handles handshakes and messages  */ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {   private static final String WEBSOCKET_PATH = "/websocket";   private WebSocketServerHandshaker handshaker;   public static final LongAdder counter = new LongAdder();   @Override  public void channelRead0(ChannelHandlerContext ctx, Object msg) {  counter.add(1);  if (msg instanceof FullHttpRequest) {  // 处理websocket握手  handleHttpRequest(ctx, (FullHttpRequest) msg);  } else if (msg instanceof WebSocketFrame) {  // 处理websocket后续的消息  handleWebSocketFrame(ctx, (WebSocketFrame) msg);  }  }   @Override  public void channelReadComplete(ChannelHandlerContext ctx) {  ctx.flush();  }   private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {  // Handle a bad request. //如果http解码失败 则返回http异常 并且判断消息头有没有包含Upgrade字段(协议升级)  if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {  sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));  return;  }   // 构造握手响应返回  WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(  getWebSocketLocation(req), null, true, 5 * 1024 * 1024);  handshaker = wsFactory.newHandshaker(req);  if (handshaker == null) {  // 版本不支持  WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());  } else {  handshaker.handshake(ctx.channel(), req);  ctx.fireChannelRead(req.retain()); // 继续传播  }  }   private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {  // Check for closing frame 关闭  if (frame instanceof CloseWebSocketFrame) {  Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get();  TestCenter.removeConnection(userId);  handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());  return;  }  if (frame instanceof PingWebSocketFrame) { // ping/pong作为心跳  System.out.println("ping: " + frame);  ctx.write(new PongWebSocketFrame(frame.content().retain()));  return;  }  if (frame instanceof TextWebSocketFrame) {  // Echo the frame  // TODO 处理具体的数据请求(... 聊天室,推送给其他的用户)  //发送到客户端websocket  ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()  + ", 欢迎使用Netty WebSocket服务, 现在时刻:"  + new java.util.Date().toString()));   return;  }  // 不处理二进制消息  if (frame instanceof BinaryWebSocketFrame) {  // Echo the frame  ctx.write(frame.retain());  }  }   private static void sendHttpResponse(  ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {  // Generate an error page if response getStatus code is not OK (200).  if (res.status().code() != 200) {  ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);  res.content().writeBytes(buf);  buf.release();  HttpUtil.setContentLength(res, res.content().readableBytes());  }   // Send the response and close the connection if necessary.  ChannelFuture f = ctx.channel().writeAndFlush(res);  if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {  f.addListener(ChannelFutureListener.CLOSE);  }  }   @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  cause.printStackTrace();  ctx.close();  }   private static String getWebSocketLocation(FullHttpRequest req) {  String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;  return "ws://" + location;  } }   

上边是通过html的客户端,这里用java的netty写一个websocket的客户端

import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;  import java.util.concurrent.atomic.AtomicInteger;  public final class WebSocketClient {    public static void main(String[] args) throws Exception {  final String host = System.getProperty("netease.pushserver.host", "127.0.0.1");  final String maxSize = System.getProperty("netease.client.port.maxSize", "100");  final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000");  int port = 9001;   EventLoopGroup group = new NioEventLoopGroup();  try {   Bootstrap b = new Bootstrap();  b.group(group).channel(NioSocketChannel.class);  b.option(ChannelOption.SO_REUSEADDR, true);  b.handler(new ChannelInitializer<SocketChannel>() {  @Override  protected void initChannel(SocketChannel ch) {  ChannelPipeline p = ch.pipeline();  p.addLast(new HttpClientCodec());  p.addLast(new HttpObjectAggregator(8192));  p.addLast(WebSocketClientCompressionHandler.INSTANCE);  p.addLast("webSocketClientHandler", new WebSocketClientHandler());  }  });  // tcp 建立连接   b.connect(host, port).sync().get();    System.in.read();  } finally   {  group.shutdownGracefully();  }  } }  

对应客户端的handler,先http捂手获取消息,然后服务端给客户端发送消息,客户端收到了,如果是http消息,可能不能解析,如果是websocket消息,进行消息的打印。客户端收到了消息,把消息的内容给打印出来。

import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil;  import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger;  // handler 处理多个~ tcp连接建立之后的事件 // open websocket public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {   private WebSocketClientHandshaker handshaker;  private ChannelPromise handshakeFuture;   public ChannelFuture handshakeFuture() {  return handshakeFuture;  }   @Override  public void handlerAdded(ChannelHandlerContext ctx) {  handshakeFuture = ctx.newPromise();  }   static AtomicInteger counter = new AtomicInteger(0);   @Override  public void channelActive(ChannelHandlerContext ctx) {  if (handshaker == null) {  InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();  URI uri = null;  try {  uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet());  } catch (Exception e) {  e.printStackTrace();  }  handshaker = WebSocketClientHandshakerFactory.newHandshaker(  uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());  }  handshaker.handshake(ctx.channel());  }   @Override  public void channelInactive(ChannelHandlerContext ctx) {  if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!");  }   @Override  public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  Channel ch = ctx.channel();  if (!handshaker.isHandshakeComplete()) {  try {  handshaker.finishHandshake(ch, (FullHttpResponse) msg);  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("WebSocket Client connected!");  handshakeFuture.setSuccess();  } catch (WebSocketHandshakeException e) {  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("WebSocket Client failed to connect");  handshakeFuture.setFailure(e);  }  return;  }   if (msg instanceof FullHttpResponse) {  FullHttpResponse response = (FullHttpResponse) msg;  throw new IllegalStateException(  "Unexpected FullHttpResponse (getStatus=" + response.status() +  ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');  }   WebSocketFrame frame = (WebSocketFrame) msg;  if (frame instanceof TextWebSocketFrame) {  TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("WebSocket Client received message: " + textFrame.text());  } else if (frame instanceof PongWebSocketFrame) {  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("WebSocket Client received pong");  } else if (frame instanceof CloseWebSocketFrame) {  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("WebSocket Client received closing");  ch.close();  }  }   @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  cause.printStackTrace();  if (!handshakeFuture.isDone()) {  handshakeFuture.setFailure(cause);  }  ctx.close();  } }  

源码的pom文件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>   <groupId>com.idig8.netty</groupId>  <artifactId>netty-push</artifactId>  <version>1.0.0</version>  <build>  <plugins>  <plugin>  <groupId>org.apache.maven.plugins</groupId>  <artifactId>maven-compiler-plugin</artifactId>  <configuration>  <source>8</source>  <target>8</target>  </configuration>  </plugin>  </plugins>  </build>   <dependencies>  <dependency>  <groupId>io.netty</groupId>  <artifactId>netty-all</artifactId>  <version>4.1.33.Final</version>  </dependency>  </dependencies> </project>  

(二)说说netty如何实现百万连接

  • ① 介绍

100w连接需要多少台机器才能构建起来,这肯定是很多的,在测试的过程中,不需要几百个服务器完成百万连接。需要注意服务器支持端口的数量是可以支持很多的,但是如何2台服务器要实现百万连接,需要考虑一个TCP层次的一种限制,两台服务器之间建立的连接数量是有限的。网络四元组(客户端IP,客户端端口,服务端IP,服务端端口)。

  1. 同一个IP的端口数不超过65535个,这是个限制,每一个连接不仅仅在服务器上开启一个端口,在客户端也会开启一个端口,每一个TCP连接涉及到端口数量的限制,客户端只有6万多个端口。(不可能搞那么多机器,所以让一个客户端发起100万的连接请求,如果是生产环境就不用考虑这个问题)
  2. 服务器只有一个端口的情况下,同一个客户端只能对他发起6万多个连接。客户端每发起一个请求,就需要开启一个端口。客户端没有端口就说明它没办法发起请求。
  • ② 解决方案
  1. 服务器开启多个端口,网络上区别机器是通过网络四元组来标记的。客户端的端口虽然有限,但是可以复用里面的端口。
  2. 举个例子:客户端端口20000,已经连接了服务端9001 这个端口。20000端口在去连接服务端另一个端口9002这个端口。对于客户端指定端口复用,操作系统会自动处理的。
  3. netty里面开启了 地址的复用。客户端也开启复用。参数都是TCP参数。ChannelOption.SO_REUSEADDR
  4. client 和server端通过循环的方式,增加多个端口的绑定。

搭建百万连接服务,使用netty完成websocke的推送

  • ③ client端 和 server端需要修改的代码

WebSocketClient

 // tcp 建立连接  for (int i = 0; i < 100; i++) {  for (int j = 0; j < 60000; j++) {  b.connect(host, port).sync().get();  }  port++;  } 

搭建百万连接服务,使用netty完成websocke的推送

WebSocketServer

 for (int i = 0; i < 100; i++) {  b.bind(++PORT).addListener(new ChannelFutureListener() {  public void operationComplete(ChannelFuture future) throws Exception {  if ("true".equals(System.getProperty("netease.debug")))  System.out.println("端口绑定完成:" + future.channel().localAddress());  }  });  }  

搭建百万连接服务,使用netty完成websocke的推送

(三)linux下的百万配置

  • ① 环境配置

测试环境: 两台 centos7 jdk8 4核8G

  • ② centos服务器配置

进程最大文件打开添加参数最大限制

vi /etc/security/limits.conf   # 修改下面的内容  * soft nofile 1000000 * hard nofile 1000000 

全局限制 cat /proc/sys/fs/file-nr

echo 1200000 > /proc/sys/fs/file-max 

服务设置

vi /etc/sysctl.conf #修改下面的内容 fs.file-max = 1000000 

问题汇总

# 客户机开不了这么多连接 ,可能的问题原因端口开放数 linux对外随机分配的端口是有限制,理论上单机对外端口数可达65535,但实际对外可建立的连接默认最大只有28232个 查看: cat /proc/sys/net/ipv4/ip_local_port_range echo "net.ipv4.ip_local_port_range= 1024 65535">> /etc/sysctl.conf sysctl -p  # 如果你的机器差,出现了奇怪的问题~ sysctl -w net.ipv4.tcp_tw_recycle=1 #快速回收time_wait的连接 sysctl -w net.ipv4.tcp_tw_reuse=1  sysctl -w net.ipv4.tcp_timestamps=1 
# 如果发现自己的用例跑不上去,就看看linux日志 tail -f /var/log/messages # linux 日志 1、 nf_conntrack: table full, dropping packet 表示防火墙的表满了,加大 nf_conntrack_max 参数 echo "net.nf_conntrack_max = 1000000">> /etc/sysctl.conf  # 2、 TCP: too many orphaned sockets 表示内存不太够,拒绝分配,一般就是TCP缓冲区内存不够用,调大一点 # cat /proc/sys/net/ipv4/tcp_mem  echo "net.ipv4.tcp_mem = 786432 2097152 16777216">> /etc/sysctl.conf echo "net.ipv4.tcp_rmem = 4096 4096 16777216">> /etc/sysctl.conf echo "net.ipv4.tcp_wmem = 4096 4096 16777216">> /etc/sysctl.conf sysctl -p 
  • ③ centos服务器配置
# 查看某个端口的连接情况 netstat -nat|grep -i "9001"|wc -l netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'  # 网络接口的带宽使用情况  #tcpdump https://www.cnblogs.com/maifengqiang/p/3863168.html  # glances工具 yum install -y glances glances 控制台查看 glances -s 服务器模式查看 
  • ④ 启动
# 服务端启动 java -Xmx4096m -Xms4096m -Dnetease.debug=true -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.server.WebSocketServer  # 客户端 java -Xmx4096m -Xms4096m -Dnetease.debug=false -Dnetease.pushserver.host=192.168.100.101 -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.client.WebSocketClient   # 发送消息服务端启动 java -Xmx4096m -Xms4096m -Dnetease.debug=true -Dneteae.server.test.sendmsg=true -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.server.WebSocketServer 

(四)百万连接配置说明

一台机器为什么能支持百万的连接感觉有点科幻,感觉不可能,首先需要理解NIO的概念,NIO如果没有做任何处理的情况下,网络是不需要java程序处理的,java程序在连接没有产生动静的时候,java是不产生事件的时候,只有真正产生事件的时候,selector会通知,在一个海量连接的过程中,只要没有消息的推送,消息传到服务器来,不管有几百万个请求,都可以去接受,这是NIO的特性,不像BIO一个请求要开启一个连接,并非是无限的增长,对于连接这么多,内存资源首先被消耗,占用最大的是内存,操作系统底层有TCP的关联,java的netty里面也有channel,需要保留连接,一个连接产生一个对应的对象,虽然这个对象没有处理但是会占用内存,跟cpu没有太大管理,只是java程序要处理占用了cpu和内存。

有很多连接有一个误解,百万连接需要很多机器,百万连接的关键是NIO网络的机制,对NIO没有直观认识的时候,不知道NIO能带来什么。在真实的生产环境的情况下,服务端不需要这么多端口,开100个端口,为了让测试服务器可以连接,上边有个命令是发送消息服务器启动,一旦涉及到百万连接和发送消息的话肯定设计到大量的资源消耗,netty调用handler,channel,这些都是消耗资源的,服务器cpu资源会挂掉,4核 linux top的方式来查看cpu最大是400%。

PS:最好是通过代码,自己试一下,了解下百万连接的思路,按照正常是分布式的架构,单机始终是有瓶颈的,100万用户的连接的话单机8g4核轻轻松松,分布式系统就要设计到分布式消息队列,负载均衡,注册中心的概念,推送使用netty方便系统的开发,沾包和拆包的问题方法区解决,而不是自己写一个socket程序很复杂,netty是通过责任链的方式,通过pipline控制之后的步骤。netty的底层是基于NIO,NIO的底层是基于多路复用的机制,多路复用机制是依托于操作系统的,百万连接这个是拼操作系统参数的,java代码是使用的NIO,如果不是使用的NIO,不好意思你达不到,设置到一些系统操作的配置。

您可能还会对下面的文章感兴趣: