netty入门
简介
netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发可维护,高性能的网络服务器客户端
netty在网络应用框架中的地位就好比spring在javaEE开发中的地位
以下框架都是用了netty,因为它们都有网络通信需求
- Cassandra-nosql数据库
- spark
- hadoop
- rocketMQ
- ElasticSearch
- gRPC
- Dubbo
- Spring5.0 flux api完全放弃了tomcat,是用netty作为服务器端
- zookeeper
优势
自己使用NIO开发工作量大,bug多,要自己解决tcp传输问题,如黏包半包
epoll空轮询导致100%,对API进行增强,使之更易用
Netty vs Mina
mina由apche维护,3.x版本可能有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速
demo
服务端
package netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
* @Author: suiyi
* @Date: 2022/9/2 9:56
*/
public class HelloServer {
public static void main(String[] args) {
//启动器,负责组装netty组件
new ServerBootstrap()
//加入事件组 包含了线程和选择器
.group(new NioEventLoopGroup())
//选择服务器的ServerSocketChannel实现 NIO
.channel(NioServerSocketChannel.class)
//boss 负责处理连接 worker(child)负责处理读写 决定了worker 能执行哪些操作
.childHandler(
//channel 和客户端进行数据读写的通道 Initializer 初始化器,负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());//将byteBuf转为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义handler
@Override//读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印转换好的字符串
System.out.println(msg);
// super.channelRead(ctx, msg);
}
});
}
//监听端口
}).bind(8077);
}
}
客户端
package netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @Author: suiyi
* @Date: 2022/9/2 10:13
*/
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//启动类
new Bootstrap()
//添加eventLoop
.group(new NioEventLoopGroup())
//选择客户端channel事件
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override//在连接建立以后被调用,初始化
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost",8077))
.sync()//同步阻塞方法,知道连接建立后才会继续执行
.channel()//代表的是连接对象
//向访问器发送数据
.writeAndFlush("hello,world!");
}
}
提示
把channel理解为数据通道
把msg理解为在流动的数据,最开始输入是bytebuf但经过pipline的加工,会变成其他类型的对象,最后输出又变成byteBuf
把handler理解为数据的处理工具
- 由多个工序,合在一起就是pipeline,pipline负责发布事件,传播给每个handler,handler对自己感兴趣的事件进行处理
- handler分inbound和outbound俩类(入站,出站)
把eventLoop理解为处理数据的工人
- 工人可以管理多个channel的io操作,一旦工人负责了某个channel就要负责到底(绑定)
- 工人既可以执行io操作,也可以进行任务处理,每位工人都有任务队列,队列里可以对方多个channel待处理任务,任务分为普通,定时任务
- 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每到工序指定不同的工人
组件
EventLoop
事件循环对象
eventLoop本质是一个单线程执行器(同时维护了一个selector),里面有run方法处理Channel上源源不断的IO事件
继承关系
- 一条线是JUC下ScheduledExecutorService,因此包含了线程池中所有的方法
- 另一条线是继承自netty自己的orderedEventExecutor
定时任务
package netty;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;
import java.util.concurrent.TimeUnit;
/**
* @Author: suiyi
* @Date: 2022/9/2 11:09
*/
public class TestEventGroup {
public static void main(String[] args) {
//创建时间循环组
//这个类可以处理io事件,普通任务,定时任务
NioEventLoopGroup group = new NioEventLoopGroup(2);//默认是电脑核心数*2
// DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup();//普通任务 定时任务
//获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
//执行普通任务
// group.next().execute(()->{
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("ok");
// });
//执行定时任务
group.next().scheduleAtFixedRate(()->{
System.out.println("ok");
},0,1, TimeUnit.SECONDS);
//从0秒后开始执行,每秒执行一次
}
}
io任务
服务器端
package netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
/**
* @Author: suiyi
* @Date: 2022/9/2 13:28
*/
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
//boss 和w worker
//boss只负责 serverSocketChannel 的 accept事件 worker只负责SocketChannel上的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override //bytebuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8077);
}
}
客户端
package netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @Author: suiyi
* @Date: 2022/9/2 10:13
*/
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//启动类
Channel channel = new Bootstrap()
//添加eventLoop
.group(new NioEventLoopGroup())
//选择客户端channel事件
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override//在连接建立以后被调用,初始化
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8077))
.sync()//同步阻塞方法,知道连接建立后才会继续执行
.channel();//代表的是连接对象
//如果断点发送的话,要右键断点选择thread不然会阻塞nio的线程,导致无法正常发送数据
channel.writeAndFlush("qweqwe123asdqwezcQASDQWE");
channel.writeAndFlush("1111111111");
}
}
一旦建立连接,该连接的所有请求都会让一个eventloop进行处理
进一步细分指责,把handler的执行权交给不同的group
package netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
/**
* @Author: suiyi
* @Date: 2022/9/2 13:28
*/
public class EventLoopServer {
public static void main(String[] args) {
//细分2 创建一个独立的eventGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//boss 和w worker
//boss只负责 serverSocketChannel 的 accept事件 worker只负责SocketChannel上的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override //bytebuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
System.out.println(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);//消息传给下一个handler
}
}).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
@Override //bytebuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8077);
}
}
根据指责不同去进一步的分工.可以让单独的group处理对应的代码
源码
如果俩个handler绑定的是同一个线程,那么就直接调用,否则把要调用的代码封装成一个任务对象,由下一个handler的线程来调用
Channel
- calse()关闭channel
claseFuture()处理channel的关闭
- sync方法是同步等待channel关闭
- addListener方法是异步等待channel关闭
- pipeline()方法添加处理器
- write()方法将数据写入
- writeAndFlush()方法将数据写入并刷出
channelfuture
在客户端demo中.connect是一个异步非阻塞方法
异步处理
package netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @Author: suiyi
* @Date: 2022/9/2 10:13
*/
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//带有future,promise的类型都是和异步方法配套使用的,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
//添加eventLoop
.group(new NioEventLoopGroup())
//选择客户端channel事件
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override//在连接建立以后被调用,初始化
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//异步非阻塞方法 main发起了调用,真正执行connect是nio线程
.connect(new InetSocketAddress("localhost", 8077));
//同步阻塞方法,直到连接建立后才会继续执行,如果不执行这个方法
// ,获取到的channel就是还没准备好的channel,无法发送数据
/// channelFuture.sync();
//代表的是连接对象
channelFuture.addListener(new ChannelFutureListener() {
@Override//在nio线程连接建立之后,会调用这个方法
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("hi");
channel.writeAndFlush("hi hi");
}
});
}
}
处理关闭,实现发送q实现关闭连接
package netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import javax.sound.sampled.Line;
import java.net.InetSocketAddress;
import java.util.Scanner;
/**
* @Author: suiyi
* @Date: 2022/9/2 10:13
*/
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override//在连接建立以后被调用,初始化
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8077));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true){
String line = scanner.nextLine();
if ("q".equals(line)){
channel.close();//close 也是异步操作
break;
}
channel.writeAndFlush(line);
}
},"input").start();
ChannelFuture closeFuture = channel.closeFuture();
//获取closedFuture对象,它也有俩种方式 1就是同步等待
// System.out.println("正在等待关闭");
// closeFuture.sync();//同步处理,等待关闭后才执行
// System.out.println("处理关闭之后的操作");
//异步回调,可简化为lambda
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("处理关闭之后的操作");
group.shutdownGracefully();//结束进程
}
});
}
}
future&promise
在异步处理时,经常用到这俩个接口
首先要说明的是netty的future与jdk的future同名,但是是俩个接口,netty的future继承自jdk的future,而promise又对netty Future接口进行了拓展
- jdk future只能同步等待任务结束(或许成功,或许失败),才能得到结果
- netty future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都要等任务结束
- netty prowmise 不仅有netty future的功能,而且脱离了任务独立存在,只作为俩个线程间传递结果的容器
jdk自带future
package netty.nio2;
import java.util.concurrent.*;
import static java.util.concurrent.Executors.*;
/**
* @Author: suiyi
* @Date: 2022/9/5 10:16
*/
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//线程池
ExecutorService service= newFixedThreadPool(2);
//提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 50;
}
});
//自行车通过future来获取结果
System.out.println(future.get());
}
}
netty的Future
package netty.nio2;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* @Author: suiyi
* @Date: 2022/9/5 13:10
*/
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 70;
}
});
// System.out.println(future.get());
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(future.getNow());
}
});
System.out.println("执行到结尾");
}
}
netty promise
package netty.nio2;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
/**
* @Author: suiyi
* @Date: 2022/9/5 13:22
*/
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//准备event
EventLoop eventLoop = new NioEventLoopGroup().next();
//可以主动创建promise,结果容器
DefaultPromise<Integer> promise = new DefaultPromise<Integer>(eventLoop);
new Thread(()->{
try {
// int i=1/0;
promise.setSuccess(80);
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();
//在这里如果有异常的话,主线程会拿到这个异常
System.out.println(promise.get());
}
}
Handler&pipline
channelHandler用来处理Channel上的各种事件,分为入站,出站俩种,所有的channelHAndler被连成一串,就是pipeline
- 出站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对回写的结果进行加工
channel是一个产品的加工车间,pipline是车间中的流水线,ChannelHandler就是流水线上的各个工序,而后面要讲的ByteBuf是原材料先经过一道道入站工序,再经过一道道出站工序最终变成产品
package netty.nio2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Author: suiyi
* @Date: 2022/9/5 14:03
*/
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
//添加处理器 head-> h1->h2->h3->tail 底层是双向列表
//入栈和出站顺序是反过来的
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(1);
super.channelRead(ctx, msg);//把数据传递给下个handler,如果不调用,调用链会断开
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(2);
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(3);
super.channelRead(ctx, msg);
//分配了一个bytebuffer对象,写入server这个字符串的字节数组
// 只有执行这个操作才会触发出站处理器
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
//如果这里是ctx.write的话,就是从当前位置往前找
}
});
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(4);
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(5);
super.write(ctx, msg, promise);
}
});
}
}).bind(7088);
}
}
输出结果
1
2
3
5
4
可以看到ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而ChannelOutboundHandlerAdapter是按照addLast的逆序执行的.Channelpipeline实现是一个ChannelHandlerContext(包装了ChannelHandler)组成的双向列表
EmbeddedChannel
netty提供的测试工具类
package netty.nio2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Author: suiyi
* @Date: 2022/9/5 16:22
*/
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(1);
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(2);
super.channelRead(ctx, msg);
}
};
//模拟入栈操作,writeInbound,模拟出站操作可以用writeOutbound
EmbeddedChannel channel = new EmbeddedChannel(h1,h2);
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
}
}
ByteBuf
是对字节数据的封装
ByteBufallocator.Default.buffer(10)
创建一个默认的ByteBuf,初始容量是10
package netty.nio2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
* @Author: suiyi
* @Date: 2022/9/6 9:16
*/
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuilder sb = new StringBuilder();
for (int i=0;i<300;i++){
sb.append("a");
}
ByteBuf byteBuf = buf.writeBytes(sb.toString().getBytes());
System.out.println(byteBuf);
}
}
直接内存vs堆内存
可以用下面的代码来创建池化基于堆的ByteBuf
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
下面代码是池化基于直接内存的ByteBuf
ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();
- 直接内存创建和销毁的代价高昂,但读写性能高(少一次内存复制),适合配合池化功能一起使用
- 直接内存对GC压力小,因为这部分内存不收JVM垃圾回收的管理,但也要注意及时主动释放
netty默认情况下,都是使用直接内存作为byteBuf的内存
池化vs非池化
池化的概念类似于数据库连接池
池化最大的意义在于重用ByteBuf,优点如下
- 没有池化,则每次创建新的ByteBuf实例,这个操作内存代价昂贵,就算是堆内存,也会增加GC压力
- 有了池化,啧可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提高效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
4.1以后,非android平台默认启用池化,android平台默认非池化
4.1以前池化不成熟,默认是非池化实现
组成
ByteBuf有容量capacity,和最大容量 max capcity,
- 写指针到容量的位置是可写部分
- 读指针到写指针的部分是可读部分
- 读过的数据是废弃部分
- 容量到最大容量之间是可扩容部分
相比于ByteBuffer,他把读写切换为了俩个指针不用来回的切换,还可以支持扩容
内存回收
由于Netty中有堆外内存ByteBuf实现,堆外内存最好是手动来释放,而不是等待GC回收
- UnpooledHeapByteBuf使用的是JVM内存,只需等待GC回收即可
- UnpooledDirectByteBuf使用的是直接内存,需要特殊方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty这里采用了引用计数法来控制回收的内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象的初始技术为1
- 调用release方法计数减1,如果技术为0,ByteBuf内存被回收
- 调用retain方法计数+1,表示调用者没调用完之前,其他Handler调用了release也不会造成回收
- 当计数为0时,基层内存会被回收(也看是回到内存池),这时即使ByteBuf对象还在,其各个方法均无法正常使用
因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在Fianlly中relase了就失去了传递性(当然如果在这个ChannelHandler内这个ByteBuf已经完成了它的使命,那么便无需传递)
基本规则是,谁是最终使用者,谁负责进行relase
slice
零拷贝的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后ByteBuf并没有发生内存复制.还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针
package netty.nio2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
* @Author: suiyi
* @Date: 2022/9/6 11:13
*/
public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g'});
System.out.println(buf);
//切片过程中没有发生数据复制,用的还是原来的内存
ByteBuf f1 = buf.slice(0, 4);//切片方法执行时,会对最大容量做一个限制
ByteBuf f2 = buf.slice(4, 6);
System.out.println(f1);
System.out.println(f2);
//切片后的内容中不能在写入新的数据
f1.retain();
//释放原有byteBuf内存会影响之前的内存,使用f1.retain();可以添加计数器,防止被回收
//所以调用slice方法时最好对切片后的对象调用一次retain方法
buf.release();
}
}
duplicate
零拷贝的体现之一,就好比截取了原始ByteBuf中所有内容,并且没有maxCapcity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的
copy
会将内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关
compositeBuffer
package netty.nio2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
/**
* @Author: suiyi
* @Date: 2022/9/6 13:01
*/
public class TestCompositeByteBuf {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1,2,3,4,5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6,7,8,9,10});
// ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// buffer.writeBytes(buf1).writeBytes(buf2); //传统方式
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer.addComponents(true,buf1,buf2);//true代表自动增长写指针
System.out.println(buffer);
}
}
误解
很多人有误区,认为只有在netty,nio这样的多路复用IO模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上
java Socket是全双工的,在任意时刻,线路A到B和B到A的双向信号传输,即便是阻塞IO,读写也是可以同时进行的,只要分别采用读线程和写线程即可