netty(十九)Netty优化 - option中的参数优化
经过前面的学习,我们已经学会了Netty的使用。本章节开始我们要进行一些细节方面的学习,使其能更好的运用在我们以后的工作当中。
一、什么是option?
前面学习了Netty的服务端,和客户端,知道了创建服务要分别使用ServerBootStrap和BootStrap,不知道有没有关注到其中有一个方法叫做Option的?
我们分别看下ServerBootStrap和BootStrap的option: 1)ServerBootStrap
如上图所示,有两种option,一个是自己的option(ServerSocketChannel),一个childOption(ScoketChannel的option)。
2)BootStrap 只有一个option方法。
无论是上述哪两种,参数都是ChannelOption,而这个ChannelOption Netty已经帮我们准备好了,可以直接使用。
下面我们会针对几种重要的配置讲解一下。
二、常用参数
2.1 CONNECT_TIMEOUT_MILLIS
ScoketChannel的参数。
用在客户端建立连接时,如果超过指定的时间仍未连接,则抛出timeout异常。
public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,500); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080); //阻塞等待连接 channelFuture.sync(); //阻塞等待释放连接 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { System.out.println("server error:" + e); } finally { // 释放EventLoopGroup worker.shutdownGracefully(); } }复制代码
只启动客户端,抛出如下异常:
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.1:8080 at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)复制代码
若果该参数设置过长,且服务端确实没启动,则会抛出java层面的异常,拒绝连接:
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080 Caused by: java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)复制代码
2.2 SO_TIMEOUT
这个参数适用于阻塞IO,比如阻塞IO当中的read,accept等方法,修饰阻塞的,如果不想一直阻塞,可以通过改参数设置超时时间。
不要与CONNECT_TIMEOUT_MILLIS弄混了。
2.3 SO_BACKLOG
ServerSocketChannal 参数。
在了解这个参数之前,要先了解下TCP的三次握手,sync_queue(半连接队列)和accept_queue(全连接队列)。
其中半连接队列是在首次握手时,将请求放入半连接队列,当三次握手全部成功后,将请求从半连接队列放入全连接队列。
下图展示netty和三次握手的关系:
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。
在上面的过程中,提到的sync_queue和accept_queue是我们本篇文章需要关注的重点。
在linux2.2之前,backlog包括了两个队列的大小。在之后的版本当中,由如下两个参数来控制:
sync queue - 半连接队列
大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略accept queue - 全连接队列
其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数比较,取二者的较小值。
如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。
下面回归正题,在netty当中,通过ChannelOption.SO_BACKLOG设置大小,如下所示:
public class Server { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); //此处为了模拟,设置为2 serverBootstrap.option(ChannelOption.SO_BACKLOG,2); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { } }); ChannelFuture channelFuture = serverBootstrap.bind(8080); //阻塞等待连接 channelFuture.sync(); //阻塞等待释放连接 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { System.out.println("server error:" + e); } finally { // 释放EventLoopGroup boss.shutdownGracefully(); worker.shutdownGracefully(); } } }复制代码
如上代码所示,设置了一个backlog为2的值。然后我们需要启动至少三个客户端看结果。
通过前面的三次握手的图,可以知道,只有当服务端处理不过来时,才会使用全连接队列,并将其占满,否则会直接走accept()方法,导致我们看不到测试结果。
所以我们这里不做测试了。
我们看下这个backlog的默认值在nio当中是多少:
在NIO当中backlog在ServerSocketChannel当中的bind方法被调用,所以我们从这里跟踪进去找到bind方法:
public final ServerSocketChannel bind(SocketAddress local) throws IOException { return bind(local, 0); }复制代码
查看bind被哪些地方调用,NioServerSocketChannel:
@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }复制代码
跟踪config.getBacklog():
private final ServerSocketChannelConfig config;复制代码
这个config是接口,直接看它的实现DefaultServerSocketChannelConfig:
private volatile int backlog = NetUtil.SOMAXCONN;复制代码
找SOMAXCONN:
public static final int SOMAXCONN;复制代码
找到SOMAXCONN赋值的位置,默认是windows200,Linux或mac默认128,如果有前面我们提到的文件/proc/sys/net/core/somaxconn,则走此配置中的文件:
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() { @Override public Integer run() { // Determine the default somaxconn (server socket backlog) value of the platform. // The known defaults: // - Windows NT Server 4.0+: 200 // - Linux and Mac OS X: 128 int somaxconn = PlatformDependent.isWindows() ? 200 : 128; File file = new File("/proc/sys/net/core/somaxconn"); BufferedReader in = null; try { // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the // try / catch block. // See https://github.com/netty/netty/issues/4936 if (file.exists()) { in = new BufferedReader(new FileReader(file)); somaxconn = Integer.parseInt(in.readLine()); if (logger.isDebugEnabled()) { logger.debug("{}: {}", file, somaxconn); } } else { // Try to get from sysctl Integer tmp = null; if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) { tmp = sysctlGetInt("kern.ipc.somaxconn"); if (tmp == null) { tmp = sysctlGetInt("kern.ipc.soacceptqueue"); if (tmp != null) { somaxconn = tmp; } } else { somaxconn = tmp; } } if (tmp == null) { logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file, somaxconn); } } } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file, somaxconn, e); } } finally { if (in != null) { try { in.close(); } catch (Exception e) { // Ignored. } } } return somaxconn; } });复制代码
2.4 ulimit
属于操作系统参数。
使用ulimit -n 可以查看当前的最大打开文件数。使用ulimit -a 可以查看当前系统的所有限制值。
linux默认1024,当服务器负载较大时,会发生too many open files的错误,所以我们为了提供并发量,需要手动将其调整。
使用如下命令可以将其调整,但是是临时性的,可以考虑将其放在启动脚本当中:
ulimit -n 4096复制代码
2.5 TCP_NODELAY
属于 SocketChannal 参数。
在前面的文章当中,我们提到过TCP的nagle算法,我们使用netty时,它的默认开始的。
nagle算法会使我们某些较小的数据包造成延迟,因为为了提升效率,nagle会等到收集到一定数据后进行发送,这样可能造成我们消息的延迟。
可以通过如下方式设置,开启无延迟的配置:
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);复制代码
2.6 SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数 SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数
这两个参数不建议我们手动进行设置,因为操作系统会根据当前占用,进行自动的调整。
2.7 ALLOCATOR
属于 SocketChannal 参数。
ByteBuf的分配器。
serverBootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);复制代码
这个参数只有一个DEFAULT可以使用。
这个参数与ch.alloc().buffer()命令有关,关系着我们分配的buf是池化还是非池化,是直接内存还是堆内存。
我们从上面的Default跟踪进去:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;复制代码
继续跟踪DEFAULT_ALLOCATOR:
static final ByteBufAllocator DEFAULT_ALLOCATOR;复制代码
找到对其赋值的位置,发现了如下的静态代码块,此处就是设置buf是pooled还是unpooled,通过环境变量:"io.netty.allocator.type" 指定,我们可以在启动项目时指定-Dio.netty.allocator.type=unpooled设置成非池化。从源码可以看到,安卓是unpooled,其他事pooled。
static { MAX_BYTES_PER_CHAR_UTF8 = (int)CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar(); String allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); Object alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = (ByteBufAllocator)alloc; THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0); logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE); MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16384); logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE); FIND_NON_ASCII = new ByteProcessor() { public boolean process(byte value) { return value >= 0; } }; }复制代码
如上代码中的UnpooledByteBufAllocator.DEFAULT和PooledByteBufAllocator.DEFAULT就指定了我们使用的直接内存还是堆内存,跟踪其中的UnpooledByteBufAllocator.DEFAULT看一下:
public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());复制代码
跟踪PlatformDependent.directBufferPreferred():
private static final boolean DIRECT_BUFFER_PREFERRED;复制代码
找DIRECT_BUFFER_PREFERRED赋值的位置:
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);复制代码
重点关注上述代码后半段!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);,我们可用通过-Dio.netty.noPreferDirect=true环境变量指定我们使用堆内存。
2.8 RCVBUF_ALLOCATOR
属于 SocketChannal 参数。
控制 netty 接收缓冲区大小。
这个RCVBUF_ALLOCATOR不要与前面的ALLOCATOR弄混。
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。
通俗的讲在handler内部分配的byteBuf可以是直接内存,也可以是堆内存,但是经过网络io的内存,netty会强制为直接内存。
我们启动一个客户端和服务端去验证一下上述的描述: 服务端:
public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); super.channelRead(ctx, msg); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080); //阻塞等待连接 channelFuture.sync(); //阻塞等待释放连接 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { System.out.println("server error:" + e); } finally { // 释放EventLoopGroup boss.shutdownGracefully(); worker.shutdownGracefully(); } }复制代码
客户端:
public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf byteBuf = ctx.alloc().buffer(); byteBuf.writeBytes("hello world!".getBytes()); System.out.println(byteBuf); ctx.writeAndFlush(byteBuf); super.channelActive(ctx); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080); //阻塞等待连接 channelFuture.sync(); //阻塞等待释放连接 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { System.out.println("server error:" + e); } finally { // 释放EventLoopGroup worker.shutdownGracefully(); } }复制代码
我们重点关注服务端接收到的Object msg,它应该是一个Bytebuf对象,那么他是什么类型的?如何生成的? 我们通过打断点的方式,找到线程调用的堆栈,发现第一个channel的位置:
点击进去发现如下代码,并且就是我们得到的ByteBuf:
重点关注这部分有注释代码:
ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline(); // 上一小节提到的allocator,负责ByteBuf是池化还是非池化 ByteBufAllocator allocator = config.getAllocator(); //此处Handle是RecvByteBufAllocator内部类 Handle allocHandle = this.recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //allocate方法创建byteBuf byteBuf = allocHandle.allocate(allocator);复制代码
下面重点关注这个allocate方法,这里会分配一个ioBuffer,即直接内存buffer:
public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(this.guess()); }复制代码
如上的guess()会根据数据量大小,动态分配buffer的大小,范围如下,自适应的AdaptiveRecvByteBufAllocator,最小不会小于64,最大不会超过65536:
public AdaptiveRecvByteBufAllocator() { this(64, 1024, 65536); }复制代码
作者:我犟不过你
链接:https://juejin.cn/post/7045194375525040141
玩站网免费分享SEO网站优化 技术及文章 伪原创工具 https://www.237it.com/