博客
关于我
【NIO】Java NIO之选择器
阅读量:436 次
发布时间:2019-03-06

本文共 9951 字,大约阅读时间需要 33 分钟。

选择器基础与应用

作为Java NIO(非阻塞输入输出)体系的核心,选择器(Selector)在网络编程中发挥着重要作用。本文将从基础到应用,深入探讨选择器的工作原理及其实际应用场景。

选择器基础

选择器管理着一个可注册的通道集合的信息及它们的就绪状态。每个通道在注册时需要切换为非阻塞模式,这通常适用于套接字通道,但与FileChannel不兼容。通道可以同时注册到多个选择器上,但每个选择器对应的通道只能注册一次。

选择键(SelectionKey)记录了特定通道与选择器的注册关系。它包含两个比特掩码:一个表示关注的操作(interest集合),另一个表示通道当前已准备好的操作(ready集合)。通过register方法,可以将通道与选择器关联,并指定感兴趣的操作类型。

以下代码展示了通道与选择器的基本注册过程:

Selector selector = Selector.open();channel1.register(selector, SelectionKey.OP_READ);channel2.register(selector, SelectionKey.OP_WRITE);channel3.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);int readyCount = selector.select(10000);

使用选择键

选择键不仅记录了通道与选择器的关系,还包含了操作掩码。键的ready集合是interest集合的子集,表示自上次select操作以来已就绪的操作。键的取消会将其放入已取消键集合,但通道不会立即注销,直到下次操作。

选择器的使用

选择器支持多种操作,如读、写、连接和接受等。每个操作的支持取决于具体的通道类型。例如,SocketChannel不支持accept操作。

多线程场景下的可扩展性

选择器在多线程环境中表现出色。通道的事件处理可以通过线程池来分配,确保服务的高效性。以下代码展示了如何使用线程池来处理选择器事件:

import java.nio.channels.SocketChannel;import java.util.Queue;import java.util.concurrent.LinkedQueue;import java.util.concurrent.ThreadPoolExecutor;public class SelectSocketsThreadPool extends SelectorDemo {    private static final int MAX_THREADS = 5;    private ThreadPool pool = new ThreadPool(MAX_THREADS);    protected void readDataFromSocket(SelectionKey key) throws Exception {        WorkerThread worker = pool.getWorker();        if (worker == null) {            return;        }        worker.serviceChannel(key);    }    private class ThreadPool {        Queue
idle = new LinkedQueue<>(); ThreadPool(int poolSize) { for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread(this); thread.setName("Worker" + (i + 1)); thread.start(); idle.add(thread); } } WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (!idle.isEmpty()) { worker = idle.poll(); } } return worker; } void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupted(); } if (key == null) { continue; } System.out.println(this.getName() + " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel"); try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key) { this.key = key; key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); while ((count = channel.read(byteBuffer)) > 0) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { channel.write(byteBuffer); } byteBuffer.clear(); } if (count < 0) { channel.close(); return; } key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } }}

客户端与服务端的通信示例

以下是SelectorServerSocketChannel(服务端)和SelectorSocketChannel(客户端)的通信示例:

服务端

import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorServerSocketChannel {    public static void main(String[] args) throws Exception {        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();        ServerSocket serverSocket = serverSocketChannel.socket();        Selector selector = Selector.open();        serverSocketChannel.configureBlocking(false);        serverSocket.bind(new InetSocketAddress("localhost", 1234));        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        while (true) {            selector.select();            Iterator it = selector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = (SelectionKey) it.next();                it.remove();                if (key.isAcceptable()) {                    ServerSocketChannel server = (ServerSocketChannel) key.channel();                    SocketChannel channel = server.accept();                    channel.configureBlocking(false);                    channel.register(selector, SelectionKey.OP_READ);                    System.out.println("Connected: " + channel.socket().getRemoteSocketAddress());                }                if (key.isReadable()) {                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);                    SocketChannel socketChannel = (SocketChannel) key.channel();                    socketChannel.read(byteBuffer);                    byteBuffer.flip();                    System.out.println("server received message: " + getString(byteBuffer));                    byteBuffer.clear();                    String message = "server sending message " + System.currentTimeMillis();                    System.out.println("server sending message: " + message);                    byteBuffer.put(message.getBytes());                    byteBuffer.flip();                    socketChannel.write(byteBuffer);                }            }        }    }    private static String getString(ByteBuffer buffer) {        Charset charset;        CharsetDecoder decoder;        CharBuffer charBuffer;        try {            charset = Charset.forName("UTF-8");            decoder = charset.newDecoder();            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());            return charBuffer.toString();        } catch (Exception ex) {            ex.printStackTrace();            return "";        }    }}

客户端

import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SocketChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.Iterator;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorSocketChannel {    public static void main(String[] args) throws Exception {        SocketChannel socketChannel = SocketChannel.open();        socketChannel.configureBlocking(false);        Selector selector = Selector.open();        socketChannel.connect(new InetSocketAddress("localhost", 1234));        socketChannel.register(selector, SelectionKey.OP_CONNECT);        while (true) {            selector.select();            Iterator it = selector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = (SelectionKey) it.next();                it.remove();                if (key.isConnectable()) {                    if (socketChannel.isConnectionPending()) {                        if (socketChannel.finishConnect()) {                            key.interestOps(SelectionKey.OP_READ);                            sendMessage(socketChannel);                        } else {                            key.cancel();                        }                    }                }                if (key.isReadable()) {                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);                    while (true) {                        byteBuffer.clear();                        int count = socketChannel.read(byteBuffer);                        if (count > 0) {                            byteBuffer.flip();                            System.out.println("client receive message: " + getString(byteBuffer));                            break;                        }                    }                }            }        }    }    private static void sendMessage(SocketChannel socketChannel) throws Exception {        String message = "client sending message " + System.currentTimeMillis();        ByteBuffer byteBuffer = ByteBuffer.allocate(512);        byteBuffer.clear();        System.out.println("client sending message: " + message);        byteBuffer.put(message.getBytes());        byteBuffer.flip();        socketChannel.write(byteBuffer);    }    private static String getString(ByteBuffer buffer) {        Charset charset;        CharsetDecoder decoder;        CharBuffer charBuffer;        try {            charset = Charset.forName("UTF-8");            decoder = charset.newDecoder();            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());            return charBuffer.toString();        } catch (Exception ex) {            ex.printStackTrace();            return "";        }    }}

总结

选择器在Java NIO体系中扮演着关键角色。通过选择器,开发者可以高效地管理多个通道的I/O操作,提升系统性能。选择器的灵活性和扩展性使其在多线程环境中尤为重要。通过选择键、唤醒机制和线程池的配合,选择器能够处理大量的并发连接和数据流量。本文通过服务端和客户端的示例,展示了选择器在实际应用中的强大能力。

转载地址:http://gkjyz.baihongyu.com/

你可能感兴趣的文章
nginx css,js合并插件,淘宝nginx合并js,css插件
查看>>
Nginx gateway集群和动态网关
查看>>
Nginx Location配置总结
查看>>
Nginx log文件写入失败?log文件权限设置问题
查看>>
Nginx Lua install
查看>>
nginx net::ERR_ABORTED 403 (Forbidden)
查看>>
Nginx SSL私有证书自签,且反代80端口
查看>>
Nginx upstream性能优化
查看>>
Nginx 中解决跨域问题
查看>>
nginx 代理解决跨域
查看>>
Nginx 动静分离与负载均衡的实现
查看>>
Nginx 反向代理 MinIO 及 ruoyi-vue-pro 配置 MinIO 详解
查看>>
nginx 反向代理 转发请求时,有时好有时没反应,产生原因及解决
查看>>
Nginx 反向代理解决跨域问题
查看>>
Nginx 反向代理配置去除前缀
查看>>
nginx 后端获取真实ip
查看>>
Nginx 多端口配置和访问异常问题的排查与优化
查看>>
Nginx 如何代理转发传递真实 ip 地址?
查看>>
Nginx 学习总结(16)—— 动静分离、压缩、缓存、黑白名单、性能等内容温习
查看>>
Nginx 学习总结(17)—— 8 个免费开源 Nginx 管理系统,轻松管理 Nginx 站点配置
查看>>