netty实现一对一聊天 (netty百万连接)

(一)使用websocket

  • ① 介绍

webSocket协议是基于TCP的一种新的网络协议。他的出现实现了网络和浏览器全双工通信,允许服务器主动发送信息给客户端。客户端 给 服务器发消息是半双工,服务器给客户端也发送消息就是全双工。

多客户端多语言多浏览器支持:浏览器,php,Java,ruby,nginx,python,Tomcat,erlang,.net等等。

netty实现零拷贝,netty秒级数据流接收

  • ② websocket实现

服务端和客户端交流,通过的是websocket这种协议,内部传输的协议,通过websocket这种方式和普通的socket没有什么区别,唯一一点就是协议不同。

  • ③ websocket示例

绑定9001,这是浏览器js方式完成的,可以发送消息和接收消息,生成一个随机数当userId,在设计这个功能的时候,考虑到消息推送是否需要先登录。登录过调转到登录成功,成功之后拿到userId,在建立websocket连接的时候就可以携带userId,最终知道是那个用户。

<html>
 <head>
 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
 <title>Web Socket Test</title>
 </head> 
 <body> 
 <script type="text/javascript">
var socket;
if (!window.WebSocket) {
 window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
 // 随机数
 var random = Math.floor(Math.random()*(10000 - 10 +1) + 10)
 socket = new WebSocket("ws://127.0.0.1:9001/websocket?userId=" + random);
 socket.onmessage = function(event) {
 var ta = document.getElementById('responseText');
 ta.value = ta.value + '\n' + event.data
 };
 socket.onopen = function(event) {
 var ta = document.getElementById('responseText');
 ta.value = "Web Socket opened!";
 };
 socket.onclose = function(event) {
 var ta = document.getElementById('responseText');
 ta.value = ta.value + "Web Socket closed"; 
 };
} else {
 alert("Your browser does not support Web Socket.");
}

function send(message) {
 if (!window.WebSocket) { return; }
 if (socket.readyState == WebSocket.OPEN) {
 socket.send(message);
 } else {
 alert("The socket is not open.");
 }
}
</script> 
 <form onsubmit="return false;"> 
 <input type="text" name="message" value="Hello, World!" />
 <input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)" /> 
 <h3>Output</h3> 
 <textarea id="responseText" style="width:500px;height:300px;"></textarea> 
 </form> 
 </body>
</html>

java代码示例server端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class WebSocketServer {

 static int PORT = 9000;

 public static void main(String[] args) throws Exception {

 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_REUSEADDR, true)
 .childHandler(new WebSocketServerInitializer())
 .childOption(ChannelOption.SO_REUSEADDR, true);
 b.bind(++PORT).addListener(new ChannelFutureListener() {
 public void operationComplete(ChannelFuture future) throws Exception {
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("端口绑定完成:" + future.channel().localAddress());
 }
 });
 

 // 端口绑定完成,启动消息随机推送(测试)
 TestCenter.startTest();
 System.in.read();
 } finally {
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
}

server端的 channel类。绑定完成相关的业务逻辑处理,1.解析http协议编解码2.最大的解析数据包拦截3.自己的业务处理,得到msg对象,变成了一个request的对象。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 // 职责链, 数据处理流程
 ChannelPipeline pipeline = ch.pipeline();
 pipeline.addLast(new HttpServerCodec()); //
 pipeline.addLast(new HttpObjectAggregator(65536));
 pipeline.addLast(new WebSocketServerHandler());
 pipeline.addLast(new NewConnectHandler());
 }
}

TestCenter 的类

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// 正常情况是,后台系统通过接口请求,把数据丢到对应的MQ队列,再由推送服务器读取
public class TestCenter {
 // 此处假设一个用户一台设备,否则用户的通道应该是多个。
 // TODO 还应该有一个定时任务,用于检测失效的连接(类似缓存中的LRU算法,长时间不使用,就拿出来检测一下是否断开了);
 static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>();

 // 保存信息
 public static void saveConnection(String userId, Channel channel) {
 userInfos.put(userId, channel);
 }

 // 退出的时候移除掉
 public static void removeConnection(Object userId) {
 if (userId != null) {
 userInfos.remove(userId.toString());
 }
 }

server handler 服务

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.AttributeKey;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// 新连接建立了
public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
 // 解析请求,判断token,拿到用户ID。
 Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();
 // String token = parameters.get("token").get(0); 不是所有人都能连接,比如需要登录之后,发放一个推送的token
 String userId = parameters.get("userId").get(0);
 ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId
 TestCenter.saveConnection(userId, ctx.channel()); // 保存连接

 // 结束
 }
}

解析 handler,http解码失败,如果不是websocket协议升级的请求,现在开发的是一个websocket服务器,对于一切不符合我要求的请求,可以不管理它,这跟开发springmvc,要接触的东西,不是url,http协议底层,请求的头部协议,如果不符合就返回400,如果是就返回当前处理我可以接收。正常开始通信了,握手,返回响应的状态码。先处理websocket的握手,后处理websocket的消息。 握手就是你同意给我交往了,

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;

/**
 * Handles handshakes and messages
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

 private static final String WEBSOCKET_PATH = "/websocket";

 private WebSocketServerHandshaker handshaker;

 public static final LongAdder counter = new LongAdder();

 @Override
 public void channelRead0(ChannelHandlerContext ctx, Object msg) {
 counter.add(1);
 if (msg instanceof FullHttpRequest) {
 // 处理websocket握手
 handleHttpRequest(ctx, (FullHttpRequest) msg);
 } else if (msg instanceof WebSocketFrame) {
 // 处理websocket后续的消息
 handleWebSocketFrame(ctx, (WebSocketFrame) msg);
 }
 }

 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
 ctx.flush();
 }

 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
 // Handle a bad request. //如果http解码失败 则返回http异常 并且判断消息头有没有包含Upgrade字段(协议升级)
 if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {
 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
 return;
 }

 // 构造握手响应返回
 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
 getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
 handshaker = wsFactory.newHandshaker(req);
 if (handshaker == null) {
 // 版本不支持
 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
 } else {
 handshaker.handshake(ctx.channel(), req);
 ctx.fireChannelRead(req.retain()); // 继续传播
 }
 }

 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
 // Check for closing frame 关闭
 if (frame instanceof CloseWebSocketFrame) {
 Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get();
 TestCenter.removeConnection(userId);
 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
 return;
 }
 if (frame instanceof PingWebSocketFrame) { // ping/pong作为心跳
 System.out.println("ping: " + frame);
 ctx.write(new PongWebSocketFrame(frame.content().retain()));
 return;
 }
 if (frame instanceof TextWebSocketFrame) {
 // Echo the frame
 // TODO 处理具体的数据请求(... 聊天室,推送给其他的用户)
 //发送到客户端websocket
 ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()
 + ", 欢迎使用Netty WebSocket服务, 现在时刻:"
 + new java.util.Date().toString()));

 return;
 }
 // 不处理二进制消息
 if (frame instanceof BinaryWebSocketFrame) {
 // Echo the frame
 ctx.write(frame.retain());
 }
 }

 private static void sendHttpResponse(
 ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
 // Generate an error page if response getStatus code is not OK (200).
 if (res.status().code() != 200) {
 ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
 res.content().writeBytes(buf);
 buf.release();
 HttpUtil.setContentLength(res, res.content().readableBytes());
 }

 // Send the response and close the connection if necessary.
 ChannelFuture f = ctx.channel().writeAndFlush(res);
 if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
 f.addListener(ChannelFutureListener.CLOSE);
 }
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 cause.printStackTrace();
 ctx.close();
 }

 private static String getWebSocketLocation(FullHttpRequest req) {
 String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
 return "ws://" + location;
 }
}


上边是通过html的客户端,这里用java的netty写一个websocket的客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;

import java.util.concurrent.atomic.AtomicInteger;

public final class WebSocketClient {


 public static void main(String[] args) throws Exception {
 final String host = System.getProperty("netease.pushserver.host", "127.0.0.1");
 final String maxSize = System.getProperty("netease.client.port.maxSize", "100");
 final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000");
 int port = 9001;

 EventLoopGroup group = new NioEventLoopGroup();
 try {

 Bootstrap b = new Bootstrap();
 b.group(group).channel(NioSocketChannel.class);
 b.option(ChannelOption.SO_REUSEADDR, true);
 b.handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) {
 ChannelPipeline p = ch.pipeline();
 p.addLast(new HttpClientCodec());
 p.addLast(new HttpObjectAggregator(8192));
 p.addLast(WebSocketClientCompressionHandler.INSTANCE);
 p.addLast("webSocketClientHandler", new WebSocketClientHandler());
 }
 });
 // tcp 建立连接

 b.connect(host, port).sync().get();
 
 System.in.read();
 } finally

 {
 group.shutdownGracefully();
 }
 }
}

对应客户端的handler,先http捂手获取消息,然后服务端给客户端发送消息,客户端收到了,如果是http消息,可能不能解析,如果是websocket消息,进行消息的打印。客户端收到了消息,把消息的内容给打印出来。

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// handler 处理多个~ tcp连接建立之后的事件
// open websocket
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

 private WebSocketClientHandshaker handshaker;
 private ChannelPromise handshakeFuture;

 public ChannelFuture handshakeFuture() {
 return handshakeFuture;
 }

 @Override
 public void handlerAdded(ChannelHandlerContext ctx) {
 handshakeFuture = ctx.newPromise();
 }

 static AtomicInteger counter = new AtomicInteger(0);

 @Override
 public void channelActive(ChannelHandlerContext ctx) {
 if (handshaker == null) {
 InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
 URI uri = null;
 try {
 uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet());
 } catch (Exception e) {
 e.printStackTrace();
 }
 handshaker = WebSocketClientHandshakerFactory.newHandshaker(
 uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
 }
 handshaker.handshake(ctx.channel());
 }

 @Override
 public void channelInactive(ChannelHandlerContext ctx) {
 if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!");
 }

 @Override
 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 Channel ch = ctx.channel();
 if (!handshaker.isHandshakeComplete()) {
 try {
 handshaker.finishHandshake(ch, (FullHttpResponse) msg);
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("WebSocket Client connected!");
 handshakeFuture.setSuccess();
 } catch (WebSocketHandshakeException e) {
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("WebSocket Client failed to connect");
 handshakeFuture.setFailure(e);
 }
 return;
 }

 if (msg instanceof FullHttpResponse) {
 FullHttpResponse response = (FullHttpResponse) msg;
 throw new IllegalStateException(
 "Unexpected FullHttpResponse (getStatus=" + response.status() +
 ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
 }

 WebSocketFrame frame = (WebSocketFrame) msg;
 if (frame instanceof TextWebSocketFrame) {
 TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("WebSocket Client received message: " + textFrame.text());
 } else if (frame instanceof PongWebSocketFrame) {
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("WebSocket Client received pong");
 } else if (frame instanceof CloseWebSocketFrame) {
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("WebSocket Client received closing");
 ch.close();
 }
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 cause.printStackTrace();
 if (!handshakeFuture.isDone()) {
 handshakeFuture.setFailure(cause);
 }
 ctx.close();
 }
}

源码的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.idig8.netty</groupId>
 <artifactId>netty-push</artifactId>
 <version>1.0.0</version>
 <build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
 <source>8</source>
 <target>8</target>
 </configuration>
 </plugin>
 </plugins>
 </build>

 <dependencies>
 <dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.33.Final</version>
 </dependency>
 </dependencies>
</project>

(二)说说netty如何实现百万连接

  • ① 介绍

100w连接需要多少台机器才能构建起来,这肯定是很多的,在测试的过程中,不需要几百个服务器完成百万连接。需要注意服务器支持端口的数量是可以支持很多的,但是如何2台服务器要实现百万连接,需要考虑一个TCP层次的一种限制,两台服务器之间建立的连接数量是有限的。网络四元组(客户端IP,客户端端口,服务端IP,服务端端口)。

  1. 同一个IP的端口数不超过65535个,这是个限制,每一个连接不仅仅在服务器上开启一个端口,在客户端也会开启一个端口,每一个TCP连接涉及到端口数量的限制,客户端只有6万多个端口。(不可能搞那么多机器,所以让一个客户端发起100万的连接请求,如果是生产环境就不用考虑这个问题)
  2. 服务器只有一个端口的情况下,同一个客户端只能对他发起6万多个连接。客户端每发起一个请求,就需要开启一个端口。客户端没有端口就说明它没办法发起请求。
  • ② 解决方案
  1. 服务器开启多个端口,网络上区别机器是通过网络四元组来标记的。客户端的端口虽然有限,但是可以复用里面的端口。
  2. 举个例子:客户端端口20000,已经连接了服务端9001 这个端口。20000端口在去连接服务端另一个端口9002这个端口。对于客户端指定端口复用,操作系统会自动处理的。
  3. netty里面开启了 地址的复用。客户端也开启复用。参数都是TCP参数。ChannelOption.SO_REUSEADDR
  4. client 和server端通过循环的方式,增加多个端口的绑定。

netty实现零拷贝,netty秒级数据流接收

  • ③ client端 和 server端需要修改的代码

WebSocketClient

 // tcp 建立连接
 for (int i = 0; i < 100; i++) {
 for (int j = 0; j < 60000; j++) {
 b.connect(host, port).sync().get();
 }
 port++;
 }

netty实现零拷贝,netty秒级数据流接收

WebSocketServer

 for (int i = 0; i < 100; i++) {
 b.bind(++PORT).addListener(new ChannelFutureListener() {
 public void operationComplete(ChannelFuture future) throws Exception {
 if ("true".equals(System.getProperty("netease.debug")))
 System.out.println("端口绑定完成:" + future.channel().localAddress());
 }
 });
 }

netty实现零拷贝,netty秒级数据流接收

(三)linux下的百万配置

  • ① 环境配置

测试环境: 两台 centos7 jdk8 4核8G

  • ② centos服务器配置

进程最大文件打开添加参数最大限制

vi /etc/security/limits.conf 

# 修改下面的内容

* soft nofile 1000000
* hard nofile 1000000

全局限制 cat /proc/sys/fs/file-nr

echo 1200000 > /proc/sys/fs/file-max

服务设置

vi /etc/sysctl.conf
#修改下面的内容
fs.file-max = 1000000

问题汇总

# 客户机开不了这么多连接 ,可能的问题原因端口开放数
linux对外随机分配的端口是有限制,理论上单机对外端口数可达65535,但实际对外可建立的连接默认最大只有28232个
查看: cat /proc/sys/net/ipv4/ip_local_port_range
echo "net.ipv4.ip_local_port_range= 1024 65535">> /etc/sysctl.conf
sysctl -p

# 如果你的机器差,出现了奇怪的问题~
sysctl -w net.ipv4.tcp_tw_recycle=1 #快速回收time_wait的连接
sysctl -w net.ipv4.tcp_tw_reuse=1 
sysctl -w net.ipv4.tcp_timestamps=1
# 如果发现自己的用例跑不上去,就看看linux日志
tail -f /var/log/messages
# linux 日志
1、 nf_conntrack: table full, dropping packet 表示防火墙的表满了,加大 nf_conntrack_max 参数
echo "net.nf_conntrack_max = 1000000">> /etc/sysctl.conf

# 2、 TCP: too many orphaned sockets 表示内存不太够,拒绝分配,一般就是TCP缓冲区内存不够用,调大一点
# cat /proc/sys/net/ipv4/tcp_mem 
echo "net.ipv4.tcp_mem = 786432 2097152 16777216">> /etc/sysctl.conf
echo "net.ipv4.tcp_rmem = 4096 4096 16777216">> /etc/sysctl.conf
echo "net.ipv4.tcp_wmem = 4096 4096 16777216">> /etc/sysctl.conf
sysctl -p
  • ③ centos服务器配置
# 查看某个端口的连接情况
netstat -nat|grep -i "9001"|wc -l
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'

# 网络接口的带宽使用情况 
#tcpdump https://www.cnblogs.com/maifengqiang/p/3863168.html

# glances工具
yum install -y glances
glances 控制台查看
glances -s 服务器模式查看
  • ④ 启动
# 服务端启动
java -Xmx4096m -Xms4096m -Dnetease.debug=true -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.server.WebSocketServer

# 客户端
java -Xmx4096m -Xms4096m -Dnetease.debug=false -Dnetease.pushserver.host=192.168.100.101 -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.client.WebSocketClient


# 发送消息服务端启动
java -Xmx4096m -Xms4096m -Dnetease.debug=true -Dneteae.server.test.sendmsg=true -cp netty-all-4.1.33.Final.jar:netty-push-1.0.0.jar com.idig8.netty.push.server.WebSocketServer

(四)百万连接配置说明

一台机器为什么能支持百万的连接感觉有点科幻,感觉不可能,首先需要理解NIO的概念,NIO如果没有做任何处理的情况下,网络是不需要java程序处理的,java程序在连接没有产生动静的时候,java是不产生事件的时候,只有真正产生事件的时候,selector会通知,在一个海量连接的过程中,只要没有消息的推送,消息传到服务器来,不管有几百万个请求,都可以去接受,这是NIO的特性,不像BIO一个请求要开启一个连接,并非是无限的增长,对于连接这么多,内存资源首先被消耗,占用最大的是内存,操作系统底层有TCP的关联,java的netty里面也有channel,需要保留连接,一个连接产生一个对应的对象,虽然这个对象没有处理但是会占用内存,跟cpu没有太大管理,只是java程序要处理占用了cpu和内存。

有很多连接有一个误解,百万连接需要很多机器,百万连接的关键是NIO网络的机制,对NIO没有直观认识的时候,不知道NIO能带来什么。在真实的生产环境的情况下,服务端不需要这么多端口,开100个端口,为了让测试服务器可以连接,上边有个命令是发送消息服务器启动,一旦涉及到百万连接和发送消息的话肯定设计到大量的资源消耗,netty调用handler,channel,这些都是消耗资源的,服务器cpu资源会挂掉,4核 linux top的方式来查看cpu最大是400%。

PS:最好是通过代码,自己试一下,了解下百万连接的思路,按照正常是分布式的架构,单机始终是有瓶颈的,100万用户的连接的话单机8g4核轻轻松松,分布式系统就要设计到分布式消息队列,负载均衡,注册中心的概念,推送使用netty方便系统的开发,沾包和拆包的问题方法区解决,而不是自己写一个socket程序很复杂,netty是通过责任链的方式,通过pipline控制之后的步骤。netty的底层是基于NIO,NIO的底层是基于多路复用的机制,多路复用机制是依托于操作系统的,百万连接这个是拼操作系统参数的,java代码是使用的NIO,如果不是使用的NIO,不好意思你达不到,设置到一些系统操作的配置。