闲来无聊,用 Netty 写了个简易版 Redis

首先是 Redis 的服务端:除了记录发起访问的连接(Channel)之外,还会保存各个客户端随请求发上来的 id,方便把结果下发给对应的请求
这是速度测试

package org.example.netty.redis;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingDeque;


/**
 * @Author: suiyi
 * @Date: 2023/11/30 20:21
 */
public class NettyServerRedis {

    static HashMap<String, String> map = new HashMap<>();
    static LinkedBlockingDeque<Action> workQueue = new LinkedBlockingDeque<>();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            try {
                consumerWorkQueue();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();


        ServerBootstrap serverBootstrap = new ServerBootstrap();
        NioEventLoopGroup boos = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .group(boos, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ByteBuf buffer = Unpooled.copiedBuffer("\0".getBytes());
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, buffer));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new LineEncoder(new LineSeparator("\0"), StandardCharsets.UTF_8));
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
//                                System.out.println(msg);
                                doAction(msg, ctx.channel());
                            }
                        });

                    }
                })
                .bind(8000).sync();
    }

    // id 操作内容 key value

    public static void doAction(String action, Channel channel) {
        String[] s = action.split(" ");
        try {
            if (s.length < 3) {
                throw new IllegalStateException("命令格式错误");
            }
            if (s[1].equals(ActionEnum.PUT.getName())) {
                if (s.length != 4) {
                    throw new IllegalStateException("命令格式错误");
                }
                workQueue.addLast(new Action(ActionEnum.PUT.getValue(), s[0], s[2], s[3], channel));
            } else if (s[1].equals(ActionEnum.GET.getName())) {
                workQueue.addLast(new Action(ActionEnum.GET.getValue(), s[0], s[2], channel));
            } else if (s[1].equals(ActionEnum.REMOVE.getName())) {
                workQueue.addLast(new Action(ActionEnum.REMOVE.getValue(), s[0], s[2], channel));
            }
        } catch (Exception e) {
            channel.writeAndFlush(Long.valueOf(s[0]) + " " + "命令错误!");
        }
    }

    public static void consumerWorkQueue() throws InterruptedException {
        while (true) {
            String msg = null;
            Action action = workQueue.takeLast();
            if (action.getType() == ActionEnum.GET.getValue()) {
                String value = map.get(action.getKey());
                msg = value;
            } else if (action.getType() == ActionEnum.PUT.getValue()) {
                map.put(action.getKey(), action.getValue());
                msg = "1";
            } else if (action.getType() == ActionEnum.REMOVE.getValue()) {
                msg = map.remove(action.getKey());
//                msg = remove ? "1" : "0";
            }
            action.getChannel().writeAndFlush(action.getId() + " " + msg);
        }
    }


}

接下来是客户端代码,这里顺便写了个速度测试 demo,自测能做到约 10 万 QPS

package org.example.netty.redis;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.SneakyThrows;

import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Client for the NUL-delimited text protocol served by {@code NettyServerRedis}, plus a
 * small throughput benchmark in {@link #main(String[])}.
 *
 * <p>Each request carries a per-client sequence number. The calling thread blocks on a
 * {@link Promise} registered under that number until the matching reply arrives, so one
 * client instance can be shared by several threads. The public methods block and are meant
 * to be called from application threads, not from the client's own event loop.
 *
 * @Author: suiyi
 * @Date: 2023/12/1 20:52
 */
public class NettyClientRedis2 {

    /** Number of client connections opened by the benchmark. */
    private static final int CLIENT_COUNT = 100;

    /** Number of get/put/remove rounds issued per client by the benchmark. */
    private static final int ROUNDS_PER_CLIENT = 1000;

    /** Exclusive upper bound of the random keys and values used by the benchmark. */
    private static final int KEY_BOUND = 10000000;

    /** Source of request ids; ids are unique per client instance. */
    AtomicLong atomicLong = new AtomicLong(0);

    /** Connection to the server; set by {@link #start()}. */
    Channel channel = null;

    /** Event loop handed to every promise created by this client. */
    EventLoop next;

    /** Requests that were sent and are still waiting for their reply, keyed by request id. */
    ConcurrentHashMap<Long, Promise<String>> promiseMap = new ConcurrentHashMap<>();

    /** Event loop group owned by this client; released by {@link #stop()}. */
    private NioEventLoopGroup group;

    /**
     * Benchmark: opens 100 connections, issues 1000 rounds of get/put/remove on each through
     * a 33-thread pool and prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while connecting or waiting for completion
     */
    public static void main(String[] args) throws InterruptedException {
        NettyClientRedis2[] netty = new NettyClientRedis2[CLIENT_COUNT];
        ExecutorService executorService = Executors.newFixedThreadPool(33);
        try {
            for (int i = 0; i < CLIENT_COUNT; i++) {
                // Store the client before connecting so it is cleaned up even if start() fails.
                netty[i] = new NettyClientRedis2();
                netty[i].start();
            }
            CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT * ROUNDS_PER_CLIENT * 3);

            Random random = new Random();
            long l = System.currentTimeMillis();
            for (int j = 0; j < CLIENT_COUNT; j++) {
                NettyClientRedis2 client = netty[j];
                for (int i = 0; i < ROUNDS_PER_CLIENT; i++) {
                    int r1 = random.nextInt(KEY_BOUND);
                    int r2 = random.nextInt(KEY_BOUND);
                    int r3 = random.nextInt(KEY_BOUND);
                    int r4 = random.nextInt(KEY_BOUND);
                    submit(executorService, countDownLatch, () -> client.get(Integer.toString(r3)));
                    submit(executorService, countDownLatch,
                            () -> client.put(Integer.toString(r1), Integer.toString(r2)));
                    submit(executorService, countDownLatch, () -> client.remove(Integer.toString(r4)));
                }
            }
            countDownLatch.await();
            System.out.println(System.currentTimeMillis() - l);
        } finally {
            // Without this the pool and event loop threads keep the JVM alive after the run.
            executorService.shutdown();
            for (NettyClientRedis2 client : netty) {
                if (client != null) {
                    client.stop();
                }
            }
        }
    }

    /** Runs {@code task} on {@code pool} and counts the latch down even when the task fails. */
    private static void submit(ExecutorService pool, CountDownLatch latch, Runnable task) {
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                latch.countDown();
            }
        });
    }

    /**
     * Connects to 127.0.0.1:8000 and blocks until the connection is established.
     *
     * @throws InterruptedException if interrupted while connecting
     */
    public void start() throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        // This client owns exactly one channel, so a single event loop is sufficient.
        group = new NioEventLoopGroup(1);
        next = group.next();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        // Explicit charset: the delimiter must not depend on the platform default.
                        ByteBuf buffer = Unpooled.copiedBuffer("\0", StandardCharsets.UTF_8);
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, buffer));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new LineEncoder(new LineSeparator("\0"), StandardCharsets.UTF_8));
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                analysisCommand(msg);
                            }
                        });
                    }
                });

        channel = bootstrap.connect("127.0.0.1", 8000).sync().channel();
    }

    /**
     * Releases the event loop group created by {@link #start()}, which also closes the
     * connection. Does nothing if the client was never started.
     */
    public void stop() {
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * Handles one reply of the form {@code "<id> <result>"} by completing the promise that
     * was registered for {@code id}.
     *
     * @param command the raw reply text of one frame
     * @throws IllegalStateException if the reply does not consist of exactly two fields
     * @throws NumberFormatException if the id field is not a number
     */
    public void analysisCommand(String command) {
        String[] commands = command.split(" ");
        if (commands.length != 2) {
            throw new IllegalStateException("命令格式错误");
        }
        // remove, not get: otherwise every request ever sent stays in the map forever.
        Promise<String> promise = promiseMap.remove(Long.valueOf(commands[0]));
        if (promise == null) {
            System.err.println("No pending request for reply: " + command);
            return;
        }
        promise.trySuccess(commands[1]);
    }

    /**
     * Reads commands from standard input line by line, sends each one to the server and
     * prints the reply. Returns when standard input is exhausted.
     */
    public void processUserInput() {
        // One scanner for the whole session: a fresh Scanner per line can lose input that the
        // previous instance had already buffered. Not closed, as that would close System.in.
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String input = scanner.nextLine();
            System.out.println(sendCommand(input));
        }
    }

    /**
     * Sends {@code get <key>} and waits for the reply.
     *
     * @param key the key to look up
     * @return the result field of the server's reply
     */
    public String get(String key) {
        return sendCommand("get " + key);
    }

    /**
     * Sends {@code remove <key>} and waits for the reply.
     *
     * @param key the key to remove
     * @return the result field of the server's reply
     */
    public String remove(String key) {
        return sendCommand("remove " + key);
    }

    /**
     * Sends {@code put <key> <value>} and waits for the reply.
     *
     * @param key   the key to store
     * @param value the value to store
     */
    public void put(String key, String value) {
        sendCommand("put " + key + " " + value);
    }

    /**
     * Sends {@code "<id> <str>"} with a fresh id and blocks until the reply for that id
     * arrives. If the write itself fails, the pending promise is failed with the write's
     * cause so the caller gets an exception instead of waiting forever.
     *
     * @param str the command text without the id prefix
     * @return the result field of the server's reply
     */
    @SneakyThrows
    private String sendCommand(String str) {
        long id = atomicLong.getAndIncrement();
        DefaultPromise<String> promise = new DefaultPromise<>(next);
        // Register before writing so the reply can never arrive ahead of the registration.
        promiseMap.put(id, promise);
        channel.writeAndFlush(id + " " + str).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                Promise<String> pending = promiseMap.remove(id);
                if (pending != null) {
                    pending.tryFailure(future.cause());
                }
            }
        });
        return promise.get();
    }
}

然后是枚举类

package org.example.netty.redis;

import lombok.Getter;

/**
 * The operations a request can ask for. Each constant pairs the token that appears in the
 * request text ({@code name}) with the numeric code stored in an {@code Action}
 * ({@code value}). Accessors are generated by Lombok's {@code @Getter}.
 *
 * @Author: suiyi
 * @Date: 2023/12/1 19:37
 */
@Getter
public enum ActionEnum {
    /** Look up the value stored under a key. */
    GET("get", 1),
    /** Store a value under a key. */
    PUT("put", 2),
    /** Delete a key. */
    REMOVE("remove", 3);

    /** Token used for this operation in the request text. */
    private final String name;
    /** Numeric code identifying this operation. */
    private final int value;

    ActionEnum(String name, int value) {
        this.name = name;
        this.value = value;
    }
}

最后是一个Action类,存储用户的动作

package org.example.netty.redis;

import apache.rocketmq.v2.Message;
import io.netty.channel.Channel;
import lombok.Data;

/**
 * One parsed request waiting to be executed: what to do, on which key, and where to send
 * the answer. Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @Author: suiyi
 * @Date: 2023/12/1 19:33
 */
@Data
public class Action {
    /** Request id supplied by the sender, parsed from its decimal text form. */
    private long id;
    /** Operation code; callers pass an {@code ActionEnum} value. */
    private int type;
    /** Key the operation applies to. */
    private String key;
    /** Value to store; stays {@code null} when built without a value. */
    private String value;
    /** Connection the request arrived on. */
    private Channel channel;

    /**
     * Creates an action that carries a value.
     *
     * @param type    operation code
     * @param id      request id as decimal text
     * @param key     key the operation applies to
     * @param value   value to store
     * @param channel connection the request arrived on
     * @throws NumberFormatException if {@code id} is not a valid {@code long}
     */
    public Action(int type, String id, String key, String value, Channel channel) {
        this.type = type;
        this.id = Long.parseLong(id);
        this.key = key;
        this.value = value;
        this.channel = channel;
    }

    /**
     * Creates an action without a value.
     *
     * @param type    operation code
     * @param id      request id as decimal text
     * @param key     key the operation applies to
     * @param channel connection the request arrived on
     * @throws NumberFormatException if {@code id} is not a valid {@code long}
     */
    public Action(int type, String id, String key, Channel channel) {
        this(type, id, key, null, channel);
    }
}

经过测试,30 万次 put、remove、get 操作
居然只用了 2465 ms

而同样的测试,Redis 用了 3801 ms

不知道为啥我写的 demo 比 Redis 快(当然,我的 demo 是不带任何额外操作的)

这里附上对redis进行测试的demo

package org.example.netty.redis;

import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Benchmark that runs the same get/put/remove workload as {@code NettyClientRedis2.main}
 * against a real Redis instance through Redisson, for comparison.
 *
 * <p>It opens 100 Redisson clients, issues 1000 rounds of get/put/remove on each through a
 * 33-thread pool and prints the elapsed time in milliseconds. All clients operate on the
 * same Redisson map named {@code mapKey}.
 *
 * @Author: suiyi
 * @Date: 2023/12/4 21:30
 */
public class TestRedis {

    /** Number of Redisson clients opened by the benchmark. */
    private static final int CLIENT_COUNT = 100;

    /** Number of get/put/remove rounds issued per client. */
    private static final int ROUNDS_PER_CLIENT = 1000;

    /**
     * Exclusive upper bound of the random keys and values. Kept equal to the bound used by
     * the Netty benchmark so that both runs work on the same key space.
     */
    private static final int KEY_BOUND = 10000000;

    /**
     * Runs the benchmark against {@code redis://localhost:6379}.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the workload to finish
     */
    public static void main(String[] args) throws InterruptedException {
        RedissonClient[] clients = new RedissonClient[CLIENT_COUNT];
        // Generic array creation is unchecked; only Map<String, String> instances are stored.
        @SuppressWarnings("unchecked")
        Map<String, String>[] netty = new Map[CLIENT_COUNT];
        ExecutorService executorService = Executors.newFixedThreadPool(33);
        try {
            for (int i = 0; i < CLIENT_COUNT; i++) {
                Config config = new Config();
                config.useSingleServer().setAddress("redis://localhost:6379");
                clients[i] = Redisson.create(config);
                RMap<String, String> map = clients[i].getMap("mapKey");
                netty[i] = map;
            }
            CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT * ROUNDS_PER_CLIENT * 3);

            Random random = new Random();
            long l = System.currentTimeMillis();
            for (int j = 0; j < CLIENT_COUNT; j++) {
                Map<String, String> target = netty[j];
                for (int i = 0; i < ROUNDS_PER_CLIENT; i++) {
                    int r1 = random.nextInt(KEY_BOUND);
                    int r2 = random.nextInt(KEY_BOUND);
                    int r3 = random.nextInt(KEY_BOUND);
                    int r4 = random.nextInt(KEY_BOUND);
                    submit(executorService, countDownLatch, () -> target.get(Integer.toString(r3)));
                    submit(executorService, countDownLatch,
                            () -> target.put(Integer.toString(r1), Integer.toString(r2)));
                    submit(executorService, countDownLatch, () -> target.remove(Integer.toString(r4)));
                }
            }
            countDownLatch.await();
            System.out.println(System.currentTimeMillis() - l);
        } finally {
            // Release the pool and every client that was created so the JVM can exit.
            executorService.shutdown();
            for (RedissonClient client : clients) {
                if (client != null) {
                    client.shutdown();
                }
            }
        }
    }

    /** Runs {@code task} on {@code pool} and counts the latch down even when the task fails. */
    private static void submit(ExecutorService pool, CountDownLatch latch, Runnable task) {
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                latch.countDown();
            }
        });
    }
}
Last modification:December 8, 2023
如果觉得我的文章对你有用,请随意赞赏