4. Netty初认识--AIO编程
通过上篇博文我们已能发现NIO编程难度确实比同步阻塞BIO大很多,而且之前的NIO并没有考虑"半包读","半包写",如果加上这些,会更加复杂,那为什么NIO使用越来越广泛,它的优点如下:
-
客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
-
SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。
-
线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可用同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,一会那次,它非常适合做高性能、高负载的网络服务器。
JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,引人注目的是,java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO; 它不需要通过多路复用器Selector对注册的通道进行轮询操作即可实现异步读写。下面以AIO举例上篇文章的客户端连接服务器获取当前时间。
NIO2.0提供了异步文件通道和异步套接字通道的实现,异步通道提供两种方式后去操作结果:
通过java.util.concurrent.Future类来标识异步操作的结果;
在执行异步操作的时候传入一个java.nio.channels.
CompletionHandler接口的实现类作为操作完成的回调。
服务端
TimeServer
.java
package club.wujingjian.com.wujingjian.aio.server;
public class TimeServer {
public static void main(String[] args) {
int port = 9090;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
//创建异步的时间服务器处理类
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
//启动异步时间服务器处理类的线程
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
AsyncTimeServerHandler
.java
package club.wujingjian.com.wujingjian.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
//创建一个异步服务端通道 AsynchronousServerSocketChannel
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//调用bind方法绑定监听端口,如果端口合法且没被占用,绑定成功
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
//打印启动成功提示到控制台
System.out.println("The time server is start in port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//CountDownLatch作用是在完成一组正在执行的操作之前,允许当前的线程一直阻塞,在本例中我们让线程在此阻塞,防止服务端执行完成退出
//在实际项目中不需要启动独立线程来处理AsynchronousServerSocketChannel,这里仅仅做个示例展示
latch = new CountDownLatch(1);
doAccept(); //用于接收客户端的连接
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept(){
//AcceptCompletionHandler 用来接收通知消息
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}
}
AcceptCompletionHandler
.java
package club.wujingjian.com.wujingjian.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
//从attachment获取成员变量AsynchronousServerSocketChannel,然后继续调用它的accept方法.
//当我们调用AsynchronousServerSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的completed方法,表示新
//的客户端已经接入成功,因为一个AsynchronousSocketChannel可以接收成千上万个客户端,搜易我们这里要继续调用它的accept方法,接收其他的客户端连接,最终形成
//一个循环.每当接收一个客户读连接成功之后,再一步接收新的客户端连接
attachment.asynchronousServerSocketChannel.accept(attachment, this);
//链路建立成功后,服务端需要接收客户端的请求,创建ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
/** AsynchronousSocketChannel的read方法参数解释如下:
* 第一个参数ByteBuffer dst: 接收缓冲区,用于从异步Chanenl中读取数据包;
* 第二个参数A attachemnt: 异步Channel携带的附件,通知回调的时候作为入参使用;
* 第三个参数CompletionHandler: 接收通知回调的业务handler,此处用ReadCompletionHandler
*/
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletionHandler
.java
package club.wujingjian.com.wujingjian.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
//将AsynchronousSocketChannel 传入构造参数中,主要用于读取半包消息和发送应答,本示例不对半包读写进行具体说明
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
@Override
//读取到消息的处理
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();//为后续从缓冲区读取数据做准备
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
//对当前时间进行合法性校验
if (currentTime != null && currentTime.trim().length() > 0) {
byte [] bytes = currentTime.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完成,继续发送
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
//ignore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
TimeClient
.java
package club.wujingjian.com.wujingjian.aio.client;
public class TimeClient {
public static void main(String[] args) {
int port = 9090;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new Thread(new AsyncTimeClientHandler("127.0.0.1",port), "AIO-AsyncTimeClientHandler-001").start();
}
}
AsyncTimeClientHandler
.java
package club.wujingjian.com.wujingjian.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);//创建CountDownLatch进行等待,防止异步操作没有执行完线程就退出
//client.connect第二个参数A attach : AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义
//client.connect第三个参数CompletionHandler: 异步操作回调通知接口,由调用者实现
//本例中我们第二,三个参数都是用了AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer,buffer,this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
//ignore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ignore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ignore on close
}
}
}
运行结果Server端
客户端: