基于JavaNIO实现的群聊和统计在线人数

[啤酒]满怀忧思,不如先干再说

之前的文章介绍了NIO三大核心的基本操作,本章通过经典的聊天室案例基于Java NIO实现群聊和统计在线人数案例,对进一步掌握和加深NIO有不错的帮助,一起来看看怎么实现吧

本文和其他IO内容收录于《简单的IO流》合集中,最近展现量和阅读量巨低,如果觉得不错不要吝啬三连支持一下吧!

基于JavaNIO实现的群聊和统计在线人数

群聊

群聊就是多个客户端【用户】连接上同一服务端,服务端负责将用户发送的消息转发给其他用户的客户端,以此来实现群聊功能。

服务端

实现流程:

  • 创建 ServerSocketChannel 通道,并设置监听端口,设置为非阻塞模式
  • 创建 Selector 选择器,并将通道注册到选择器上,监听 accept 接收事件
  • 选择器的作用就是轮询访问通道,接下来就是不断的等待其他通道连接,如果没有可用通道继续循环轮询
  • 如果有可用通道,就将通道拿到,判断通道是 accept【接收通道】 还是 read【可读通道】
  • 如果是接收通道就认为是第一次连接进来,将其注册到选择器中,提示对应的客户端输入用户名
  • 如果是可读通道,认为是客户端发送了消息,那么就从通道中将数据取出,继续监听该通道,并将消息转发给其他所有的客户端
  • 其实服务端就是一个转发作用,只是转发时不需要给发送消息的客户端转发
public class Server {

    // 已连接用户
    private Map<String,SocketChannel> onlineUsers = new HashMap<>();

    // 开启服务
    public void startServer() throws IOException {
        // 1、创建选择器
        Selector selector = Selector.open();
        // 2、创建ServerSocketChannel通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 3、为通道设置监听
        serverSocketChannel.bind(new InetSocketAddress(9999));
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 4、将通道注册到选择器
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器启动成功");
        // 5、循环等待通道连接
        for (;;) {
            // 获取channel数量
            int channels = selector.select();
            if(channels == 0) {
                continue;
            }
            // 获取可用的channel
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                // 获取通道
                SelectionKey selectionKey = iterator.next();
                // 移除已经获取的通道
                iterator.remove();
                // 如果是accept状态
                if(selectionKey.isAcceptable()) {
                    acceptOperator(selector,serverSocketChannel);
                }
                // 如果是可读状态
                if(selectionKey.isReadable()) {
                    readOperator(selector,selectionKey);
                }
            }
        }
    }

    // 处理可读状态
    private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
        // 1、获取已就绪通道
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 2、创建Buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 3、循环读取客户端消息
        int readLength = socketChannel.read(buffer);
        String message = "";
        if (readLength > 0) {
            // 切换为读模式
            buffer.flip();
            // 读取内容
            message += Charset.forName("UTF-8").decode(buffer);
            // 切割消息,前边为用户名,后边为消息
            String[] split = message.split(":");
            // 只有用户名
            if(split.length == 1) {
                // 判断用户名是否存在
                if(onlineUsers.get(split[0]) != null) {
                    socketChannel.write(Charset.forName("UTF-8").encode("该用户名已存在,请重新输入!"));
                }else  {
                    onlineUsers.put(split[0],socketChannel);
                    // 发送给自己
                    socketChannel.write(Charset.forName("UTF-8").encode("您的昵称,"+ split[0] +",通过认证,当前在线人数" + onlineUsers.size() + "人"));
                    message = "欢迎" + split[0] +"加入群聊,当前在线人数" + onlineUsers.size() + "人";
                    // 发送给其他客户端
                    castOtherClient(selector,message,socketChannel);
                }
            }else {
                // 包含名字和消息
                // 广播给其他客户端
                castOtherClient(selector,message,socketChannel);
            }
        }
        // 4、将channel再次注册到选择器
        socketChannel.register(selector,SelectionKey.OP_READ);
    }

    // 广播给其他客户端
    private void castOtherClient(Selector selector, String message, SocketChannel socketChannel) throws IOException {
        // 1、获取所有已经接入的channel
        Set<SelectionKey> selectionKeys = selector.keys();
        // 2、循环向所有channel广播消息
        for (SelectionKey selectionKey : selectionKeys) {
            Channel targetChannel = selectionKey.channel();
            // 剔除自己
            if(targetChannel instanceof SocketChannel && targetChannel != socketChannel) {
                ((SocketChannel) targetChannel).write(Charset.forName("UTF-8").encode(message));
            }
        }

    }

    // 将通道注册进选择器
    private void acceptOperator(Selector selector, ServerSocketChannel serverSocketChannel) throws IOException {
        // 接入状态
        SocketChannel socketChannel = serverSocketChannel.accept();
        // 将channel设置为非阻塞状态
        socketChannel.configureBlocking(false);
        // 将channel注册进选择器,监听可读事件
        socketChannel.register(selector,SelectionKey.OP_READ);
        // 返回客户端信息
        socketChannel.write(Charset.forName("UTF-8").encode("欢迎进入聊天室,请设置你的昵称吧!"));
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.startServer();
    }
}

客户端

客户端主要是发送消息和接收别的客户端消息,实现流程如下:

  • 通过 SocketChannel 与服务端创立连接
  • 创建选择器将连接通道注册到选择器上,开启线程接收消息,接收消息的方式和服务端接收方式一样
  • 通过开启键盘输入,输入用户名和发送消息,将消息写到连接通道内
public class Client {

    // 用户名
    private String username = "";
    private Selector selector;

    // 加入群聊
    public void joinChat() throws IOException {
        // 连接服务端
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
        startClient(socketChannel);
        // 发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String message = scanner.nextLine();
            // 判断是否初始化名字
            if("".equals(message) || message.length() == 0) {
                System.out.println("发送内容不能为空!");
                continue;
            }else if("".equals(username)) {
                // 如果没有名字,服务端验证姓名
            }else {
                System.out.println("用户名===》" + username);
                message = username + ":" + message;
            }
            // 发送消息
            socketChannel.write(Charset.forName("UTF-8").encode(message));
        }
    }

    // 开启客户端
    public void startClient(SocketChannel socketChannel) throws IOException{
        // 接收消息
        selector = Selector.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 创建线程接收消息
        new Thread(() -> {
            try {
                for (;;) {
                    // 获取channel数量
                    int channels = selector.select();
                    if(channels == 0) {
                        continue;
                    }
                    // 获取可用的channel
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        // 获取通道
                        SelectionKey selectionKey = iterator.next();
                        // 移除已经获取的通道
                        iterator.remove();
                        // 如果是可读状态
                        if(selectionKey.isReadable()) {
                            readOperator(selector,selectionKey);
                        }
                    }
                }
            }catch (Exception e) {

            }
        }).start();
    }

    // 接收消息
    private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
        // 1、获取已就绪通道
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 2、创建Buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 3、循环读取客户端消息
        int readLength = socketChannel.read(buffer);
        String message = "";
        if (readLength > 0) {
            // 切换为读模式
            buffer.flip();
            // 读取内容
            message += Charset.forName("UTF-8").decode(buffer);
            // 如果是注册名字
            if(message.contains("通过认证")) {
                username = message.split(",")[1];
            }
            System.out.println(message);
        }
        // 4、将channel再次注册到选择器
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
}

用户启动客户端

可以有多个客户端,即多个用户加入聊天室,此处你可以搞BClient,CClient等等多个客户端

package com.stt.nio.chat;

import java.io.IOException;

public class AClient {

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.joinChat();
    }
}

执行结果如下方动图,可以实现即时的群聊功能

基于JavaNIO实现的群聊和统计在线人数

单聊怎么实现呢?

上边服务端的代码中使用Map将用户名和其对应的eChannel存储起来了,但聊的话其实就是让服务端将消息转发给对应的Channel即可,说到这,你应该知道怎么实现了吧!