阅读 571

netty(十九)Netty优化 - option中的参数优化

经过前面的学习,我们已经学会了Netty的使用。本章节开始我们要进行一些细节方面的学习,使其能更好的运用在我们以后的工作当中。

一、什么是option?

前面学习了Netty的服务端,和客户端,知道了创建服务要分别使用ServerBootStrap和BootStrap,不知道有没有关注到其中有一个方法叫做Option的?

我们分别看下ServerBootStrap和BootStrap的option: 1)ServerBootStrap

image.png

如上图所示,有两种option,一个是自己的option(ServerSocketChannel),一个childOption(ScoketChannel的option)。

2)BootStrap 只有一个option方法。

无论是上述哪两种,参数都是ChannelOption,而这个ChannelOption Netty已经帮我们准备好了,可以直接使用。

下面我们会针对几种重要的配置讲解一下。

二、常用参数

2.1 CONNECT_TIMEOUT_MILLIS

ScoketChannel的参数。

用在客户端建立连接时,如果超过指定的时间仍未连接,则抛出timeout异常。

    public static void main(String[] args) {

        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,500);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080);
            //阻塞等待连接
            channelFuture.sync();
            //阻塞等待释放连接
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            // 释放EventLoopGroup
            worker.shutdownGracefully();
        }
    }复制代码

只启动客户端,抛出如下异常:

Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.1:8080
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)复制代码

若果该参数设置过长,且服务端确实没启动,则会抛出java层面的异常,拒绝连接:

Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)复制代码

2.2 SO_TIMEOUT

这个参数适用于阻塞IO,比如阻塞IO当中的read,accept等方法,修饰阻塞的,如果不想一直阻塞,可以通过改参数设置超时时间。

不要与CONNECT_TIMEOUT_MILLIS弄混了。

2.3 SO_BACKLOG

ServerSocketChannal 参数。

在了解这个参数之前,要先了解下TCP的三次握手,sync_queue(半连接队列)和accept_queue(全连接队列)。

其中半连接队列是在首次握手时,将请求放入半连接队列,当三次握手全部成功后,将请求从半连接队列放入全连接队列。

下图展示netty和三次握手的关系:

image.png

  • 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列

  • 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server

  • 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。

在上面的过程中,提到的sync_queue和accept_queue是我们本篇文章需要关注的重点。

在linux2.2之前,backlog包括了两个队列的大小。在之后的版本当中,由如下两个参数来控制:

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略

  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数比较,取二者的较小值。

    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。

下面回归正题,在netty当中,通过ChannelOption.SO_BACKLOG设置大小,如下所示:

public class Server {

    public static void main(String[] args) {

        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            //此处为了模拟,设置为2
            serverBootstrap.option(ChannelOption.SO_BACKLOG,2);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            //阻塞等待连接
            channelFuture.sync();
            //阻塞等待释放连接
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            // 释放EventLoopGroup
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}复制代码

如上代码所示,设置了一个backlog为2的值。然后我们需要启动至少三个客户端看结果。

通过前面的三次握手的图,可以知道,只有当服务端处理不过来时,才会使用全连接队列,并将其占满,否则会直接走accept()方法,导致我们看不到测试结果。

所以我们这里不做测试了。

我们看下这个backlog的默认值在nio当中是多少:

在NIO当中backlog在ServerSocketChannel当中的bind方法被调用,所以我们从这里跟踪进去找到bind方法:

    public final ServerSocketChannel bind(SocketAddress local)
        throws IOException
    {
        return bind(local, 0);
    }复制代码

查看bind被哪些地方调用,NioServerSocketChannel:

    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }复制代码

跟踪config.getBacklog():

private final ServerSocketChannelConfig config;复制代码

这个config是接口,直接看它的实现DefaultServerSocketChannelConfig:

private volatile int backlog = NetUtil.SOMAXCONN;复制代码

找SOMAXCONN:

public static final int SOMAXCONN;复制代码

找到SOMAXCONN赋值的位置,默认是windows200,Linux或mac默认128,如果有前面我们提到的文件/proc/sys/net/core/somaxconn,则走此配置中的文件:

SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
            @Override
            public Integer run() {
                // Determine the default somaxconn (server socket backlog) value of the platform.
                // The known defaults:
                // - Windows NT Server 4.0+: 200
                // - Linux and Mac OS X: 128
                int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
                File file = new File("/proc/sys/net/core/somaxconn");
                BufferedReader in = null;
                try {
                    // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
                    // try / catch block.
                    // See https://github.com/netty/netty/issues/4936
                    if (file.exists()) {
                        in = new BufferedReader(new FileReader(file));
                        somaxconn = Integer.parseInt(in.readLine());
                        if (logger.isDebugEnabled()) {
                            logger.debug("{}: {}", file, somaxconn);
                        }
                    } else {
                        // Try to get from sysctl
                        Integer tmp = null;
                        if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
                            tmp = sysctlGetInt("kern.ipc.somaxconn");
                            if (tmp == null) {
                                tmp = sysctlGetInt("kern.ipc.soacceptqueue");
                                if (tmp != null) {
                                    somaxconn = tmp;
                                }
                            } else {
                                somaxconn = tmp;
                            }
                        }

                        if (tmp == null) {
                            logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file,
                                         somaxconn);
                        }
                    }
                } catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}",
                                file, somaxconn, e);
                    }
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (Exception e) {
                            // Ignored.
                        }
                    }
                }
                return somaxconn;
            }
        });复制代码

2.4 ulimit

属于操作系统参数。

使用ulimit -n 可以查看当前的最大打开文件数。使用ulimit -a 可以查看当前系统的所有限制值。

linux默认1024,当服务器负载较大时,会发生too many open files的错误,所以我们为了提供并发量,需要手动将其调整。

使用如下命令可以将其调整,但是是临时性的,可以考虑将其放在启动脚本当中:

ulimit -n 4096复制代码

2.5 TCP_NODELAY

属于 SocketChannal 参数。

在前面的文章当中,我们提到过TCP的nagle算法,我们使用netty时,它的默认开始的。

nagle算法会使我们某些较小的数据包造成延迟,因为为了提升效率,nagle会等到收集到一定数据后进行发送,这样可能造成我们消息的延迟。

可以通过如下方式设置,开启无延迟的配置:

serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);复制代码

2.6 SO_SNDBUF & SO_RCVBUF

SO_SNDBUF 属于 SocketChannal 参数 SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数

这两个参数不建议我们手动进行设置,因为操作系统会根据当前占用,进行自动的调整。

2.7 ALLOCATOR

属于 SocketChannal 参数。

ByteBuf的分配器。

serverBootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);复制代码

这个参数只有一个DEFAULT可以使用。

这个参数与ch.alloc().buffer()命令有关,关系着我们分配的buf是池化还是非池化,是直接内存还是堆内存。

我们从上面的Default跟踪进去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;复制代码

继续跟踪DEFAULT_ALLOCATOR:

static final ByteBufAllocator DEFAULT_ALLOCATOR;复制代码

找到对其赋值的位置,发现了如下的静态代码块,此处就是设置buf是pooled还是unpooled,通过环境变量:"io.netty.allocator.type" 指定,我们可以在启动项目时指定-Dio.netty.allocator.type=unpooled设置成非池化。从源码可以看到,安卓是unpooled,其他事pooled。

 static {
        MAX_BYTES_PER_CHAR_UTF8 = (int)CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar();
        String allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();
        Object alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = (ByteBufAllocator)alloc;
        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16384);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
        FIND_NON_ASCII = new ByteProcessor() {
            public boolean process(byte value) {
                return value >= 0;
            }
        };
    }复制代码

如上代码中的UnpooledByteBufAllocator.DEFAULT和PooledByteBufAllocator.DEFAULT就指定了我们使用的直接内存还是堆内存,跟踪其中的UnpooledByteBufAllocator.DEFAULT看一下:

public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());复制代码

跟踪PlatformDependent.directBufferPreferred():

private static final boolean DIRECT_BUFFER_PREFERRED;复制代码

找DIRECT_BUFFER_PREFERRED赋值的位置:

DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);复制代码

重点关注上述代码后半段!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);,我们可用通过-Dio.netty.noPreferDirect=true环境变量指定我们使用堆内存。

2.8 RCVBUF_ALLOCATOR

属于 SocketChannal 参数。

控制 netty 接收缓冲区大小。

这个RCVBUF_ALLOCATOR不要与前面的ALLOCATOR弄混。

负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。

通俗的讲在handler内部分配的byteBuf可以是直接内存,也可以是堆内存,但是经过网络io的内存,netty会强制为直接内存。

我们启动一个客户端和服务端去验证一下上述的描述: 服务端:

public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            System.out.println(msg);
                            super.channelRead(ctx, msg);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            //阻塞等待连接
            channelFuture.sync();
            //阻塞等待释放连接
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            // 释放EventLoopGroup
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }复制代码

客户端:

public static void main(String[] args) {

        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ByteBuf byteBuf = ctx.alloc().buffer();
                            byteBuf.writeBytes("hello world!".getBytes());
                            System.out.println(byteBuf);
                            ctx.writeAndFlush(byteBuf);
                            super.channelActive(ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
            //阻塞等待连接
            channelFuture.sync();
            //阻塞等待释放连接
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            // 释放EventLoopGroup
            worker.shutdownGracefully();
        }
    }复制代码

我们重点关注服务端接收到的Object msg,它应该是一个Bytebuf对象,那么他是什么类型的?如何生成的? 我们通过打断点的方式,找到线程调用的堆栈,发现第一个channel的位置:

image.png

点击进去发现如下代码,并且就是我们得到的ByteBuf:

image.png

重点关注这部分有注释代码:

 ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();
                // 上一小节提到的allocator,负责ByteBuf是池化还是非池化
                ByteBufAllocator allocator = config.getAllocator();
                //此处Handle是RecvByteBufAllocator内部类
                Handle allocHandle = this.recvBufAllocHandle();
                allocHandle.reset(config);
                ByteBuf byteBuf = null;
                boolean close = false;

                try {
                    do {
                        //allocate方法创建byteBuf
                        byteBuf = allocHandle.allocate(allocator);复制代码

下面重点关注这个allocate方法,这里会分配一个ioBuffer,即直接内存buffer:

        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(this.guess());
        }复制代码

如上的guess()会根据数据量大小,动态分配buffer的大小,范围如下,自适应的AdaptiveRecvByteBufAllocator,最小不会小于64,最大不会超过65536:

public AdaptiveRecvByteBufAllocator() {
        this(64, 1024, 65536);
    }复制代码


作者:我犟不过你
链接:https://juejin.cn/post/7045194375525040141

玩站网免费分享SEO网站优化 技术及文章 伪原创工具 https://www.237it.com/ 


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐