NIO

NIO 核心概念及入门

Posted by leone on 2019-01-12

NIO

NIO 核心概念

Java NIO 概述

Java NIO (New IO,Non-Blocking IO)是从Java 1.4版本开始引入的一套新的IO API,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。

随着 JDK 7 的发布,Java对NIO进行了极大的扩展,增强了对文件处理和文件系统特性的支持,以至于我们称他们为 NIO.2。因为 NIO 提供的一些功能,NIO已经成为文件处理中越来越重要的部分。

nio与传统io区别

IO模型 IO NIO
方式 从硬盘到内存 从内存到硬盘
通信 面向流(乡村公路) 面向缓存(多路复用技术)
处理 阻塞IO(多线程) 非阻塞IO(反应堆Reactor)
触发 选择器(轮询机制)

阻塞与同步

  • 1.阻塞(Block)和非租塞(NonBlock):

阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候阻塞:往往需要等待缞冲区中的数据准备好过后才处理其他的事情,否則一直等待在那里。

非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回

  • 2.同步(Synchronization)和异步(Async)的方式:

同步和异步都是基于应用程序私操作系统处理IO事件所采用的方式,比如同步:是应用程序要直接参与IO读写的操作。异步:所有的IO读写交给搡作系统去处理,应用程序只需要等待通知。

同步方式在处理IO事件的时候,必须阻塞在某个方法上靣等待我们的IO事件完成(阻塞IO事件或者通过轮询IO事件的方式).对于异步来说,所有的IO读写都交给了搡作系统。这个时候,我们可以去做其他的事情,并不拓要去完成真正的IO搡作,当搡作完成IO后.会给我们的应用程序一个通知

同步:阻塞到IO事件,阻塞到read成则write。这个时候我们就完全不能做自己的事情,让读写方法加入到线程里面,然后阻塞线程来实现,对线程的性能开销比较大,

面向流与面向缓冲

Java IO和NIO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

缓冲区Buffer

为什么说NIO是基于缓冲区的IO方式呢?因为,当一个链接建立完成后,IO的数据未必会马上到达,为了当数据到达时能够正确完成IO操作,在BIO(阻塞IO)中,等待IO的线程必须被阻塞,以全天候地执行IO操作。为了解决这种IO方式低效的问题,引入了缓冲区的概念,当数据到达时,可以预先被写入缓冲区,再由缓冲区交给线程,因此线程无需阻塞地等待IO。

通道:

当执行:SocketChannel.write(Buffer),便将一个 buffer 写到了一个通道中。如果说缓冲区还好理解,通道相对来说就更加抽象。网上博客难免有写不严谨的地方,容易使初学者感到难以理解。

引用 Java NIO 中权威的说法:通道是 I/O 传输发生时通过的入口,而缓冲区是这些数 据传输的来源或目标。对于离开缓冲区的传输,您想传递出去的数据被置于一个缓冲区,被传送到通道。对于传回缓冲区的传输,一个通道将数据放置在您所提供的缓冲区中。

例如 有一个服务器通道 ServerSocketChannel serverChannel,一个客户端通道 SocketChannel clientChannel;服务器缓冲区:serverBuffer,客户端缓冲区:clientBuffer。

当服务器想向客户端发送数据时,需要调用:clientChannel.write(serverBuffer)。当客户端要读时,调用 clientChannel.read(clientBuffer)

当客户端想向服务器发送数据时,需要调用:serverChannel.write(clientBuffer)。当服务器要读时,调用 serverChannel.read(serverBuffer)

这样,通道和缓冲区的关系似乎更好理解了。在实践中,未必会出现这种双向连接的蠢事(然而这确实存在的,后面的内容还会涉及),但是可以理解为在NIO中:如果想将Data发到目标端,则需要将存储该Data的Buffer,写入到目标端的Channel中,然后再从Channel中读取数据到目标端的Buffer中。

Selector:

通道和缓冲区的机制,使得线程无需阻塞地等待IO事件的就绪,但是总是要有人来监管这些IO事件。这个工作就交给了selector来完成,这就是所谓的同步。

Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。

要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪,这就是所说的轮询。一旦这个方法返回,线程就可以处理这些事件。

selector 中注册的事件有:OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE

传统IO

使用传统的I/O程序读取文件内容, 并写入到另一个文件(或Socket), 如下程序:
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);
会有较大的性能开销, 主要表现在一下两方面:

    1. 上下文切换(context switch), 此处有4次用户态和内核态的切换
    1. Buffer内存开销, 一个是应用程序buffer, 另一个是系统读取buffer以及socket buffer

NIO

NIO技术省去了将操作系统的read buffer拷贝到程序的buffer, 以及从程序buffer拷贝到socket buffer的步骤, 直接将 read buffer 拷贝到 socket buffer. java 的 FileChannel.transferTo() 方法就是这样的实现, 这个实现是依赖于操作系统底层的sendFile()实现的.
publicvoid transferTo(long position, long count, WritableByteChannel target);
他的底层调用的是系统调用sendFile()方法
sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

Java NIO.2 之Path、Paths 与 Files 的使用

早期的java只提供了一个File类来访问文件系统,但File类的功能比较有限,所提供的方法性能也不高。而且,大多数方法在出错时仅返回失败,并不会提供异常信息。
NIO. 2为了弥补这种不足,引入了Path接口,代表一个平台无关的平台路径,描述了目录结构中文件的位置。Path可以看成是File类的升级版本,实际引用的资源也可以不存在。

1
2
3
4
5
6
7
8
// 在以前IO操作都是这样写的:
import java.io.File;
File file = new File("index.html");

// 但在Java7 中,我们可以这样写:
import java.nio.file.Path;
import java.nio.file.Paths;
Path path = Paths.get("index.html");

同时,NIO.2还提供了Files、Paths工具类,Files包含了大量静态的工具方法来操作文件;Paths则包含了两个返回Path的静态工厂方法。static Path get(String first, String … more) : 用于将多个字符串串连成路径,static Path get(URI uri): 返回指定uri对应的Path路径

java.nio.file.Paths 常用方法

  • String toString() : 返回调用 Path 对象的字符串表示形式

  • boolean startsWith(String path) : 判断是否以 path 路径开始

  • boolean endsWith(String path) : 判断是否以 path 路径结束

  • boolean isAbsolute() : 判断是否是绝对路径

  • Path getParent() :返回Path对象包含整个路径,不包含 Path 对象指定的文件路径

  • Path getRoot() :返回调用 Path 对象的根路径

  • Path getFileName() : 返回与调用 Path 对象关联的文件名

  • Path getName(int idx) : 返回指定索引位置 idx 的路径名称

  • int getNameCount() : 返回Path 根目录后面元素的数量

  • Path toAbsolutePath() : 作为绝对路径返回调用 Path 对象

  • Path resolve(Path p) :合并两个路径,返回合并后的路径对应的Path对象

  • File toFile(): 将Path转化为File类的对象

java.nio.file.Files 常用方法

  • Path copy(Path src, Path dest, CopyOption … how) : 文件的复制

  • Path createDirectory(Path path, FileAttribute<?> … attr) : 创建一个目录

  • Path createFile(Path path, FileAttribute<?> … arr) : 创建一个文件

  • void delete(Path path) : 删除一个文件,如果不存在,执行报错

  • void deleteIfExists(Path path) : Path对应的文件如果存在,执行删除

  • Path move(Path src, Path dest, CopyOption…how) : 将 src 移动到 dest 位置

  • long size(Path path) : 返回 path 指定文件的大小

netty简介

Netty是基于Java NIO的网络应用框架.

Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现时很复杂的,但是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的。网络应用程序通常需要有较高的可扩展性,无论是Netty还是其他的基于Java NIO的框架,都会提供可扩展性的解决方案。Netty中一个关键组成部分是它的异步特性.

netty官网 http://netty.io/ ,netty包下载地址 https://netty.io/downloads.html

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>

netty 通信demo

  • SimpleClient.java

客户端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
* @author Leone
* @since 2018-06-29
**/
@Slf4j
public class SimpleClient {

private String host;

private int port;

public SimpleClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() throws Exception {

EventLoopGroup eventLoopGroup = null;
try {
// 创建Bootstrap对象用来引导启动客户端
Bootstrap bootstrap = new Bootstrap();
// 创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
eventLoopGroup = new NioEventLoopGroup();
// 创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new SimpleClientHandler());
}
});
// 调用Bootstrap.connect()来连接服务器
ChannelFuture future = bootstrap.connect().sync();
// 最后关闭EventLoopGroup来释放资源
future.channel().close().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync();
}
}
}

public static void main(String[] args) throws Exception {
new SimpleClient("localhost", 8888).start();
}


}
  • SimpleClientHandler.java

客户端回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
* @author Leone
* @since 2018-06-29
**/
@Slf4j
public class SimpleClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

//客户端连接服务器后被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String data = "client say hello";
log.info("客户端连接服务器,开始发送数据: {}", data);
byte[] req = data.getBytes();
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
}

// 从服务器接收到数据后调用
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// 服务端返回消息后
ByteBuf buf = byteBuf;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
log.info("服务端响应的数据为: {}", body);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("client exceptionCaught..");
ctx.close();
}
}
  • SimpleServer.java

服务端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
* @author Leone
* @since 2018-06-29
**/
@Slf4j
public class SimpleServer {

private int port;

public SimpleServer(int port) {
this.port = port;
}

public void start() throws Exception {
EventLoopGroup eventLoopGroup = null;
try {
// 创建ServerBootstrap实例来引导绑定和启动服务器
ServerBootstrap bootstrap = new ServerBootstrap();
// 创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等
eventLoopGroup = new NioEventLoopGroup();
// 指定通道类型为NioServerSocketChannel,设置InetSocketAddress让服务器监听某个端口已等待客户端连接。
bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost", port)
.childHandler(new ChannelInitializer<Channel>() {
// 设置childHandler执行所有的连接请求
@Override
protected void initChannel(Channel channel) {
channel.pipeline().addLast(new SimpleServerHandler());
}
});
// 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("开始监听,端口为:{}", channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync();
}
}
}


public static void main(String[] args) throws Exception {
new SimpleServer(8888).start();
}


}
  • SimpleServerHandler.java

客户端回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
* @author Leone
* @since 2018-06-29
**/
@Slf4j
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
log.info("server 收到 client 发送的数据: {}", body);
// 向客户端响应数据
String currentTime = new Date().toString();
log.info("向客户端响应数据: {}", currentTime);
ByteBuf res = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(res);
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("数据读取完毕");
// 刷新后才将数据发送到socketChannel
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

netty中handler的执行顺序

Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。

Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。

Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行。

总结

在使用Handler的过程中,需要注意:

1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。
2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
3、流水线pipeline中outhandler不能放在最后,否则不生效
4、Handler的消费处理放在最后一个处理。