本文共 9951 字,大约阅读时间需要 33 分钟。
作为Java NIO(非阻塞输入输出)体系的核心,选择器(Selector)在网络编程中发挥着重要作用。本文将从基础到应用,深入探讨选择器的工作原理及其实际应用场景。
选择器管理着一个可注册的通道集合的信息及它们的就绪状态。每个通道在注册时需要切换为非阻塞模式,这通常适用于套接字通道,但与FileChannel不兼容。通道可以同时注册到多个选择器上,但每个选择器对应的通道只能注册一次。
选择键(SelectionKey)记录了特定通道与选择器的注册关系。它包含两个比特掩码:一个表示关注的操作(interest集合),另一个表示通道当前已准备好的操作(ready集合)。通过register方法,可以将通道与选择器关联,并指定感兴趣的操作类型。
以下代码展示了通道与选择器的基本注册过程:
Selector selector = Selector.open();channel1.register(selector, SelectionKey.OP_READ);channel2.register(selector, SelectionKey.OP_WRITE);channel3.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);int readyCount = selector.select(10000);
选择键不仅记录了通道与选择器的关系,还包含了操作掩码。键的ready集合是interest集合的子集,表示自上次select操作以来已就绪的操作。键的取消会将其放入已取消键集合,但通道不会立即注销,直到下次操作。
选择器支持多种操作,如读、写、连接和接受等。每个操作的支持取决于具体的通道类型。例如,SocketChannel不支持accept操作。
选择器在多线程环境中表现出色。通道的事件处理可以通过线程池来分配,确保服务的高效性。以下代码展示了如何使用线程池来处理选择器事件:
import java.nio.channels.SocketChannel;import java.util.Queue;import java.util.concurrent.LinkedQueue;import java.util.concurrent.ThreadPoolExecutor;public class SelectSocketsThreadPool extends SelectorDemo { private static final int MAX_THREADS = 5; private ThreadPool pool = new ThreadPool(MAX_THREADS); protected void readDataFromSocket(SelectionKey key) throws Exception { WorkerThread worker = pool.getWorker(); if (worker == null) { return; } worker.serviceChannel(key); } private class ThreadPool { Queue idle = new LinkedQueue<>(); ThreadPool(int poolSize) { for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread(this); thread.setName("Worker" + (i + 1)); thread.start(); idle.add(thread); } } WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (!idle.isEmpty()) { worker = idle.poll(); } } return worker; } void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupted(); } if (key == null) { continue; } System.out.println(this.getName() + " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel"); try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key) { this.key = key; key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); while ((count = channel.read(byteBuffer)) > 0) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { channel.write(byteBuffer); } byteBuffer.clear(); } if (count < 0) { channel.close(); return; } key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } }} 以下是SelectorServerSocketChannel(服务端)和SelectorSocketChannel(客户端)的通信示例:
import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorServerSocketChannel { public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); Selector selector = Selector.open(); serverSocketChannel.configureBlocking(false); serverSocket.bind(new InetSocketAddress("localhost", 1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); System.out.println("Connected: " + channel.socket().getRemoteSocketAddress()); } if (key.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(512); SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("server received message: " + getString(byteBuffer)); byteBuffer.clear(); String message = "server sending message " + System.currentTimeMillis(); System.out.println("server sending message: " + message); byteBuffer.put(message.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } } } } private static String getString(ByteBuffer buffer) { Charset charset; CharsetDecoder decoder; CharBuffer charBuffer; try { charset = Charset.forName("UTF-8"); decoder = charset.newDecoder(); charBuffer = decoder.decode(buffer.asReadOnlyBuffer()); return charBuffer.toString(); } catch (Exception ex) { ex.printStackTrace(); return ""; } }} import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SocketChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.Iterator;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorSocketChannel { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Selector selector = Selector.open(); socketChannel.connect(new InetSocketAddress("localhost", 1234)); socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true) { selector.select(); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); if (key.isConnectable()) { if (socketChannel.isConnectionPending()) { if (socketChannel.finishConnect()) { key.interestOps(SelectionKey.OP_READ); sendMessage(socketChannel); } else { key.cancel(); } } } if (key.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(512); while (true) { byteBuffer.clear(); int count = socketChannel.read(byteBuffer); if (count > 0) { byteBuffer.flip(); System.out.println("client receive message: " + getString(byteBuffer)); break; } } } } } } private static void sendMessage(SocketChannel socketChannel) throws Exception { String message = "client sending message " + System.currentTimeMillis(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); byteBuffer.clear(); System.out.println("client sending message: " + message); byteBuffer.put(message.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } private static String getString(ByteBuffer buffer) { Charset charset; CharsetDecoder decoder; CharBuffer charBuffer; try { charset = Charset.forName("UTF-8"); decoder = charset.newDecoder(); charBuffer = decoder.decode(buffer.asReadOnlyBuffer()); return charBuffer.toString(); } catch (Exception ex) { ex.printStackTrace(); return ""; } }} 选择器在Java NIO体系中扮演着关键角色。通过选择器,开发者可以高效地管理多个通道的I/O操作,提升系统性能。选择器的灵活性和扩展性使其在多线程环境中尤为重要。通过选择键、唤醒机制和线程池的配合,选择器能够处理大量的并发连接和数据流量。本文通过服务端和客户端的示例,展示了选择器在实际应用中的强大能力。
转载地址:http://gkjyz.baihongyu.com/