涓轰粈涔堣浣跨敤netty (netty鐨勪娇鐢ㄥ師鐞嗗強浼樺娍)

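Netty is an NIO client-server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket servers. "Quick and easy" does not mean that the resulting application will suffer from maintainability or performance problems. Netty has been designed carefully, drawing on experience from implementing many protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has found a way to achieve ease of development, performance, stability, and flexibility without compromise. (Source: the Netty website)

Building on Netty, we can develop high-performance network I/O applications faster and more easily. From an application developer's point of view, Netty has the following core components:

Bootstrap and ServerBootstrap (the former for clients, the latter for servers)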

EventLoop

ChannelHandler

ChannelFuture

Channel

ChannelPipeline

A look back at the demo from the Netty website

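// EchoServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    // Server port
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server: one boss thread accepts connections, workers handle I/O.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // Custom handler (shared by all channels, hence @Sharable)
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// EchoServerHandler.java
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Queue the received message to be written back to the sender.
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Flush everything queued during this read cycle.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}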

Maven dependency (netty-all includes the netty-handler classes used above, such as SslContext and LoggingHandler):

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.24.Final</version>
    </dependency>

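Threading model

Netty is a highly abstracted framework, and one of its defining features is its threading model. The official EchoServer demo above uses the Reactor pattern, as shown in the figure:

[Figure: the Reactor pattern as used by Netty (image not preserved)]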

Looking more closely at Netty's core components, we can see that:

  • EventLoopGroup ultimately extends java.util.concurrent.ScheduledExecutorService.
  • ChannelFuture ultimately extends java.util.concurrent.Future.

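So what happens when the program starts? Begin at the entry point, ServerBootstrap bind(int inetPort). Just as with plain NIO, the call eventually binds the given port through native methods, via sun.nio.ch.ServerSocketChannelImpl bind(SocketAddress local, int backlog):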

public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if (isBound())
            throw new AlreadyBoundException();
        InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
            Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}
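
The same bind call can be tried directly against the JDK, without Netty. The sketch below is only an illustration (BindDemo and its method names are invented): it binds to an OS-chosen port on the loopback interface and shows that a second bind on the same channel is rejected, matching the isBound() check in the listing above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper class, not part of Netty or the JDK.
final class BindDemo {

    // Binds to a free loopback port and returns the port the OS picked.
    static int bindEphemeral(int backlog) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for a free port; a backlog below 1 falls back
            // to an implementation-specific default.
            server.bind(new InetSocketAddress("127.0.0.1", 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        }
    }

    // Returns true if binding an already-bound channel throws AlreadyBoundException.
    static boolean secondBindFails() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0), 100);
            try {
                server.bind(new InetSocketAddress("127.0.0.1", 0), 100);
                return false;
            } catch (AlreadyBoundException expected) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bound to port " + bindEphemeral(100));
        System.out.println("second bind rejected: " + secondBindFails());
    }
}
```

The backlog of 100 mirrors the SO_BACKLOG option set in the EchoServer demo.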

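The event loop is responsible for listening for incoming I/O events on the port and handing them to the designated handler, which responds to the request. The code is as follows:

io.netty.channel.nio.NioEventLoop void run()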

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
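
The ioRatio arithmetic in the else branch is easy to misread, so here is a small worked example. The class and method names are invented for illustration, and only the formula ioTime * (100 - ioRatio) / ioRatio is taken from the listing. With ioRatio = 50, queued tasks get as much time as the I/O processing just took. With ioRatio = 80, they get a quarter of it.

```java
// Hypothetical helper that reproduces the time-budget formula from run().
final class IoRatioDemo {

    // Time (in nanoseconds) the loop would allow for queued tasks after
    // spending ioTimeNanos on I/O. ioRatio == 100 is handled separately in
    // run() (all tasks run, with no budget), so it is rejected here.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume 8 ms were spent in processSelectedKeys()
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio
                    + " -> task budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

A higher ioRatio therefore favors I/O over queued tasks, and a lower one does the opposite.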

1. When a request reaches the server, the loop enters processSelectedKeys(), implemented as follows:

io.netty.channel.nio.NioEventLoop void processSelectedKeys() and void processSelectedKeysOptimized():

private void processSelectedKeys() {
	if (selectedKeys != null) {
		processSelectedKeysOptimized();
	} else {
		processSelectedKeysPlain(selector.selectedKeys());
	}
}
 
private void processSelectedKeysOptimized() {
	for (int i = 0; i < selectedKeys.size; ++i) {
		final SelectionKey k = selectedKeys.keys[i];
		// null out entry in the array to allow to have it GC'ed once the Channel close
		// See https://github.com/netty/netty/issues/2363
		selectedKeys.keys[i] = null;
 
		final Object a = k.attachment();
 
		if (a instanceof AbstractNioChannel) {
			processSelectedKey(k, (AbstractNioChannel) a);
		} else {
			@SuppressWarnings("unchecked")
			NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
			processSelectedKey(k, task);
		}
 
		if (needsToSelectAgain) {
			// null out entries in the array to allow to have it GC'ed once the Channel close
			// See https://github.com/netty/netty/issues/2363
			selectedKeys.reset(i + 1);
 
			selectAgain();
			i = -1;
		}
	}
}

2. Next, control passes to io.netty.channel.nio.NioEventLoop void processSelectedKey(SelectionKey k, AbstractNioChannel ch), implemented as follows:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
	if (!k.isValid()) {
		final EventLoop eventLoop;
		try {
			eventLoop = ch.eventLoop();
		} catch (Throwable ignored) {
			// If the channel implementation throws an exception because there is no event loop, we ignore this
			// because we are only trying to determine if ch is registered to this event loop and thus has authority
			// to close ch.
			return;
		}
		// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
		// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
		// still healthy and should not be closed.
		// See https://github.com/netty/netty/issues/5125
		if (eventLoop != this || eventLoop == null) {
			return;
		}
		// close the channel if the key is not valid anymore
		unsafe.close(unsafe.voidPromise());
		return;
	}
 
	try {
		int readyOps = k.readyOps();
		// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
		// the NIO JDK channel implementation may throw a NotYetConnectedException.
		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
			// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
			// See https://github.com/netty/netty/issues/924
			int ops = k.interestOps();
			ops &= ~SelectionKey.OP_CONNECT;
			k.interestOps(ops);
 
			unsafe.finishConnect();
		}
 
		// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
			ch.unsafe().forceFlush();
		}
 
		// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
		// to a spin loop
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
	} catch (CancelledKeyException ignored) {
		unsafe.close(unsafe.voidPromise());
	}
}
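
The order of the three bitmask checks matters: connect is finished first, then pending writes are flushed, then reads (or accepts) are processed. The sketch below reproduces only that decision logic with the JDK's SelectionKey constants. ReadyOpsDemo and the action names it returns are illustrative labels, not Netty API.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the readyOps checks in processSelectedKey().
final class ReadyOpsDemo {

    // Returns, in order, the actions the method above would take for a valid key.
    static List<String> dispatchOrder(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also treated as a read, as in the listing above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(dispatchOrder(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(dispatchOrder(SelectionKey.OP_ACCEPT));
        System.out.println(dispatchOrder(0));
    }
}
```

Note that OP_ACCEPT and OP_READ share the same branch: for the server channel, "read" means accepting a new connection.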

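3. For data arriving on an accepted connection, control then enters io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe void read(), implemented as follows (the server channel itself accepts new connections through the analogous NioMessageUnsafe.read()):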

public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);
 
	ByteBuf byteBuf = null;
	boolean close = false;
	try {
		do {
			byteBuf = allocHandle.allocate(allocator);
			allocHandle.lastBytesRead(doReadBytes(byteBuf));
			if (allocHandle.lastBytesRead() <= 0) {
				// nothing was read. release the buffer.
				byteBuf.release();
				byteBuf = null;
				close = allocHandle.lastBytesRead() < 0;
				if (close) {
					// There is nothing left to read as we received an EOF.
					readPending = false;
				}
				break;
			}
 
			allocHandle.incMessagesRead(1);
			readPending = false;
			pipeline.fireChannelRead(byteBuf);
			byteBuf = null;
		} while (allocHandle.continueReading());
 
		allocHandle.readComplete();
		pipeline.fireChannelReadComplete();
 
		if (close) {
			closeOnRead(pipeline);
		}
	} catch (Throwable t) {
		handleReadException(pipeline, byteBuf, t, close, allocHandle);
	} finally {
		// Check if there is a readPending which was not processed yet.
		// This could be for two reasons:
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
		//
		// See https://github.com/netty/netty/issues/2254
		if (!readPending && !config.isAutoRead()) {
			removeReadOp();
		}
	}
}
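
The shape of this read loop (read until nothing is left or a limit is hit, fire one channelRead per chunk, then a single channelReadComplete, and close on EOF) can be sketched with plain JDK channels. Everything below is a simplified illustration: ReadLoopDemo, the fixed buffer size, and the maxReads limit are assumptions, and Netty's real allocHandle.continueReading() applies further conditions that are omitted here.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the read-loop structure; it records events instead of
// calling a real pipeline.
final class ReadLoopDemo {

    static List<String> readOnce(ReadableByteChannel ch, int bufSize, int maxReads)
            throws IOException {
        List<String> events = new ArrayList<>();
        boolean close = false;
        int reads = 0;
        do {
            ByteBuffer buf = ByteBuffer.allocate(bufSize);
            int n = ch.read(buf);
            if (n <= 0) {
                close = n < 0; // a negative count means EOF: the peer closed
                break;
            }
            reads++;
            events.add("channelRead(" + n + ")");
        } while (reads < maxReads); // stand-in for allocHandle.continueReading()

        events.add("channelReadComplete");
        if (close) {
            events.add("close");
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(new byte[10]));
        System.out.println(readOnce(ch, 4, 16));
    }
}
```

This is why the EchoServerHandler in the demo writes in channelRead and flushes in channelReadComplete: several reads may be batched into a single flush.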

4. Next comes the concrete implementation class, io.netty.channel.DefaultChannelPipeline ChannelPipeline fireChannelRead(Object msg), with the following code:

public final ChannelPipeline fireChannelRead(Object msg) {
	AbstractChannelHandlerContext.invokeChannelRead(head, msg);
	return this;
}
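
The idea behind firing the event from head is a chain of handler contexts, each deciding whether to pass the message on to the next one. The sketch below is a toy model of that chain. MiniPipeline, Handler, and Context are invented names, and it leaves out the outbound direction, executors, and lifecycle handling of Netty's real pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy pipeline: a singly linked chain of handler contexts.
final class MiniPipeline {

    interface Handler {
        void channelRead(Context ctx, Object msg);
    }

    static final class Context {
        private final Handler handler;
        private Context next;

        Context(Handler handler) {
            this.handler = handler;
        }

        // Forwards the message to the next handler, if there is one.
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    // The head simply forwards whatever it receives.
    private final Context head = new Context((ctx, msg) -> ctx.fireChannelRead(msg));
    private Context tail = head;

    MiniPipeline addLast(Handler handler) {
        Context ctx = new Context(handler);
        tail.next = ctx;
        tail = ctx;
        return this;
    }

    // Entry point: every inbound message starts at the head.
    MiniPipeline fireChannelRead(Object msg) {
        head.handler.channelRead(head, msg);
        return this;
    }

    // Usage: the first handler trims the text, the second records what it sees.
    static List<Object> demo() {
        List<Object> seen = new ArrayList<>();
        new MiniPipeline()
                .addLast((ctx, msg) -> ctx.fireChannelRead(msg.toString().trim()))
                .addLast((ctx, msg) -> seen.add(msg))
                .fireChannelRead("  ping  ");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A handler that does not call ctx.fireChannelRead(...) stops propagation, which is how the echo handler in the demo ends the chain.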

5. Finally, io.netty.channel.AbstractChannelHandlerContext static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg), with the following code:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
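
The inEventLoop() check is the core of Netty's "one thread per channel" discipline: work already on the loop thread runs inline, and work from any other thread is handed to the loop. The sketch below shows the same pattern on top of a JDK single-thread executor. MiniEventLoop is an invented class and is far simpler than Netty's EventExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-threaded loop illustrating the inEventLoop() dispatch.
final class MiniEventLoop implements AutoCloseable {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same if/else as invokeChannelRead: inline on the loop thread, otherwise hand off.
    void invoke(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    // Usage: the outer task is handed off from the caller's thread; the nested
    // call is already on the loop thread, so it runs inline before countDown().
    static String demo() throws InterruptedException {
        try (MiniEventLoop loop = new MiniEventLoop()) {
            AtomicReference<String> ranOn = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            loop.invoke(() -> {
                loop.invoke(() -> ranOn.set(Thread.currentThread().getName()));
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not run the task");
            }
            return ranOn.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("nested task ran on: " + demo());
    }
}
```

Because every handler call for a channel ends up on the same thread, handler code does not need its own locking for per-channel state.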

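Here, next.invokeChannelRead(m); invokes the channelRead method of the corresponding ChannelHandler. For a handler that extends SimpleChannelInboundHandler<FullHttpRequest>, that in turn calls void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg).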

--------------------- This article is from 马思明's CSDN blog. Full text: https://blog.csdn.net/u012376084/article/details/82914737?utm_source=copy