IO入门

简介

netty是由jboss提供的一个开源java框架,新增为github上的独立项目

netty是一个异步,基于事件驱动的网络应用框架,用以快速开发高性能,高可靠性的网络io程序

netty主要针对在tcp协议下,面向clients端的高并发应用,或者peertopeer场景下的大量数据传输的应用

netty本质是一个NIO框架,适用于服务通讯相关的多种场景

NIO

non-blocking io 非阻塞io

有时候也会当做new IO

Channel

channel有点类似于stream,它就是读写数据的双向通道,可以从channel,将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel要比stream更为底层

常见的channel有

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer则用来缓冲读写数,常见的buffer有

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

selector选择器

多线程:如果每一个客户端连接都新建一个线程,会导致内存占用高,,上下文切换成本高,只适合连接数少的场景

线程池:线程池还可以让线程的创建和回收成本相对较低,有效控制了线程的最大数量,但问题是,阻塞模式下,一个线程只能处理一个socket连接,所以仅适合短连接场景

selector是如何解决这个问题的

selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程屌丝在一个channel上,适合连接数特别多,但低流量的场景

调用selector的select()会阻塞到channel发生了读写就绪时间,这些事件发生,select方法就会返回这些事件交给thread处理

用餐厅举例子

  • 传统情况下是来一个客户就新招募一个服务员给对应客户
  • 线程池是招募多个服务员,每一个客户对应一个服务员,多的客户只能等待
  • selector是,服务员在客户需要的时候,为对应客户提供服务,在其他的时候可以给别的客户提供服务

实例

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>
package nio;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @Author: tongck
 * @Date: 2022/8/30 10:37
 */
public class TestByteBuffer {
    public static void main(String[] args)  {
        try {
            //准备管道 fileChannel
            FileChannel channel = new FileInputStream("1.txt").getChannel();
            //准备缓冲区 容量10个字节
            ByteBuffer buffer = ByteBuffer.allocate(10);
            //从channel读取数据,向buffer写入
            while (true){
                int len = channel.read(buffer);
                //返回值为读到的字节数,如果为-1就说明读到末尾了
                if (len==-1){
                    break;
                }
                //打印buffer内容
                buffer.flip();//切换至读模式
                while (buffer.hasRemaining()){//是否还有剩余的数据
                    byte b = buffer.get();
                    System.out.print((char)b);
                }
                buffer.clear();//切换写模式
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

使用步骤

  1. 向buffer写数据,列入channel.read(buffer)
  2. 调用flip()切换至读模式
  3. 从buffer读取数据,列入调用buffer.get()
  4. 调用clear()活compact切换至写模式
  5. 重复1-4步

byteBuffer

数据结构

bytebuffer有以下重要属性

  • capacity:容量
  • position:指针下标
  • limit:读写线程

一开始

写模式下,position是写入位置,limit等于容量,下图表示写入了4个字节后的状态

写完开始读取,flip动作发生后,position切换读取位置,limit切换为读取限制

当读取四个字节后

clear发生之后的状态

compact方法是把未读完的部分向前压缩,然后切换至写模式

常见方法

引入测试工具类

package nio;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @Author: tongck
 * @Date: 2022/8/30 10:37
 */
public class TestByteBuffer {
    public static void main(String[] args)  {
        try {
            //准备管道 fileChannel
            FileChannel channel = new FileInputStream("1.txt").getChannel();
            //准备缓冲区 容量10个字节
            ByteBuffer buffer = ByteBuffer.allocate(10);
            //从channel读取数据,向buffer写入
            while (true){
                int len = channel.read(buffer);
                //返回值为读到的字节数,如果为-1就说明读到末尾了
                if (len==-1){
                    break;
                }
                //打印buffer内容
                buffer.flip();//切换至读模式
                while (buffer.hasRemaining()){//是否还有剩余的数据
                    byte b = buffer.get();
                    System.out.print((char)b);
                }
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试程序

package nio;

import java.nio.ByteBuffer;
import static nio.ByteBufferUtil.*;
/**
 * @Author: tongck
 * @Date: 2022/8/30 13:12
 */
public class TestByteBufferReadWrite {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 0x61); //'a'
        debugAll(buffer);
        buffer.put(new byte[]{0x62,0x63,0x64});
        debugAll(buffer);
        buffer.flip();//切换读模式
        System.out.print(buffer.get());//输出的时候转成10进制了
        debugAll(buffer);
        buffer.compact();
        debugAll(buffer);
        buffer.put(new byte[]{0x65,0x6f});
        debugAll(buffer);
    }
}

最后输出

+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00                   |a.........      |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00                   |abcd......      |
+--------+-------------------------------------------------+----------------+
97+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [4]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00                   |abcd......      |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 64 00 00 00 00 00 00                   |bcdd......      |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 65 6f 00 00 00 00 00                   |bcdeo.....      |
+--------+-------------------------------------------------+----------------+

分配空间

package nio;

import java.nio.ByteBuffer;

/**
 * @Author: tongck
 * @Date: 2022/8/30 13:37
 */
public class TestByteBufferAllocate {
    public static void main(String[] args) {
        //这个容量是固定的,超过就会报错
        System.out.println(ByteBuffer.allocate(16).getClass());
        System.out.println(ByteBuffer.allocateDirect(16).getClass());
        /*
        *   class java.nio.HeapByteBuffer           -java堆内存,读写效率较低,受到gc影响
            class java.nio.DirectByteBuffer         -直接内存,读写效率高,少一次数据的拷贝,分配内存的效率低下,使用不当会造成内存泄漏
        * */
    }
}

读写数据

写入
  • 调用channel的read方法:从channel向buffer写
  • 调用buffer的put方法:直接往buff写入字节数组
int readBytes=channel.read(buf)

buf.put((byte)127)
读取数据

一样有俩种方法

  • channel的write方法
  • 调用buffer自己的get方法
int writeBytes=channel.write(buf);

byte b= buf.get();

get方法会让position读指针向后走,如果想重复读取数据

可以调用rewind方法将position重新设置为0

调用get(int i)方法读取索引i的内容,它不会移动读指针

package nio;

import java.nio.ByteBuffer;
import static nio.ByteBufferUtil.*;
/**
 * @Author: tongck
 * @Date: 2022/8/30 13:59
 */
public class TestByteBufferRead {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a','b','c','d'});
        buffer.flip();

        //rewind从头开始读
         buffer.get(new byte[4]);
        debugAll(buffer);
        buffer.rewind();
        System.out.println((char)buffer.get());
        buffer.rewind();
        //mark做一个标记,记录position位置,reset是将position重置到mark位置
        System.out.println((char)buffer.get());
        System.out.println((char)buffer.get());
        buffer.mark();//添加标记索引为2的位置
        System.out.println((char)buffer.get());
        System.out.println((char)buffer.get());
        buffer.reset();//重置position到上次mark的位置
        System.out.println((char)buffer.get());
        System.out.println((char)buffer.get());
        //get(i)不会改变读索引的位置
        System.out.println((char)buffer.get(3));

    }
}

与字符串转换

package nio;

import io.netty.buffer.ByteBuf;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static nio.ByteBufferUtil.debugAll;

/**
 * @Author: tongck
 * @Date: 2022/8/31 9:07
 */
public class TestBufferString {
    public static void main(String[] args) {
//        1.字符串转byteBuffer
        ByteBuffer buffer1 = ByteBuffer.allocate(16);
        buffer1.put("hello".getBytes());
        debugAll(buffer1);
//        2.charset字符集 这里会自动切换到读模式
        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello!");
        debugAll(buffer2);
//         wrap 也会自动切换到读模式
        ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
        debugAll(buffer3);

//        buffer转字符串转buffer1的话会有问题,因为buffer1现在还是写模式,要先flop()2和3就没问题
        String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
        System.out.println(str1);

    }
}

分散集中读写

拷贝文件的前30字节

package nio;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static nio.ByteBufferUtil.debugAll;

/**
 * @Author: tongck
 * @Date: 2022/8/31 9:34
 */
public class TestScatteringReads {
    public static void main(String[] args) {
        try(FileChannel channel = new RandomAccessFile("1.txt", "r").getChannel()){
            ByteBuffer b1 = ByteBuffer.allocate(10);
            ByteBuffer b2 = ByteBuffer.allocate(10);
            ByteBuffer b3 = ByteBuffer.allocate(10);
            channel.read(new ByteBuffer[]{b1,b2,b3});
            b1.flip();
            b2.flip();
            b3.flip();
            debugAll(b1);
            debugAll(b2);
            debugAll(b3);
            //rw读写模式
            FileChannel channel2 = new RandomAccessFile("2.txt", "rw").getChannel();
            channel2.write(new ByteBuffer[]{b1,b2,b3});
        }catch (IOException e){

        }
    }
}

黏包半包

网络上有多条数据发给服务器,数据之间使用n进行分割,但是由于某种原因在接收时,被进行了重新组合

package nio;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static nio.ByteBufferUtil.debugAll;

/**
 * @Author: tongck
 * @Date: 2022/8/31 10:49
 */
public class TestByteBufferExam {
    public static void main(String[] args) throws FileNotFoundException {
        ByteBuffer source = ByteBuffer.allocate(32);
        source.put("hello,world\nI'm zhangsan\nHo".getBytes());
        split(source);
        source.put("w are you?\n".getBytes());
        split(source);

    }
    private static void split(ByteBuffer source){
        source.flip();
//        compact会把未读的向前移动
        for (int i=0;i<source.limit();i++){
            //找到完整的消息
            if (source.get(i)=='\n'){
                int length=i+1-source.position();
                //存入新的byteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                //从source 读,向target写
                for (int j=0;j<length;j++){
                    target.put(source.get());
                }
                debugAll(target);
            }

        }
        source.compact();

    }
}

文件编程

fileChannel只能工作在阻塞模式下

获取

不能直接打开fileChannel,必须通过FileInputStream,FileOutPutStream,或者RandomAccessFile来获取FileChannel,它们都有getChannel方法

实例

package nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
 * @Author: tongck
 * @Date: 2022/8/31 11:13
 */
public class TestFileChannelTransferTo {
    public static void main(String[] args) {

        try (FileChannel from = new FileInputStream("1.txt").getChannel();
            FileChannel to = new FileOutputStream("to.txt").getChannel();
        ){
            //效率高,底层会用操作系统的零拷贝进行优化
            from.transferTo(0,from.size(),to);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

拷贝超过2g的文件

package nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
 * @Author: tongck
 * @Date: 2022/8/31 11:13
 */
public class TestFileChannelTransferTo {
    public static void main(String[] args) {

        try (FileChannel from = new FileInputStream("test11.zip").getChannel();
            FileChannel to = new FileOutputStream("t2.zip").getChannel();
        ){
            //效率高,底层会用操作系统的零拷贝进行优化,最多2g数据
            long size=from.size();
            for (long left=size;left>0;){
                //虽然写着从头到尾,但其实只会拷贝2g,剩余数据的大小将会返回出来,然后继续下一轮操作
                left=left-from.transferTo((size-left),left,to);
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

获取文件数量代码

package nio;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: tongck
 * @Date: 2022/8/31 13:38
 */
public class TestFilesWalkFileTree {
    public static void main(String[] args) throws IOException {
        AtomicInteger dirCount = new AtomicInteger();
        AtomicInteger fileCount = new AtomicInteger();
        Files.walkFileTree(Paths.get("E:\\code\\java\\test11"),new SimpleFileVisitor<Path>(){

            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                System.out.println("=====>"+dir);
                dirCount.incrementAndGet();
                return super.preVisitDirectory(dir, attrs);
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                System.out.println("---->"+file);
                fileCount.incrementAndGet();

                return super.visitFile(file, attrs);
            }
        });
        System.out.println("文件"+fileCount);
        System.out.println("文件夹"+dirCount);//包含最外层
    }
}

删除全部文件

package nio;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/**
 * @Author: tongck
 * @Date: 2022/8/31 13:38
 */
public class TestFilesWalkFileTree {
    public static void main(String[] args) throws IOException {
       Files.walkFileTree(Paths.get("E:\\code\\java\\test11\\1"),new SimpleFileVisitor<Path>(){

           @Override
           public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
               System.out.println("进入文件夹=====>"+dir);
               return super.preVisitDirectory(dir, attrs);
           }

           @Override
           public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
               return super.visitFile(file, attrs);
           }

           @Override
           public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
               Files.delete(dir);
               System.out.println("退出文件夹<====="+dir);
               return super.postVisitDirectory(dir, exc);
           }
       });
    }
}

复制文件夹

package nio;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:17
 */
public class TestCopy {
    public static void main(String[] args) throws IOException {
        String source="E:\\code\\java\\test11\\1";
        String target="E:\\code\\java\\test11\\2";
        Files.walk(Paths.get(source)).forEach(path->{
            String targetName = path.toString().replace(source, target);
            try {
                if (Files.isDirectory(path)){//如果是目录就创建目录
                    Files.createDirectories(Paths.get(targetName));
                }else if (Files.isRegularFile(path)){//如果是文件就拷贝文件
                    Files.copy(path, Paths.get(targetName));
                }
            }catch (IOException e){

            }

        });
    }
}

网络编程

阻塞模式

服务端

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

import static nio.ByteBufferUtil.debugRead;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:59
 */
public class Server {
    public static void main(String[] args) throws IOException {
        //使用nio来理解阻塞模式,单线程处理
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
//        绑定监听端口
        ssc.bind(new InetSocketAddress(7088));
        //连接集合
        List<SocketChannel> channels=new ArrayList<>();
        //accept
        while (true){
            System.out.println("等待连接");
            SocketChannel sc = ssc.accept();//阻塞方法
            System.out.println("连接");
            channels.add(sc);
            for (SocketChannel channel : channels) {
                System.out.println("读取"+channel);
                //接受数据
                channel.read(buffer);
                buffer.flip();
                debugRead(buffer);
                buffer.clear();
                System.out.println("读取完毕"+channel);
            }

        }
    }
}

client

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @Author: tongck
 * @Date: 2022/8/31 17:20
 */
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",7088));
        sc.write(Charset.defaultCharset().encode("hello"));
        sc.close();
        System.out.println("waiting");
    }
}

非阻塞模式

同时能有多个客户端连接

server

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

import static nio.ByteBufferUtil.debugRead;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:59
 */
public class Server {
    public static void main(String[] args) throws IOException {
        //使用nio来理解阻塞模式,单线程处理
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//非阻塞模式
//        绑定监听端口
        ssc.bind(new InetSocketAddress(7088));
        //连接集合
        List<SocketChannel> channels=new ArrayList<>();
        //accept
        while (true){
            SocketChannel sc = ssc.accept();//非阻塞,线程还是会继续运行,如果没有建立连接,sc是null
            if(sc!=null){
                sc.configureBlocking(false);
                channels.add(sc);
            }
            for (SocketChannel channel : channels) {
                //接受数据//非阻塞
                int read = channel.read(buffer);
                if (read>0){
                    buffer.flip();
                    debugRead(buffer);
                    buffer.clear();
                }
            }
        }
    }
}

client

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @Author: tongck
 * @Date: 2022/8/31 17:20
 */
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",7088));
        sc.write(Charset.defaultCharset().encode("hello"));
        sc.close();
        System.out.println("waiting");
    }
}

连接器处理Accept

事件有四种类型

  • accept:客户端发起连接请求是触发
  • connect:客户端在连接建立后触发的事件
  • read:可读事件
  • write:可写事件
package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import static nio.ByteBufferUtil.debugRead;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:59
 */
public class Server {
    public static void main(String[] args) throws IOException {

        //创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //建立selector,和chanel的联系
        //事件发生后,可以通过它知道哪个事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("注册了key"+sscKey);
        ssc.bind(new InetSocketAddress(7088));
        while (true){
            //select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            //select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
            selector.select();
            //处理事件 selectedKey,内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            //用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                System.out.println("key是"+key);
                ServerSocketChannel channel=(ServerSocketChannel)key.channel();
                //调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
                //可以调用key.cancel()取消事件,这样能阻塞
                SocketChannel sc = channel.accept();
                System.out.println(sc);
            }

        }
    }
}

处理客户端断开

client

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @Author: tongck
 * @Date: 2022/8/31 17:20
 */
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",7088));
        sc.write(Charset.defaultCharset().encode("hello"));
        sc.close();
        System.out.println("waiting");
    }
}

服务端

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import static nio.ByteBufferUtil.debugRead;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:59
 */
public class Server {
    public static void main(String[] args) throws IOException {

        //创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //建立selector,和chanel的联系
        //事件发生后,可以通过它知道哪个事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("注册了key"+sscKey);
        ssc.bind(new InetSocketAddress(7088));
        while (true){
            //select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            //select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
            selector.select();
            //处理事件 selectedKey,内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            //用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                //处理key时,要从selectedKey中删除,否则下次处理就会有问题
                iter.remove();
                System.out.println("key是"+key);
                //区分事件类型
                if (key.isAcceptable()){//如果是accept事件
                    ServerSocketChannel channel=(ServerSocketChannel)key.channel();
                    //调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
                    //可以调用key.cancel()取消事件,这样能阻塞
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);//非阻塞模式
                    //建立连接后之后就要写数据了
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(sc);
                }else if (key.isReadable()){//如果是read
                    //拿到触发事件的channel
                    try {
                        SocketChannel channel=(SocketChannel)key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(4);
                        int read = channel.read(buffer);
                        if (read==-1){
                            key.cancel();
                        }else{
                            buffer.flip();
                            debugRead(buffer);
                        }
                        buffer.flip();
                        debugRead(buffer);
                    }catch (IOException e){
                        e.printStackTrace();
                        key.cancel();//客户端断开连接了,因此要将key取消(从selector keys集合中真正删除)
                    }
                }

            }

        }
    }
}

为什么用完key要remove

selector内部会有一个集合

维护了在selector中注册的key

当selector的select()方法发现有一个新的事件,那么就会创建一个新的集合,就是代码中的selector.selectedKeys(),每次有新的事件会在selectedKeys中加入key,但不会主动删除

所以如果不手动remove,在第二次进入循环时,SocketChannel sc = channel.accept();会返回一个null值(之前已经建立过连接了,所以获取不到连接就返回null了),导致最后报错

消息边界问题

以上代码会有消息边界的问题比如客户端发送俩个字,那么第二个字就会分俩批发送(UTF-8编码情况下,一个中文站三个字节)

  • 一种思路是固定消息长度,数据包一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV格式,即Type类型,length长度,value数据,类型长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果过大,影响server吞吐量

    • http1.1是TLV格式
    • Http2.0是LTV格式

ByteBuffer大小分配

每个channel都需要记录可能被切分的消息,因为byteBuffer不是线程安全的,因此需要为每个channel堆维护一个独立的ByteBuffer

ByteBuffer不能太大,比如ByteBuffer1Mb的话,要支持百万连接就要1TB内存,因此需要设计大小可变的ByteBuffer

  • 一种思路是首先分配一个小的buffer,列入4K,如果发现数据不够,再分配8K的buffer,将4kbuffer内容拷贝纸8Kbuffer,优点是消息容易连续处理,缺点书数据拷贝消耗性能
  • 另一种思路是数组组成buffer,一个数组不够,把多的内容写入新数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

处理可写事件

client

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Author: tongck
 * @Date: 2022/9/1 14:30
 */
public class WriteClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8077));
        //接受数据
        int count=0;
        while (true){
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count+=sc.read(buffer);
            sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

服务端

package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @Author: tongck
 * @Date: 2022/9/1 14:18
 */
public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8077));
        while (true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
//                 等于   key.channel()
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    //往客户端发送大量数据
                    StringBuilder sb=new StringBuilder();
                    for (int i=0;i<30000000;i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //实际写入的字节数
                    while (buffer.hasRemaining()){
                        int write = sc.write(buffer);
                        System.out.println(write);
                    }
                }

            }
        }
    }
}

如果这样写的话,很容易导致在数据发送完之前,一直在一个线程徘徊

正确的操作应该是这样

  1. 先写一次,判断是否写完了
  2. 没写完的话,关注一个可写事件
  3. 把没写完的buffer挂到sckey上
  4. 写完之后清理buufer
package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import static nio.ByteBufferUtil.debugRead;

/**
 * @Author: tongck
 * @Date: 2022/8/31 14:59
 */
public class Server {
    public static void main(String[] args) throws IOException {

        //创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //建立selector,和chanel的联系
        //事件发生后,可以通过它知道哪个事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("注册了key"+sscKey);
        ssc.bind(new InetSocketAddress(7088));
        while (true){
            //select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            //select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
            selector.select();
            //处理事件 selectedKey,内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            //用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                //处理key时,要从selectedKey中删除,否则下次处理就会有问题
                iter.remove();
                System.out.println("key是"+key);
                //区分事件类型
                if (key.isAcceptable()){//如果是accept事件
                    ServerSocketChannel channel=(ServerSocketChannel)key.channel();
                    //调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
                    //可以调用key.cancel()取消事件,这样能阻塞
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);//非阻塞模式
                    //建立连接后之后就要写数据了
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(sc);
                }else if (key.isReadable()){//如果是read
                    //拿到触发事件的channel
                    try {
                        SocketChannel channel=(SocketChannel)key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(4);
                        int read = channel.read(buffer);
                        if (read==-1){
                            key.cancel();
                        }else{
                            buffer.flip();
                            System.out.println(Charset.defaultCharset().decode(buffer).toString());
                        }
                        buffer.flip();
                        debugRead(buffer);
                    }catch (IOException e){
                        e.printStackTrace();
                        key.cancel();//客户端断开连接了,因此要将key取消(从selector keys集合中真正删除)
                    }
                }

            }

        }
    }
}

阻塞和非阻塞区别

非阻塞模式下,相关方法都不会暂停线程

ServerSocketChaneel在没有建立连接时,会返回null,继续运行

SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其他SocketChannel的read或是去执行ServerSocketChannel.accept

写数据时,线程只是等待数据写入Channel即可,无需等待Channel通过网络把数据发送出去

但非阻塞模式下,即使没有建立连接,和可读数据,线程仍然在不断运行浪费了cpu

数据复制的过程中,线程实际还是阻塞的(AIO改进的地方)

多路复用

单线程可以配合selector完成对多个Channel可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络IO,普通文件IO没法利用多路复用
  • 如果也有不用Selector的非阻塞模式,线程大部分时间都在做无用功,而Selector能够保证

    • 有可连接采取连接
    • 有可读事件才去读
    • 有可写事件才去写(限于网络传输能力)

select何时不阻塞

事件发生时

selector.wakeup()
调用selector.close()

selector所在线程interrupt

IO模型

NIO VS BIO

stream vs channel

  • stream不会自动缓冲数据,channel
  • stream仅支持阻塞API,channel同时支持非阻塞,阻塞API,网络Channel可以配合selector实现多路复用
  • 二者均为全双工,即读写可同时进行

不同的IO

Unix有以下五种I/O模型

  • 阻塞式I/O
  • 非阻塞式I/O
  • I/O复用(select和poll)
  • 信号驱动式(SIGIO)
  • 异步(AIO)

同步:线程自己去获取结果(一个线程)

异步:线程自己不去获取结果,而是让其他线程获取结果(至少俩个线程)

只有异步IO是异步的,其他都是同步的,如图

异步其实不存在阻塞(这种说法是错误的!异步根本不存在阻塞)

零拷贝

RandomAccessFile file=new RandomAccessFile(file,"r");
byte[]buf=new byte[(int)f.lenth];
file.read(buf);
socket socket=...;
socket.getOutPutStream.write(buf)

这段代码内部是这样工作的

  1. java本身并不具备IO读写能力,因此read方法调用后,要从java查询的用户态切换至内核态,去调用操作系统的读能力,将数据写入缓冲区,这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,期间不会使用cpu

DMA也可以理解为硬件单元,用来解放CPU完成文件io

  1. 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(byte []buf),这期间cpu会参与拷贝,无法利用DMA
  2. 调用write方法,这时将数据从用户缓冲区(byte[]buf)写入socket缓冲区,cpu将会参与拷贝
  3. 接下来要向网卡写数据,这项能力java又不具备,又要从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到环节多,java io实际上不是物理级别的读写,而是缓存的复制,底层真正的读写是操作系统,用户台与内核态的切换

NIO优化

通过directByteBuffer

ByteBufer.allocate(10):heapByteBuffer使用的还是java内存

ByteBuffer.allocateDirect(10) :DirectByteBuffer使用的是操作系统内存

大部分步骤与优化前相同,唯有一点,可以使用directBytebuffer将堆外内存映射到JVM内存中来

  • 这块内存不受jvm垃圾回收影响,内存地址固定,有助于IO读写
  • java中的DirectByteBuffer维护了内存的虚引用,内存回收分俩部

    • DirectByteBuf对象被垃圾回收,将虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态切换的次数没有减少

进一步优化

底层采用了linux2.1后提供的sendFile方法

  1. java调用transferto方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  2. 数据从内核缓冲区传输到Socket缓冲区,CPU会参与拷贝
  3. 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到

  • 只发生了一次用户态内核态的切换
  • 数据拷贝了三次

再进一步优化

linux2.4

  1. java调用transferto方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  2. 将一些offset和length信息拷贝到socket缓冲区,几乎无消耗
  3. 使用DMA将内核缓冲区数据写入网卡,不使用CPU

这个过程只发生了一次用户态与内核态的切换,数据拷贝了俩次.所谓的零拷贝,并不是真正的无拷贝,而是不会拷贝重复数据到JVM内存中

java 的transferto,linux的sendFile都可以叫做零拷贝

它的优点有

  • 更少的用户态与内核态的切换
  • 不用cpu计算,减少cpu缓存伪共享
  • 零拷贝适合小文件传输

AIO

AIO也就是NIO2.在java7中引入了NIO的改进版NIO2,它是异步非阻塞的IO模型.异步IO是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,后台处理完成,操作系统会通知相应的线程进行后续的操作

AIO是异步IO的缩写,虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的.对于NIO来说,我们的业务线程是在IO操作本身是同步的除了AIO的其他IO模型都是同步的,

同步意味着,在进行读写操作时,线程需要等待结果,还是相当于限制

异步意味着,在进行读写操作时,线程不必等待结果,而是将来操作系统通过回调的方式由另外的线程来获取的结果

异步模型需要操作系统(kernel)提供支持

  • windows系统通过IOCP实现了真正的异步IO
  • linux异步IO在2.6版本引入,但其实底层还是用多路复用模拟了异步IO,性能没有优势
package aio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import static nio.ByteBufferUtil.debugAll;

/**
 * @Author: tongck
 * @Date: 2022/9/1 16:52
 */
public class AioFileChannel {
    public static void main(String[] args) throws IOException, InterruptedException {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);
        //参数1 ByteBuffer
        //参数2 读取的位置
        //参数3 附件
        //参数4 回调对象

        ByteBuffer buffer = ByteBuffer.allocate(160);

        channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override//read 成功
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("成功");
                attachment.flip();
                debugAll(attachment);
            }

            @Override//read 失败
            public void failed(Throwable exc, ByteBuffer attachment) {

            }
        });
        System.out.println("方法结束");
        //回调方法用的是守护线程,会随着主线程结束而结束,所以sleep一下
        Thread.sleep(3000);

    }
}
Last modification:September 1, 2022
如果觉得我的文章对你有用,请随意赞赏