无聊netty写了个redis
首先是redis的服务端,除了访问的进程外还会存储各个进程发上来的一个id方便进行下发
这是速度测试
package org.example.netty.redis;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @Author: suiyi
* @Date: 2023/11/30 20:21
*/
public class NettyServerRedis {
static HashMap<String, String> map = new HashMap<>();
static LinkedBlockingDeque<Action> workQueue = new LinkedBlockingDeque<>();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
consumerWorkQueue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ByteBuf buffer = Unpooled.copiedBuffer("\0".getBytes());
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, buffer));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LineEncoder(new LineSeparator("\0"), StandardCharsets.UTF_8));
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// System.out.println(msg);
doAction(msg, ctx.channel());
}
});
}
})
.bind(8000).sync();
}
// id 操作内容 key value
public static void doAction(String action, Channel channel) {
String[] s = action.split(" ");
try {
if (s.length < 3) {
throw new IllegalStateException("命令格式错误");
}
if (s[1].equals(ActionEnum.PUT.getName())) {
if (s.length != 4) {
throw new IllegalStateException("命令格式错误");
}
workQueue.addLast(new Action(ActionEnum.PUT.getValue(), s[0], s[2], s[3], channel));
} else if (s[1].equals(ActionEnum.GET.getName())) {
workQueue.addLast(new Action(ActionEnum.GET.getValue(), s[0], s[2], channel));
} else if (s[1].equals(ActionEnum.REMOVE.getName())) {
workQueue.addLast(new Action(ActionEnum.REMOVE.getValue(), s[0], s[2], channel));
}
} catch (Exception e) {
channel.writeAndFlush(Long.valueOf(s[0]) + " " + "命令错误!");
}
}
public static void consumerWorkQueue() throws InterruptedException {
while (true) {
String msg = null;
Action action = workQueue.takeLast();
if (action.getType() == ActionEnum.GET.getValue()) {
String value = map.get(action.getKey());
msg = value;
} else if (action.getType() == ActionEnum.PUT.getValue()) {
map.put(action.getKey(), action.getValue());
msg = "1";
} else if (action.getType() == ActionEnum.REMOVE.getValue()) {
msg = map.remove(action.getKey());
// msg = remove ? "1" : "0";
}
action.getChannel().writeAndFlush(action.getId() + " " + msg);
}
}
}
客户端代码,这里顺便写了个速度测试demo,自测能做到10wQPS
package org.example.netty.redis;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.SneakyThrows;
import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Author: suiyi
* @Date: 2023/12/1 20:52
*/
public class NettyClientRedis2 {
AtomicLong atomicLong = new AtomicLong(0);
Channel channel = null;
EventLoop next;
ConcurrentHashMap<Long, Promise<String>> promiseMap = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
NettyClientRedis2[] netty = new NettyClientRedis2[100];
for (int i = 0; i < 100; i++) {
NettyClientRedis2 nettyClient = new NettyClientRedis2();
nettyClient.start();
netty[i] = nettyClient;
}
ExecutorService executorService = Executors.newFixedThreadPool(33);
CountDownLatch countDownLatch = new CountDownLatch(300000);
Random random = new Random();
long l = System.currentTimeMillis();
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 1000; i++) {
int r1 = random.nextInt(10000000);
int r2 = random.nextInt(10000000);
int r3 = random.nextInt(10000000);
int r4 = random.nextInt(10000000);
int finalJ = j;
executorService.execute(() -> {
netty[finalJ].get(Integer.toString(r3));
countDownLatch.countDown();
});
executorService.execute(() -> {
netty[finalJ].put(Integer.toString(r1), Integer.toString(r2));
countDownLatch.countDown();
});
executorService.execute(() -> {
netty[finalJ].remove(Integer.toString(r4));
countDownLatch.countDown();
});
}
}
countDownLatch.await();
System.out.println(System.currentTimeMillis() - l);
// put("123", "suiyi");
// get("123");
// processUserInput();
}
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
next = group.next();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ByteBuf buffer = Unpooled.copiedBuffer("\0".getBytes());
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, buffer));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new LineEncoder(new LineSeparator("\0"), StandardCharsets.UTF_8));
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
analysisCommand(msg);
}
});
}
});
// Channel
channel = bootstrap.connect("127.0.0.1", 8000).sync().channel();
}
public void analysisCommand(String command) {
String[] commands = command.split(" ");
if (commands.length != 2) {
throw new IllegalStateException("命令格式错误");
}
Promise<String> promise = promiseMap.get(Long.valueOf(commands[0]));
try {
promise.setSuccess(commands[1]);
} catch (Exception e) {
e.printStackTrace();
}
}
public void processUserInput() {
while (true) {
Scanner scanner = new Scanner(System.in);
String input = scanner.nextLine(); // 读取一行输入
System.out.println(sendCommand(input));
}
}
public String get(String key) {
return sendCommand("get " + key);
}
public String remove(String key) {
return sendCommand("remove " + key);
}
public void put(String key, String value) {
sendCommand("put " + key + " " + value);
}
@SneakyThrows
private String sendCommand(String str) {
long id = atomicLong.getAndIncrement();
DefaultPromise<String> promise = new DefaultPromise<>(next);
promiseMap.put(id, promise);
channel.writeAndFlush(id + " " + str);
return promise.get();
}
}
然后是枚举类
package org.example.netty.redis;
import lombok.Getter;
/**
* @Author: suiyi
* @Date: 2023/12/1 19:37
*/
@Getter
public enum ActionEnum {
GET("get", 1), PUT("put", 2) ,REMOVE("remove", 3);;
final private String name;
final private int value;
ActionEnum(String name, int value) {
this.name = name;
this.value = value;
}
}
最后是一个Action类,存储用户的动作
package org.example.netty.redis;
import apache.rocketmq.v2.Message;
import io.netty.channel.Channel;
import lombok.Data;
/**
* @Author: suiyi
* @Date: 2023/12/1 19:33
*/
@Data
public class Action {
public Action(int type, String id, String key, String value, Channel channel) {
this.id = Long.valueOf(id);
this.type = type;
this.key = key;
this.value = value;
this.channel = channel;
}
public Action(int type, String id, String key, Channel channel) {
this.id = Long.valueOf(id);
this.type = type;
this.key = key;
this.channel = channel;
}
private long id;
private int type;
private String key;
private String value;
private Channel channel;
}
经过测试30w个,put,remove,get操作
居然用了2465
redis我用了3801
不知道为啥我写的demo比redis快(当然是不带任何操作的)
这里附上对redis进行测试的demo
package org.example.netty.redis;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author: suiyi
* @Date: 2023/12/4 21:30
*/
public class TestRedis {
public static void main(String[] args) throws InterruptedException {
Map<String,String>[] netty = new Map[100];
for (int i = 0; i < 100; i++) {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redissonClient = Redisson.create(config);
RMap<String, String> map = redissonClient.getMap("mapKey");
netty[i] = map;
}
ExecutorService executorService = Executors.newFixedThreadPool(33);
CountDownLatch countDownLatch = new CountDownLatch(300000);
Random random = new Random();
long l = System.currentTimeMillis();
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 1000; i++) {
int r1 = random.nextInt(10000);
int r2 = random.nextInt(10000);
int r3 = random.nextInt(10000);
int r4 = random.nextInt(10000);
int finalJ = j;
executorService.execute(() -> {
netty[finalJ].get(Integer.toString(r3));
countDownLatch.countDown();
});
executorService.execute(() -> {
netty[finalJ].put(Integer.toString(r1), Integer.toString(r2));
countDownLatch.countDown();
});
executorService.execute(() -> {
netty[finalJ].remove(Integer.toString(r4));
countDownLatch.countDown();
});
}
}
countDownLatch.await();
System.out.println(System.currentTimeMillis() - l);
}
}