NIO聊天室

BIO(Blocking I/0) 为阻塞IO,NIO(Non-Blocking I/O)为非阻塞IO。

不推荐以BIO构建生产应用,它有以下特点

  1. 阻塞式I/O
  2. 弹性伸缩能力差
  3. 多线程耗资源

针对以上问题,NIO可以完美解决,下面为学习NIO,使用Java 原生API编写NIO聊天室,上生产的话参考封装NIO的Netty框架。

NioServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package ml.yihao;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
* 使用Java NIO编写聊天程序
* 服务端程序代码
* @author zyh
* @create 20-9-24 下午11:25
*/
public class NioServer {

private static final String ENCODING_UTF8 = "UTF-8";
private static final Integer BUFFER_SIZE = 1024;

private static String hostname = "127.0.0.1";
private static Integer port = 8081;

/**
* 启动服务端
*/
public void start() throws IOException {

// 1. 构造注册中心selector
Selector selector = Selector.open();

// 2. 创建ServerSocketChannel 并绑定IP 和Port 设置为非阻塞模式
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(hostname, port));
serverSocketChannel.configureBlocking(false);

// 3. 注册
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 4. 调用selector 的select方法检测就绪状态
for (;;){
int select = selector.select();
if(select == 0) {
continue;
}
// 5. 拿到就绪集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = selectedKeys.iterator();
while(iterator.hasNext()){

SelectionKey selectionKey = iterator.next();
// 6. 根据channel类型处理业务逻辑
if(selectionKey.isAcceptable()){
// 监听
acceptHandler(serverSocketChannel, selector);
}

if(selectionKey.isReadable()){
readHandler(selectionKey , selector);
}

iterator.remove();
}

}
}

/**
* 当监听到accept事件时,需要将socket注册到selector上
* @param selector
* @throws ClosedChannelException
*/
public void acceptHandler(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {

SocketChannel socketChannel = serverSocketChannel.accept();

// 如果不设置这个东西会怎么样?
socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

socketChannel.write(Charset.forName(ENCODING_UTF8).encode("欢迎来到zyh搭建的聊天室!"));
}


/**
* 监听到读的事件
*/
public void readHandler(SelectionKey selectionKey, Selector selector) throws IOException {

SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//socketChannel.read()
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
String result = "";
while(socketChannel.read(byteBuffer) > 0){

// 切换读模式
byteBuffer.flip();

result += Charset.forName(ENCODING_UTF8).decode(byteBuffer);
}

if(result.length() > 0){
// 广播数据
//System.out.println(":: 读取的数据为" + result);
broadcast(selector, socketChannel, result);
}
// 7. 处理业务 是业务而定是否需要再次注册channel
socketChannel.register(selector, SelectionKey.OP_READ);
}

// 广播数据给所有连接
public void broadcast(Selector selector,SocketChannel sourceChannel, String msg) throws IOException {

Set<SelectionKey> selectionKeys = selector.keys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
Channel targetChannel = selectionKey.channel();

// 如果targetChannel 是SocketChannel类型
// 如果targetChannel 不是发消息的Channel
if(targetChannel instanceof SocketChannel
&& targetChannel != sourceChannel){

((SocketChannel)targetChannel).write(Charset.forName(ENCODING_UTF8).encode(msg));
}

}
}


public static void main(String[] args) throws IOException {
NioServer server = new NioServer();
server.start();
}


}

NioClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ml.yihao;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;

/**
* 使用Java NIO编写聊天程序
* 客户端端程序代码
* @author zyh
* @create 20-9-25 上午0:32
*/
public class NioClient {

private static final String ENCODING_UTF8 = "UTF-8";
private static final Integer BUFFER_SIZE = 1024;

private static String hostname = "127.0.0.1";
private static Integer port = 8081;

public void start() throws IOException {

// 链接服务端
SocketChannel socketChannel = SocketChannel.
open(new InetSocketAddress(hostname, port));

// 单独开个线程接收服务端给的信息
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new NioClient.ReadThread(selector)).start();

// 向服务端写数据
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){

String line = scanner.next();
socketChannel.write(Charset.forName(ENCODING_UTF8).encode(line));
}

}

public static void main(String[] args) throws IOException {

NioClient nioClient = new NioClient();
nioClient.start();
}

/**
* 客户端读数据的线程
*/
class ReadThread implements Runnable{

private Selector selector;

protected ReadThread(Selector selector){
this.selector = selector;
}

@Override
public void run() {

for (;;){
try {
// 监测 注册中心channel就绪状态
int select = selector.select();
if(select == 0) continue;

// 拿到selectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();

// 遍历判断channel类型
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();

if(selectionKey.isReadable()){
// 读取客户端中的数据
readHanler(selectionKey, selector);
}

// 不移除会导致set 中的对象堆积
iterator.remove();
}

} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 读取数据
*/
public void readHanler(SelectionKey selectionKey, Selector selector) throws IOException {

ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

String message = null;
while(socketChannel.read(buffer) > 0){

buffer.flip();

message += Charset.forName(ENCODING_UTF8).decode(buffer);
}

if(Objects.nonNull(message) && message.length() > 0){
System.out.println(message);
}

// 重新注册Channel到selector上
socketChannel.register(selector, SelectionKey.OP_READ);
}

}
}