IO入门
简介
netty是由jboss提供的一个开源java框架,新增为github上的独立项目
netty是一个异步,基于事件驱动的网络应用框架,用以快速开发高性能,高可靠性的网络io程序
netty主要针对在tcp协议下,面向clients端的高并发应用,或者peertopeer场景下的大量数据传输的应用
netty本质是一个NIO框架,适用于服务通讯相关的多种场景
NIO
non-blocking io 非阻塞io
有时候也会当做new IO
Channel
channel有点类似于stream,它就是读写数据的双向通道,可以从channel,将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel要比stream更为底层
常见的channel有
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer则用来缓冲读写数,常见的buffer有
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
selector选择器
多线程:如果每一个客户端连接都新建一个线程,会导致内存占用高,,上下文切换成本高,只适合连接数少的场景
线程池:线程池还可以让线程的创建和回收成本相对较低,有效控制了线程的最大数量,但问题是,阻塞模式下,一个线程只能处理一个socket连接,所以仅适合短连接场景
selector是如何解决这个问题的
selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程屌丝在一个channel上,适合连接数特别多,但低流量的场景
调用selector的select()会阻塞到channel发生了读写就绪时间,这些事件发生,select方法就会返回这些事件交给thread处理
用餐厅举例子
- 传统情况下是来一个客户就新招募一个服务员给对应客户
- 线程池是招募多个服务员,每一个客户对应一个服务员,多的客户只能等待
- selector是,服务员在客户需要的时候,为对应客户提供服务,在其他的时候可以给别的客户提供服务
实例
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
package nio;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @Author: tongck
* @Date: 2022/8/30 10:37
*/
public class TestByteBuffer {
public static void main(String[] args) {
try {
//准备管道 fileChannel
FileChannel channel = new FileInputStream("1.txt").getChannel();
//准备缓冲区 容量10个字节
ByteBuffer buffer = ByteBuffer.allocate(10);
//从channel读取数据,向buffer写入
while (true){
int len = channel.read(buffer);
//返回值为读到的字节数,如果为-1就说明读到末尾了
if (len==-1){
break;
}
//打印buffer内容
buffer.flip();//切换至读模式
while (buffer.hasRemaining()){//是否还有剩余的数据
byte b = buffer.get();
System.out.print((char)b);
}
buffer.clear();//切换写模式
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
使用步骤
- 向buffer写数据,列入channel.read(buffer)
- 调用flip()切换至读模式
- 从buffer读取数据,列入调用buffer.get()
- 调用clear()活compact切换至写模式
- 重复1-4步
byteBuffer
数据结构
bytebuffer有以下重要属性
- capacity:容量
- position:指针下标
- limit:读写线程
一开始
写模式下,position是写入位置,limit等于容量,下图表示写入了4个字节后的状态
写完开始读取,flip动作发生后,position切换读取位置,limit切换为读取限制
当读取四个字节后
clear发生之后的状态
compact方法是把未读完的部分向前压缩,然后切换至写模式
常见方法
引入测试工具类
package nio;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @Author: tongck
* @Date: 2022/8/30 10:37
*/
public class TestByteBuffer {
public static void main(String[] args) {
try {
//准备管道 fileChannel
FileChannel channel = new FileInputStream("1.txt").getChannel();
//准备缓冲区 容量10个字节
ByteBuffer buffer = ByteBuffer.allocate(10);
//从channel读取数据,向buffer写入
while (true){
int len = channel.read(buffer);
//返回值为读到的字节数,如果为-1就说明读到末尾了
if (len==-1){
break;
}
//打印buffer内容
buffer.flip();//切换至读模式
while (buffer.hasRemaining()){//是否还有剩余的数据
byte b = buffer.get();
System.out.print((char)b);
}
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试程序
package nio;
import java.nio.ByteBuffer;
import static nio.ByteBufferUtil.*;
/**
* @Author: tongck
* @Date: 2022/8/30 13:12
*/
public class TestByteBufferReadWrite {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0x61); //'a'
debugAll(buffer);
buffer.put(new byte[]{0x62,0x63,0x64});
debugAll(buffer);
buffer.flip();//切换读模式
System.out.print(buffer.get());//输出的时候转成10进制了
debugAll(buffer);
buffer.compact();
debugAll(buffer);
buffer.put(new byte[]{0x65,0x6f});
debugAll(buffer);
}
}
最后输出
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00 |a......... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
97+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 64 00 00 00 00 00 00 |bcdd...... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 65 6f 00 00 00 00 00 |bcdeo..... |
+--------+-------------------------------------------------+----------------+
分配空间
package nio;
import java.nio.ByteBuffer;
/**
* @Author: tongck
* @Date: 2022/8/30 13:37
*/
public class TestByteBufferAllocate {
public static void main(String[] args) {
//这个容量是固定的,超过就会报错
System.out.println(ByteBuffer.allocate(16).getClass());
System.out.println(ByteBuffer.allocateDirect(16).getClass());
/*
* class java.nio.HeapByteBuffer -java堆内存,读写效率较低,受到gc影响
class java.nio.DirectByteBuffer -直接内存,读写效率高,少一次数据的拷贝,分配内存的效率低下,使用不当会造成内存泄漏
* */
}
}
读写数据
写入
- 调用channel的read方法:从channel向buffer写
- 调用buffer的put方法:直接往buff写入字节数组
int readBytes=channel.read(buf)
和
buf.put((byte)127)
读取数据
一样有俩种方法
- channel的write方法
- 调用buffer自己的get方法
int writeBytes=channel.write(buf);
和
byte b= buf.get();
get方法会让position读指针向后走,如果想重复读取数据
可以调用rewind方法将position重新设置为0
调用get(int i)方法读取索引i的内容,它不会移动读指针
package nio;
import java.nio.ByteBuffer;
import static nio.ByteBufferUtil.*;
/**
* @Author: tongck
* @Date: 2022/8/30 13:59
*/
public class TestByteBufferRead {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a','b','c','d'});
buffer.flip();
//rewind从头开始读
buffer.get(new byte[4]);
debugAll(buffer);
buffer.rewind();
System.out.println((char)buffer.get());
buffer.rewind();
//mark做一个标记,记录position位置,reset是将position重置到mark位置
System.out.println((char)buffer.get());
System.out.println((char)buffer.get());
buffer.mark();//添加标记索引为2的位置
System.out.println((char)buffer.get());
System.out.println((char)buffer.get());
buffer.reset();//重置position到上次mark的位置
System.out.println((char)buffer.get());
System.out.println((char)buffer.get());
//get(i)不会改变读索引的位置
System.out.println((char)buffer.get(3));
}
}
与字符串转换
package nio;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static nio.ByteBufferUtil.debugAll;
/**
* @Author: tongck
* @Date: 2022/8/31 9:07
*/
public class TestBufferString {
public static void main(String[] args) {
// 1.字符串转byteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
debugAll(buffer1);
// 2.charset字符集 这里会自动切换到读模式
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello!");
debugAll(buffer2);
// wrap 也会自动切换到读模式
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
debugAll(buffer3);
// buffer转字符串转buffer1的话会有问题,因为buffer1现在还是写模式,要先flop()2和3就没问题
String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();
System.out.println(str1);
}
}
分散集中读写
拷贝文件的前30字节
package nio;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import static nio.ByteBufferUtil.debugAll;
/**
* @Author: tongck
* @Date: 2022/8/31 9:34
*/
public class TestScatteringReads {
public static void main(String[] args) {
try(FileChannel channel = new RandomAccessFile("1.txt", "r").getChannel()){
ByteBuffer b1 = ByteBuffer.allocate(10);
ByteBuffer b2 = ByteBuffer.allocate(10);
ByteBuffer b3 = ByteBuffer.allocate(10);
channel.read(new ByteBuffer[]{b1,b2,b3});
b1.flip();
b2.flip();
b3.flip();
debugAll(b1);
debugAll(b2);
debugAll(b3);
//rw读写模式
FileChannel channel2 = new RandomAccessFile("2.txt", "rw").getChannel();
channel2.write(new ByteBuffer[]{b1,b2,b3});
}catch (IOException e){
}
}
}
黏包半包
网络上有多条数据发给服务器,数据之间使用n进行分割,但是由于某种原因在接收时,被进行了重新组合
package nio;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import static nio.ByteBufferUtil.debugAll;
/**
* @Author: tongck
* @Date: 2022/8/31 10:49
*/
public class TestByteBufferExam {
public static void main(String[] args) throws FileNotFoundException {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}
private static void split(ByteBuffer source){
source.flip();
// compact会把未读的向前移动
for (int i=0;i<source.limit();i++){
//找到完整的消息
if (source.get(i)=='\n'){
int length=i+1-source.position();
//存入新的byteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
//从source 读,向target写
for (int j=0;j<length;j++){
target.put(source.get());
}
debugAll(target);
}
}
source.compact();
}
}
文件编程
fileChannel只能工作在阻塞模式下
获取
不能直接打开fileChannel,必须通过FileInputStream,FileOutPutStream,或者RandomAccessFile来获取FileChannel,它们都有getChannel方法
实例
package nio;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
* @Author: tongck
* @Date: 2022/8/31 11:13
*/
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (FileChannel from = new FileInputStream("1.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
){
//效率高,底层会用操作系统的零拷贝进行优化
from.transferTo(0,from.size(),to);
}catch (IOException e){
e.printStackTrace();
}
}
}
拷贝超过2g的文件
package nio;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
* @Author: tongck
* @Date: 2022/8/31 11:13
*/
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (FileChannel from = new FileInputStream("test11.zip").getChannel();
FileChannel to = new FileOutputStream("t2.zip").getChannel();
){
//效率高,底层会用操作系统的零拷贝进行优化,最多2g数据
long size=from.size();
for (long left=size;left>0;){
//虽然写着从头到尾,但其实只会拷贝2g,剩余数据的大小将会返回出来,然后继续下一轮操作
left=left-from.transferTo((size-left),left,to);
}
}catch (IOException e){
e.printStackTrace();
}
}
}
获取文件数量代码
package nio;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: tongck
* @Date: 2022/8/31 13:38
*/
public class TestFilesWalkFileTree {
public static void main(String[] args) throws IOException {
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(Paths.get("E:\\code\\java\\test11"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("=====>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println("---->"+file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println("文件"+fileCount);
System.out.println("文件夹"+dirCount);//包含最外层
}
}
删除全部文件
package nio;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
/**
* @Author: tongck
* @Date: 2022/8/31 13:38
*/
public class TestFilesWalkFileTree {
public static void main(String[] args) throws IOException {
Files.walkFileTree(Paths.get("E:\\code\\java\\test11\\1"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("进入文件夹=====>"+dir);
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
System.out.println("退出文件夹<====="+dir);
return super.postVisitDirectory(dir, exc);
}
});
}
}
复制文件夹
package nio;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* @Author: tongck
* @Date: 2022/8/31 14:17
*/
public class TestCopy {
public static void main(String[] args) throws IOException {
String source="E:\\code\\java\\test11\\1";
String target="E:\\code\\java\\test11\\2";
Files.walk(Paths.get(source)).forEach(path->{
String targetName = path.toString().replace(source, target);
try {
if (Files.isDirectory(path)){//如果是目录就创建目录
Files.createDirectories(Paths.get(targetName));
}else if (Files.isRegularFile(path)){//如果是文件就拷贝文件
Files.copy(path, Paths.get(targetName));
}
}catch (IOException e){
}
});
}
}
网络编程
阻塞模式
服务端
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import static nio.ByteBufferUtil.debugRead;
/**
* @Author: tongck
* @Date: 2022/8/31 14:59
*/
public class Server {
public static void main(String[] args) throws IOException {
//使用nio来理解阻塞模式,单线程处理
ByteBuffer buffer = ByteBuffer.allocate(16);
//创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定监听端口
ssc.bind(new InetSocketAddress(7088));
//连接集合
List<SocketChannel> channels=new ArrayList<>();
//accept
while (true){
System.out.println("等待连接");
SocketChannel sc = ssc.accept();//阻塞方法
System.out.println("连接");
channels.add(sc);
for (SocketChannel channel : channels) {
System.out.println("读取"+channel);
//接受数据
channel.read(buffer);
buffer.flip();
debugRead(buffer);
buffer.clear();
System.out.println("读取完毕"+channel);
}
}
}
}
client
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* @Author: tongck
* @Date: 2022/8/31 17:20
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",7088));
sc.write(Charset.defaultCharset().encode("hello"));
sc.close();
System.out.println("waiting");
}
}
非阻塞模式
同时能有多个客户端连接
server
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import static nio.ByteBufferUtil.debugRead;
/**
* @Author: tongck
* @Date: 2022/8/31 14:59
*/
public class Server {
public static void main(String[] args) throws IOException {
//使用nio来理解阻塞模式,单线程处理
ByteBuffer buffer = ByteBuffer.allocate(16);
//创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//非阻塞模式
// 绑定监听端口
ssc.bind(new InetSocketAddress(7088));
//连接集合
List<SocketChannel> channels=new ArrayList<>();
//accept
while (true){
SocketChannel sc = ssc.accept();//非阻塞,线程还是会继续运行,如果没有建立连接,sc是null
if(sc!=null){
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
//接受数据//非阻塞
int read = channel.read(buffer);
if (read>0){
buffer.flip();
debugRead(buffer);
buffer.clear();
}
}
}
}
}
client
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* @Author: tongck
* @Date: 2022/8/31 17:20
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",7088));
sc.write(Charset.defaultCharset().encode("hello"));
sc.close();
System.out.println("waiting");
}
}
连接器处理Accept
事件有四种类型
- accept:客户端发起连接请求是触发
- connect:客户端在连接建立后触发的事件
- read:可读事件
- write:可写事件
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static nio.ByteBufferUtil.debugRead;
/**
* @Author: tongck
* @Date: 2022/8/31 14:59
*/
public class Server {
public static void main(String[] args) throws IOException {
//创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//建立selector,和chanel的联系
//事件发生后,可以通过它知道哪个事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("注册了key"+sscKey);
ssc.bind(new InetSocketAddress(7088));
while (true){
//select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
selector.select();
//处理事件 selectedKey,内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
//用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
while (iter.hasNext()){
SelectionKey key = iter.next();
System.out.println("key是"+key);
ServerSocketChannel channel=(ServerSocketChannel)key.channel();
//调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
//可以调用key.cancel()取消事件,这样能阻塞
SocketChannel sc = channel.accept();
System.out.println(sc);
}
}
}
}
处理客户端断开
client
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* @Author: tongck
* @Date: 2022/8/31 17:20
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",7088));
sc.write(Charset.defaultCharset().encode("hello"));
sc.close();
System.out.println("waiting");
}
}
服务端
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static nio.ByteBufferUtil.debugRead;
/**
* @Author: tongck
* @Date: 2022/8/31 14:59
*/
public class Server {
public static void main(String[] args) throws IOException {
//创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//建立selector,和chanel的联系
//事件发生后,可以通过它知道哪个事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("注册了key"+sscKey);
ssc.bind(new InetSocketAddress(7088));
while (true){
//select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
selector.select();
//处理事件 selectedKey,内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
//用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
while (iter.hasNext()){
SelectionKey key = iter.next();
//处理key时,要从selectedKey中删除,否则下次处理就会有问题
iter.remove();
System.out.println("key是"+key);
//区分事件类型
if (key.isAcceptable()){//如果是accept事件
ServerSocketChannel channel=(ServerSocketChannel)key.channel();
//调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
//可以调用key.cancel()取消事件,这样能阻塞
SocketChannel sc = channel.accept();
sc.configureBlocking(false);//非阻塞模式
//建立连接后之后就要写数据了
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
System.out.println(sc);
}else if (key.isReadable()){//如果是read
//拿到触发事件的channel
try {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read==-1){
key.cancel();
}else{
buffer.flip();
debugRead(buffer);
}
buffer.flip();
debugRead(buffer);
}catch (IOException e){
e.printStackTrace();
key.cancel();//客户端断开连接了,因此要将key取消(从selector keys集合中真正删除)
}
}
}
}
}
}
为什么用完key要remove
selector内部会有一个集合
维护了在selector中注册的key
当selector的select()方法发现有一个新的事件,那么就会创建一个新的集合,就是代码中的selector.selectedKeys()
,每次有新的事件会在selectedKeys中加入key,但不会主动删除
所以如果不手动remove,在第二次进入循环时,SocketChannel sc = channel.accept();会返回一个null值(之前已经建立过连接了,所以获取不到连接就返回null了),导致最后报错
消息边界问题
以上代码会有消息边界的问题比如客户端发送俩个字,那么第二个字就会分俩批发送(UTF-8编码情况下,一个中文站三个字节)
- 一种思路是固定消息长度,数据包一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低
TLV格式,即Type类型,length长度,value数据,类型长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果过大,影响server吞吐量
- http1.1是TLV格式
- Http2.0是LTV格式
ByteBuffer大小分配
每个channel都需要记录可能被切分的消息,因为byteBuffer不是线程安全的,因此需要为每个channel堆维护一个独立的ByteBuffer
ByteBuffer不能太大,比如ByteBuffer1Mb的话,要支持百万连接就要1TB内存,因此需要设计大小可变的ByteBuffer
- 一种思路是首先分配一个小的buffer,列入4K,如果发现数据不够,再分配8K的buffer,将4kbuffer内容拷贝纸8Kbuffer,优点是消息容易连续处理,缺点书数据拷贝消耗性能
- 另一种思路是数组组成buffer,一个数组不够,把多的内容写入新数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
处理可写事件
client
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @Author: tongck
* @Date: 2022/9/1 14:30
*/
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8077));
//接受数据
int count=0;
while (true){
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count+=sc.read(buffer);
sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}
服务端
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* @Author: tongck
* @Date: 2022/9/1 14:18
*/
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8077));
while (true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// 等于 key.channel()
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//往客户端发送大量数据
StringBuilder sb=new StringBuilder();
for (int i=0;i<30000000;i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//实际写入的字节数
while (buffer.hasRemaining()){
int write = sc.write(buffer);
System.out.println(write);
}
}
}
}
}
}
如果这样写的话,很容易导致在数据发送完之前,一直在一个线程徘徊
正确的操作应该是这样
- 先写一次,判断是否写完了
- 没写完的话,关注一个可写事件
- 把没写完的buffer挂到sckey上
- 写完之后清理buufer
package nio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static nio.ByteBufferUtil.debugRead;
/**
* @Author: tongck
* @Date: 2022/8/31 14:59
*/
public class Server {
public static void main(String[] args) throws IOException {
//创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//建立selector,和chanel的联系
//事件发生后,可以通过它知道哪个事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("注册了key"+sscKey);
ssc.bind(new InetSocketAddress(7088));
while (true){
//select方法是阻塞的,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,不会阻塞 事件发生后要么处理要么取消,不能不操作
selector.select();
//处理事件 selectedKey,内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
//用迭代器遍历,因为要删除元素,for循环遍历的时候不能删除元素
while (iter.hasNext()){
SelectionKey key = iter.next();
//处理key时,要从selectedKey中删除,否则下次处理就会有问题
iter.remove();
System.out.println("key是"+key);
//区分事件类型
if (key.isAcceptable()){//如果是accept事件
ServerSocketChannel channel=(ServerSocketChannel)key.channel();
//调用accept进行处理,不然再次调用selector.select()方法时,无法知道是否已经处理过这个事假
//可以调用key.cancel()取消事件,这样能阻塞
SocketChannel sc = channel.accept();
sc.configureBlocking(false);//非阻塞模式
//建立连接后之后就要写数据了
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
System.out.println(sc);
}else if (key.isReadable()){//如果是read
//拿到触发事件的channel
try {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read==-1){
key.cancel();
}else{
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString());
}
buffer.flip();
debugRead(buffer);
}catch (IOException e){
e.printStackTrace();
key.cancel();//客户端断开连接了,因此要将key取消(从selector keys集合中真正删除)
}
}
}
}
}
}
阻塞和非阻塞区别
非阻塞模式下,相关方法都不会暂停线程
ServerSocketChaneel在没有建立连接时,会返回null,继续运行
SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其他SocketChannel的read或是去执行ServerSocketChannel.accept
写数据时,线程只是等待数据写入Channel即可,无需等待Channel通过网络把数据发送出去
但非阻塞模式下,即使没有建立连接,和可读数据,线程仍然在不断运行浪费了cpu
数据复制的过程中,线程实际还是阻塞的(AIO改进的地方)
多路复用
单线程可以配合selector完成对多个Channel可读写事件的监控,这称之为多路复用
- 多路复用仅针对网络IO,普通文件IO没法利用多路复用
如果也有不用Selector的非阻塞模式,线程大部分时间都在做无用功,而Selector能够保证
- 有可连接采取连接
- 有可读事件才去读
- 有可写事件才去写(限于网络传输能力)
select何时不阻塞
事件发生时
selector.wakeup()
调用selector.close()
selector所在线程interrupt
IO模型
NIO VS BIO
stream vs channel
- stream不会自动缓冲数据,channel
- stream仅支持阻塞API,channel同时支持非阻塞,阻塞API,网络Channel可以配合selector实现多路复用
- 二者均为全双工,即读写可同时进行
不同的IO
Unix有以下五种I/O模型
- 阻塞式I/O
- 非阻塞式I/O
- I/O复用(select和poll)
- 信号驱动式(SIGIO)
- 异步(AIO)
同步:线程自己去获取结果(一个线程)
异步:线程自己不去获取结果,而是让其他线程获取结果(至少俩个线程)
只有异步IO是异步的,其他都是同步的,如图
异步其实不存在阻塞(这种说法是错误的!异步根本不存在阻塞)
零拷贝
RandomAccessFile file=new RandomAccessFile(file,"r");
byte[]buf=new byte[(int)f.lenth];
file.read(buf);
socket socket=...;
socket.getOutPutStream.write(buf)
这段代码内部是这样工作的
- java本身并不具备IO读写能力,因此read方法调用后,要从java查询的用户态切换至内核态,去调用操作系统的读能力,将数据写入缓冲区,这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,期间不会使用cpu
DMA也可以理解为硬件单元,用来解放CPU完成文件io
- 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(byte []buf),这期间cpu会参与拷贝,无法利用DMA
- 调用write方法,这时将数据从用户缓冲区(byte[]buf)写入socket缓冲区,cpu将会参与拷贝
- 接下来要向网卡写数据,这项能力java又不具备,又要从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到环节多,java io实际上不是物理级别的读写,而是缓存的复制,底层真正的读写是操作系统,用户台与内核态的切换
NIO优化
通过directByteBuffer
ByteBufer.allocate(10):heapByteBuffer使用的还是java内存
ByteBuffer.allocateDirect(10) :DirectByteBuffer使用的是操作系统内存
大部分步骤与优化前相同,唯有一点,可以使用directBytebuffer将堆外内存映射到JVM内存中来
- 这块内存不受jvm垃圾回收影响,内存地址固定,有助于IO读写
java中的DirectByteBuffer维护了内存的虚引用,内存回收分俩部
- DirectByteBuf对象被垃圾回收,将虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态切换的次数没有减少
进一步优化
底层采用了linux2.1后提供的sendFile方法
- java调用transferto方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
- 数据从内核缓冲区传输到Socket缓冲区,CPU会参与拷贝
- 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到
- 只发生了一次用户态内核态的切换
- 数据拷贝了三次
再进一步优化
linux2.4
- java调用transferto方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
- 将一些offset和length信息拷贝到socket缓冲区,几乎无消耗
- 使用DMA将内核缓冲区数据写入网卡,不使用CPU
这个过程只发生了一次用户态与内核态的切换,数据拷贝了俩次.所谓的零拷贝,并不是真正的无拷贝,而是不会拷贝重复数据到JVM内存中
java 的transferto,linux的sendFile都可以叫做零拷贝
它的优点有
- 更少的用户态与内核态的切换
- 不用cpu计算,减少cpu缓存伪共享
- 零拷贝适合小文件传输
AIO
AIO也就是NIO2.在java7中引入了NIO的改进版NIO2,它是异步非阻塞的IO模型.异步IO是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,后台处理完成,操作系统会通知相应的线程进行后续的操作
AIO是异步IO的缩写,虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的.对于NIO来说,我们的业务线程是在IO操作本身是同步的除了AIO的其他IO模型都是同步的,
同步意味着,在进行读写操作时,线程需要等待结果,还是相当于限制
异步意味着,在进行读写操作时,线程不必等待结果,而是将来操作系统通过回调的方式由另外的线程来获取的结果
异步模型需要操作系统(kernel)提供支持
- windows系统通过IOCP实现了真正的异步IO
- linux异步IO在2.6版本引入,但其实底层还是用多路复用模拟了异步IO,性能没有优势
package aio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import static nio.ByteBufferUtil.debugAll;
/**
* @Author: tongck
* @Date: 2022/9/1 16:52
*/
public class AioFileChannel {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);
//参数1 ByteBuffer
//参数2 读取的位置
//参数3 附件
//参数4 回调对象
ByteBuffer buffer = ByteBuffer.allocate(160);
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override//read 成功
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("成功");
attachment.flip();
debugAll(attachment);
}
@Override//read 失败
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
System.out.println("方法结束");
//回调方法用的是守护线程,会随着主线程结束而结束,所以sleep一下
Thread.sleep(3000);
}
}