ARTS 第 5 周 - Java NIO

Algorithm

这周做了一道 腾讯精选练习 50 题 中的简单题 Nim 游戏,刚开始我在想是否可以用贪心或者动态规划的思想来解,但是想了半天都没有抽取出子问题,然后我从 5 开始寻找规律,发现 5 到 12 中除了 4 的倍数 8 和 12,先手都是可以获胜的,所以一个简单的结论就是:只要石头总数不能被 4 整除先手就能获胜。仔细想想,这个游戏可以切分成以 4 块石头为一个单位,先手的人无论拿几块(1 到 3 块),最后 1 块都是对方的,所以总数如果是 4 的倍数一定是对方获胜;相反,如果总数不是 4 的倍数,先手的人一定可以通过拿 1 到 3 块使得剩下的石头数量是 4 的倍数,那对方就要输了。

class Solution {
public boolean canWinNim(int n) {
return n % 4 != 0;
}
}

Review

这周想要研究下 Java 的 NIO,读了 Doug Lea 关于使用 Java NIO 实现 Reactor 模式的一个分享 Scalable IO in Java。Doug Lea 最广为人知的身份是 JDK 并发包 java.util.concurrent 的作者,通过阅读他的代码了解他这个人,就像通过阅读一本书了解书的作者一样,有种奇妙的感觉;另外他也是《Java Concurrency in Practice》这本书的作者之一。Doug Lea 在他的 PPT 中,先从传统 Thread Per Request 模型讲起,引入 Reactor 模式,还讲到将一个大的任务切分成多个小任务,提高并发度的思想,这是典型的分治(Divide and Conquer)思想;然后逐渐展开 Reactor 模式的各种改进版本,让人清晰地看到变化的过程,以及每次改进要解决的问题,强烈推荐这个 PPT。

Tip

关于技术问题,英文资料往往质量更高,也更「第一手」,使用 Google 搜到满意资料的一个关键是:选择精准的英文关键字,例如:搜入门教程可以 tutorial,搜最佳实践用 best practice,搜某个东西的原理用 mechanism。那么,如何积累这样的关键词呢,也许只能通过多看英文资料和书籍了,因此 ARTS 中 R 的重要性就突显出来了。

Share

不熟悉 Java NIO 的可以先看这个教程 Java NIO Tutorial,下面直接进入到 Reactor 模式,下面这张 Doug Lea 的 PPT 中的示意图描述了 Reactor 模式的一种常见实现:服务端 Reactor 接收到客户端的连接请求后,交由 Acceptor 创建 socket 连接、设置要监听的事件及相应的回调,Reactor 监听到 socket 的事件后在线程池中触发回调执行。

Reactor 模式示意图(来自 Doug Lea 的 PPT)

我参考 Doug Lea 的 PPT 中的代码实现了一个基于 Reactor 模式的 Echo 服务,可以在本地运行,然后使用 telnet 127.0.0.1 4869 连接上去进行测试。需要注意 Handler 的 run 方法中的这行代码:key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));,这是暂停监听 socket 上的可读事件,否则从 socket 读取数据之前 Reactor 会不断触发这个事件,导致 socket 上注册的回调被不断执行。处理完可读事件(读完数据)后需要恢复监听 socket 上的可读事件,之后需要使用 key.selector().wakeup(); 唤醒一下 Reactor,否则如果没有其他事件发生,Reactor 会一直阻塞在 selector.select();

// Reactor.java
public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocket;

public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for (SelectionKey key : selectedKeys) {
dispatch(key);
}
selectedKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void dispatch(SelectionKey key) {
Runnable handler = (Runnable) key.attachment();
if (handler != null) {
handler.run();
}
}

private class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel socket = serverSocket.accept();
if (socket != null) {
new Handler(selector, socket);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws IOException {
new Reactor(4869).run();
}
}

// Handler.java
public class Handler implements Runnable {
private final SocketChannel socket;
private final SelectionKey key;
private final ByteBuffer buffer = ByteBuffer.allocate(1024);
private static final ExecutorService threadPool = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024)
);

public Handler(Selector selector, SocketChannel socket) throws IOException {
this.socket = socket;
socket.configureBlocking(false);
key = socket.register(selector, SelectionKey.OP_READ, this);
}

@Override
public void run() {
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
threadPool.submit(this::doEcho);
}

private void doEcho() {
try {
if (socket.read(buffer) < 0) {
socket.close();
return;
}
buffer.flip();
while (buffer.hasRemaining()) {
socket.write(buffer);
}
buffer.clear();
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.selector().wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
}

最后,关于 Reactor 模式有一篇学术论文《Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events》,对 Reactor 有比较详细的阐释,有兴趣的可以去读读。

0%