本文共 6535 字,大约阅读时间需要 21 分钟。
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class NIOSocketServer extends Thread { private static final Logger LOG = LoggerFactory .getLogger(NIOSocketServer.class); private static final String CHARSET = "UTF-8"; private static final int BUFFER_SIZE = 1024; private static final int FAIL_TRY_NUM = 3; private Selector selector; private ServerSocketChannel ssc; private static NIOSocketServer server; /** * 程序入口 * * @param args */ public static void main(String[] args) { server = new NIOSocketServer(); try { // server.setDaemon(true); server.initServer(); server.start(); } catch (Exception e) { // 如果出现异常,则直接关闭客户端 server.stopServer(); System.exit(1); } } @Override public void run() { int failNum = 0; while (true) { try { int select = selector.select(); if (select > 0) { Setkeys = selector.selectedKeys(); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { doAcceptable(key); } if (key.isWritable()) { doWriteMessage(key); } if (key.isReadable()) { doReadMessage(key); } if (key.isConnectable()) { doConnectable(key); } iter.remove(); } } } catch (Exception e) { failNum++; if (failNum > FAIL_TRY_NUM) { server.stopServer(); } } } } /** * 初始化服务器端程序,开始监听端口 * * @throws IOException */ private void initServer() throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(2181)); ssc.register(selector, SelectionKey.OP_ACCEPT); } /** * 停止服务器 * * @throws IOException */ private void stopServer() { try { if (selector != null && selector.isOpen()) { selector.close(); } if (ssc != null && ssc.isOpen()) { ssc.close(); } } catch (IOException e) { LOG.info("关闭服务端失败:" + e.getMessage()); } } /** * 对新的客户端连接进行处理 * * @param key * @throws IOException */ private void doAcceptable(SelectionKey key) throws IOException { ServerSocketChannel tmpSsc = (ServerSocketChannel) key.channel(); SocketChannel ss = tmpSsc.accept(); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } /** * 已连接 * * @param key */ private void doConnectable(SelectionKey key) { LOG.info("connect is ok"); } /** * 写消息到客户端 * * @param key * @throws IOException */ private void doWriteMessage(SelectionKey key) throws Exception { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.wrap("server write msg to client" .getBytes(CHARSET)); while (buffer.hasRemaining()) { sc.write(buffer); } TimeUnit.SECONDS.sleep(1); } /** * @param key * @throws IOException */ private void doReadMessage(SelectionKey key) throws Exception { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE); int read = sc.read(bb); while (read > 0) { bb.flip(); byte[] barr = new byte[bb.limit()]; bb.get(barr); LOG.info("server read msg from client:" + new String(barr, CHARSET)); bb.clear(); read = sc.read(bb); } TimeUnit.SECONDS.sleep(1); }}
import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class NIOSocketClient extends Thread { private static final Logger LOG = LoggerFactory .getLogger(NIOSocketClient.class); private static final String CHARSET = "UTF-8"; private static final int BUFFER_SIZE = 1024; private static final int FAIL_TRY_NUM = 3; private SocketChannel socketChannel; private Selector selector; private static NIOSocketClient client; /** * 程序入口 * * @param args */ public static void main(String[] args) { client = new NIOSocketClient(); try { client.initClient(); client.start(); // client.setDaemon(true); } catch (Exception e) { // 如果出现异常,则直接关闭客户端 client.close(); } } public void run() { int failNum = 0; while (true) { try { writeMessage(); int select = selector.select(); System.out.println(select); if (select > 0) { Setkeys = selector.selectedKeys(); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey sk = iter.next(); if (sk.isReadable()) { readMessage(sk); } iter.remove(); } } } catch (Exception e) { // 如果出现三次以上的异常,则关闭客户端 failNum++; if (failNum > FAIL_TRY_NUM) { client.close(); System.exit(1); } } } } public void readMessage(SelectionKey sk) throws Exception, UnsupportedEncodingException { SocketChannel curSc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); while (curSc.read(buffer) > 0) { buffer.flip(); LOG.info("read message from server:" + new String(buffer.array(), CHARSET)); buffer.clear(); } TimeUnit.SECONDS.sleep(1); } public void writeMessage() throws Exception { String ss = "client write msg to server"; ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes(CHARSET)); while (buffer.hasRemaining()) { socketChannel.write(buffer); } TimeUnit.SECONDS.sleep(1); } public void initClient() throws IOException, ClosedChannelException { InetSocketAddress addr = new InetSocketAddress(2181); socketChannel = SocketChannel.open(); selector = Selector.open(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 连接到server socketChannel.connect(addr); while (!socketChannel.finishConnect()) { LOG.info("check finish connection"); } } /** * 停止客户端 */ private void close() { try { if (selector != null && selector.isOpen()) { selector.close(); } if (socketChannel != null && socketChannel.isOpen()) { socketChannel.close(); } } catch (IOException e) { LOG.info("关闭客户端失败:" + e.getMessage()); } }}
转载地址:http://ubupa.baihongyu.com/