Table of Contents
引言
2018 年接近尾声,rust
团队勉强立住了异步 IO
的 flag,async
成为了关键字,Pin
, Future
, Poll
和 await!
也进入了标准库。不过一直以来实际项目中用不到这套东西,所以也没有主动去了解过。
最近心血来潮想用 rust
写点东西,但并找不到比较能看的文档(可能是因为 rust
发展太快了,很多都过时了),最后参考这篇文章和 "new tokio"
( romio ) 写了几个 demo
,并基于 mio
在 coroutine
中实现了简陋的异步 IO
。
最终实现的 file-server 如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// examples/async-echo.rs
#![feature(async_await)]
#![feature(await_macro)]
#![feature(futures_api)]
#[macro_use]
extern crate log;
use asyncio::executor::{block_on, spawn, TcpListener, TcpStream};
use asyncio::fs_future::{read_to_string};
use failure::Error;
fn main() -> Result<(), Error> {
env_logger::init();
block_on(new_server())?
}
const CRLF: &[char] = &['\r', '\n'];
async fn new_server() -> Result<(), Error> {
let mut listener = TcpListener::bind(&"127.0.0.1:7878".parse()?)?;
info!("Listening on 127.0.0.1:7878");
while let Ok((stream, addr)) = await!(listener.accept()) {
info!("connection from {}", addr);
spawn(handle_stream(stream))?;
}
Ok(())
}
async fn handle_stream(mut stream: TcpStream) -> Result<(), Error> {
await!(stream.write_str("Please enter filename: "))?;
let file_name_vec = await!(stream.read())?;
let file_name = String::from_utf8(file_name_vec)?.trim_matches(CRLF).to_owned();
let file_contents = await!(read_to_string(file_name))?;
await!(stream.write_str(&file_contents))?;
stream.close();
Ok(())
}
写这篇文章的主要目的是梳理和总结,同时也希望能给对这方面有兴趣的 Rustacean
作为参考。本文代码以易于理解为主要编码原则,某些地方并没有太考虑性能,还请见谅;但如果文章和代码中有明显错误,欢迎指正。
本文代码仓库在 Github (部分代码较长,建议 clone
下来用编辑器看),所有 examples
在 nightly-x86_64-apple-darwin 2018 Edition
上均能正常运行。运行 example/async-echo
时设置 RUST_LOG
为 info
可以在 terminal 看到基本的运行信息,debug
则可见事件循环中的事件触发顺序。
异步 IO
的基石 - mio
mio
是一个极简的底层异步 IO
库,如今 rust
生态中几乎所有的异步 IO
程序都基于它。
随着 channel
, timer
等 sub module
在 0.6.5
版本被标为 deprecated
,如今的 mio 提供的唯二两个核心功能分别是:
- 对操作系统异步网络
IO
的封装 - 用户自定义事件队列
第一个核心功能对应到不同操作系统分别是
Linux(Android) => epoll
Windows => iocp
MacOS(iOS), FreeBSD => kqueue
Fuchsia => <unknown>
mio 把这些不同平台上的 API 封装出了一套 epoll like
的异步网络 API,支持 udp 和 tcp
。
除此之外还封装了一些不同平台的拓展 API,比如
uds
,本文不对这些 API 做介绍。
异步网络 IO
下面是一个 tcp
的 demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// examples/tcp.rs
use mio::*;
use mio::net::{TcpListener, TcpStream};
use std::io::{Read, Write, self};
use failure::Error;
use std::time::{Duration, Instant};
const SERVER_ACCEPT: Token = Token(0);
const SERVER: Token = Token(1);
const CLIENT: Token = Token(2);
const SERVER_HELLO: &[u8] = b"PING";
const CLIENT_HELLO: &[u8] = b"PONG";
fn main() -> Result<(), Error> {
let addr = "127.0.0.1:13265".parse()?;
// Setup the server socket
let server = TcpListener::bind(&addr)?;
// Create a poll instance
let poll = Poll::new()?;
// Start listening for incoming connections
poll.register(&server, SERVER_ACCEPT, Ready::readable(),
PollOpt::edge())?;
// Setup the client socket
let mut client = TcpStream::connect(&addr)?;
let mut server_handler = None;
// Register the client
poll.register(&client, CLIENT, Ready::readable() | Ready::writable(),
PollOpt::edge())?;
// Create storage for events
let mut events = Events::with_capacity(1024);
let start = Instant::now();
let timeout = Duration::from_millis(10);
'top: loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
if start.elapsed() >= timeout {
break 'top
}
match event.token() {
SERVER_ACCEPT => {
let (handler, addr) = server.accept()?;
println!("accept from addr: {}", &addr);
poll.register(&handler, SERVER, Ready::readable() | Ready::writable(), PollOpt::edge())?;
server_handler = Some(handler);
}
SERVER => {
if event.readiness().is_writable() {
if let Some(ref mut handler) = &mut server_handler {
match handler.write(SERVER_HELLO) {
Ok(_) => {
println!("server wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
if event.readiness().is_readable() {
let mut hello = [0; 4];
if let Some(ref mut handler) = &mut server_handler {
match handler.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(CLIENT_HELLO, &hello);
println!("server received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
}
CLIENT => {
if event.readiness().is_writable() {
match client.write(CLIENT_HELLO) {
Ok(_) => {
println!("client wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
if event.readiness().is_readable() {
let mut hello = [0; 4];
match client.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(SERVER_HELLO, &hello);
println!("client received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
_ => unreachable!(),
}
}
};
Ok(())
}
这个 demo
稍微有点长,接下来我们把它一步步分解。
直接看主循环
1
2
3
4
5
6
7
fn main() {
// ...
loop {
poll.poll(&mut events, None).unwrap();
// ...
}
}
每次循环都得执行 poll.poll
,第一个参数是用来存 events
的 Events
, 容量是 1024
;
1
let mut events = Events::with_capacity(1024);
第二个参数是 timeout
,即一个 Option<Duration>
,超时会直接返回。返回类型是 io::Result<usize>
。
其中的
usize
代表events
的数量,这个返回值是deprecated
并且会在之后的版本移除,仅供参考
这里我们设置了 timeout = None
,所以当这个函数返回时,必然是某些事件被触发了。让我们遍历 events
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
match event.token() {
SERVER_ACCEPT => {
let (handler, addr) = server.accept()?;
println!("accept from addr: {}", &addr);
poll.register(&handler, SERVER, Ready::readable() | Ready::writable(), PollOpt::edge())?;
server_handler = Some(handler);
}
SERVER => {
if event.readiness().is_writable() {
if let Some(ref mut handler) = &mut server_handler {
match handler.write(SERVER_HELLO) {
Ok(_) => {
println!("server wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
if event.readiness().is_readable() {
let mut hello = [0; 4];
if let Some(ref mut handler) = &mut server_handler {
match handler.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(CLIENT_HELLO, &hello);
println!("server received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
}
CLIENT => {
if event.readiness().is_writable() {
match client.write(CLIENT_HELLO) {
Ok(_) => {
println!("client wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
if event.readiness().is_readable() {
let mut hello = [0; 4];
match client.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(SERVER_HELLO, &hello);
println!("client received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
_ => unreachable!(),
}
我们匹配每一个 event
的 token
,这里的 token
就是我用来注册的那些 token
。比如我在上面注册了 server
1
2
3
4
// Start listening for incoming connections
poll.register(&server, SERVER_ACCEPT, Ready::readable(),
PollOpt::edge()).unwrap();
第二个参数就是 token
1
const SERVER_ACCEPT: Token = Token(0);
这样当 event.token() == SERVER_ACCEPT
时,就说明这个事件跟我们注册的 server
有关,于是我们试图 accept
一个新的连接并把它注册进 poll
,使用的 token
是 SERVER
。
1
2
3
4
let (handler, addr) = server.accept()?;
println!("accept from addr: {}", &addr);
poll.register(&handler, SERVER, Ready::readable() | Ready::writable(), PollOpt::edge())?;
server_handler = Some(handler);
这样我们之后如果发现 event.token() == SERVER
,我们就认为它和注册的 handler
有关:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if event.readiness().is_writable() {
if let Some(ref mut handler) = &mut server_handler {
match handler.write(SERVER_HELLO) {
Ok(_) => {
println!("server wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
if event.readiness().is_readable() {
let mut hello = [0; 4];
if let Some(ref mut handler) = &mut server_handler {
match handler.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(CLIENT_HELLO, &hello);
println!("server received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
}
这时候我们还需要判断 event.readiness()
,这就是 register
函数的第三个参数,叫做 interest
,顾名思义,就是“感兴趣的事”。它的类型是 Ready
,一共四种,readable, writable, error 和 hup
,可进行并运算。
在上面我们给 handler
注册了 Ready::readable() | Ready::writable()
,所以 event
可能是 readable
也可能是 writable
,所以我们要经过判断来执行相应的逻辑。注意这里的判断是
1
2
3
4
5
6
7
if ... {
...
}
if ... {
...
}
而非
1
2
3
4
5
if ... {
...
} else if ... {
...
}
因为一个事件可能同时是 readable
和 writable
。
容错性原则
大概逻辑先讲到这儿,这里先讲一下 mio
的“容错性原则”,即不能完全相信 event
。
可以看到我上面有一段代码是这么写的
1
2
3
4
5
6
7
match event.token() {
SERVER_ACCEPT => {
let (handler, addr) = server.accept()?;
println!("accept from addr: {}", &addr);
poll.register(&handler, SERVER, Ready::readable() | Ready::writable(), PollOpt::edge())?;
server_handler = Some(handler);
}
server.accept()
返回的是 io::Result<(TcpStream, SocketAddr)>
。如果我们选择完全相信 event
的话,在这里 unwrap()
并没有太大问题 —— 如果真的有一个新的连接就绪,accept()
产生的 io::Result
是我们无法预料且无法处理的,我们应该抛给调用者或者直接 panic
。
但问题就是,我们可以认为 event
的伪消息是可预料的,可能并没有一个新的连接准备就绪,这时候我们 accept()
会引发 WouldBlock Error
。但我们不应该认为 WouldBlock
是一种错误 —— 这是一种友善的提醒。server
告诉我们:“并没有新的连接,请下次再来吧。”,所以在这里我们应该忽略(可以打个 log
)它并重新进入循环。
像我后面写的那样:
1
2
3
4
5
6
7
8
9
match client.write(CLIENT_HELLO) {
Ok(_) => {
println!("client wrote");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
Poll Option
好了,现在我们可以运行:
1
[async-io-demo] cargo run --example tcp
terminal 里打印出了
1
2
3
4
5
6
client wrote
accept from addr: 127.0.0.1:53205
client wrote
server wrote
server received
...
我们可以发现,在短短的 10 millis
内(let timeout = Duration::from_millis(10);
),server
和 client
分别进行了数十次的读写!
如果我们不想进行这么多次读写呢?比如,我们只想让 server
写一次。在网络比较通畅的情况下,client
和 server
几乎一直是可写的,所以 Poll::poll
在数微秒内就返回了。
这时候就要看 register
的第四个参数了。
1
2
3
poll.register(&server, SERVER_ACCEPT, Ready::readable(),
PollOpt::edge()).unwrap();
PollOpt::edge()
的类型是 PollOpt
,一共有 level, edge, oneshot
三种,他们有什么区别呢?
比如在我上面的代码里,
1
2
3
4
5
6
7
8
9
10
11
12
13
if event.readiness().is_readable() {
let mut hello = [0; 4];
match client.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(SERVER_HELLO, &hello);
println!("client received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
err => {
err?;
}
}
}
我在收到一个 readable readiness
时,只读了四个字节。如果这时候缓冲区里有八字节的数据,那么:
- 如果我注册时使用
PollOpt::level()
,我在下次poll
时 一定 还能收到一次readable readiness event
(只要我没有主动执行set_readiness(Read::empty())
); - 如果我注册时使用
PollOpt::edge()
,我在下次poll
时 不一定 还能收到一次readable readiness event
;
所以,使用 PollOpt::edge()
时有一个“排尽原则(Draining readiness
)”,即每次触发 event
时一定要操作到资源耗尽返回 WouldBlock
,即上面的代码要改成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if event.readiness().is_readable() {
let mut hello = [0; 4];
loop {
match client.read_exact(&mut hello) {
Ok(_) => {
assert_eq!(SERVER_HELLO, &hello);
println!("client received");
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
err => {
err?;
}
}
}
}
那么,oneshot
又是怎样的行为呢?让我们回到上面的问题,如果我们只想让 handler
写一次,怎么办 —— 注册时使用 PollOpt::oneshot()
,即
1
2
3
4
let (handler, addr) = server.accept()?;
println!("accept from addr: {}", &addr);
poll.register(&handler, SERVER, Ready::readable() | Ready::writable(), PollOpt::oneshot())?;
server_handler = Some(handler);
这样的话,你只能收到一次 SERVER
事件,除非你使用 Poll::reregister
重新注册 handler
。
Poll::reregister
可以更改PollOpt
和interest
Still Block
其实上面这个 demo
还存在一个问题,即我们在回调代码块中使用了同步的 IO
操作 println!
。我们要尽可能避免在回调的代码块里使用耗时的 IO
操作。
考虑到文件 IO
(包括 Stdin, Stdout, Stderr
) 速度很慢,我们只需要把所有的文件 IO
交给一个线程进行即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use std::sync::mpsc::{Sender, Receiver, channel, SendError};
#[derive(Clone)]
pub struct Fs {
task_sender: Sender<Task>,
}
impl Fs {
pub fn new() -> Self {
let (sender, receiver) = channel();
std::thread::spawn(move || {
loop {
match receiver.recv() {
Ok(task) => {
match task {
Task::Println(ref string) => println!("{}", string),
Task::Exit => return
}
},
Err(_) => {
return;
}
}
}
});
Fs { task_sender: sender }
}
pub fn println(&self, string: String) {
self.task_sender.send(Task::Println(string)).unwrap()
}
}
pub enum Task {
Exit,
Println(String),
}
之后,可以使用 Fs::println
替换所有的 println!
。
自定义事件
上面我们实现异步 println
比较简单,这是因为 println
并没有返回值,不需要进行后续操作。设想一下,如果要我们实现 open
和 ready_to_string
,先异步地 open
一个文件,然后异步地 read_to_string
,最后再异步地 println
, 我们要怎么做?
最简单的写法是回调,像这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// src/fs.rs
use crossbeam_channel::{unbounded, Sender};
use std::fs::File;
use std::io::Read;
use std::boxed::FnBox;
use std::thread;
use failure::Error;
#[derive(Clone)]
pub struct Fs {
task_sender: Sender<Task>,
}
pub struct FsHandler {
io_worker: thread::JoinHandle<Result<(), Error>>,
executor: thread::JoinHandle<Result<(), Error>>,
}
pub fn fs_async() -> (Fs, FsHandler) {
let (task_sender, task_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
let io_worker = std::thread::spawn(move || {
loop {
match task_receiver.recv() {
Ok(task) => {
match task {
Task::Println(ref string) => println!("{}", string),
Task::Open(path, callback, fs) => {
result_sender
.send(TaskResult::Open(File::open(path)?, callback, fs))?
}
Task::ReadToString(mut file, callback, fs) => {
let mut value = String::new();
file.read_to_string(&mut value)?;
result_sender
.send(TaskResult::ReadToString(value, callback, fs))?
}
Task::Exit => {
result_sender
.send(TaskResult::Exit)?;
break;
}
}
}
Err(_) => {
break;
}
}
}
Ok(())
});
let executor = std::thread::spawn(move || {
loop {
let result = result_receiver.recv()?;
match result {
TaskResult::ReadToString(value, callback, fs) => callback.call_box((value, fs))?,
TaskResult::Open(file, callback, fs) => callback.call_box((file, fs))?,
TaskResult::Exit => break
};
};
Ok(())
});
(Fs { task_sender }, FsHandler { io_worker, executor })
}
impl Fs {
pub fn println(&self, string: String) -> Result<(), Error> {
Ok(self.task_sender.send(Task::Println(string))?)
}
pub fn open<F>(&self, path: &str, callback: F) -> Result<(), Error>
where F: FnOnce(File, Fs) -> Result<(), Error> + Sync + Send + 'static {
Ok(self.task_sender.send(Task::Open(path.to_string(), Box::new(callback), self.clone()))?)
}
pub fn read_to_string<F>(&self, file: File, callback: F) -> Result<(), Error>
where F: FnOnce(String, Fs) -> Result<(), Error> + Sync + Send + 'static {
Ok(self.task_sender.send(Task::ReadToString(file, Box::new(callback), self.clone()))?)
}
pub fn close(&self) -> Result<(), Error> {
Ok(self.task_sender.send(Task::Exit)?)
}
}
impl FsHandler {
pub fn join(self) -> Result<(), Error> {
self.io_worker.join().unwrap()?;
self.executor.join().unwrap()
}
}
type FileCallback = Box<FnBox(File, Fs) -> Result<(), Error> + Sync + Send>;
type StringCallback = Box<FnBox(String, Fs) -> Result<(), Error> + Sync + Send>;
pub enum Task {
Exit,
Println(String),
Open(String, FileCallback, Fs),
ReadToString(File, StringCallback, Fs),
}
pub enum TaskResult {
Exit,
Open(File, FileCallback, Fs),
ReadToString(String, StringCallback, Fs),
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// examples/fs.rs
use asyncio::fs::fs_async;
use failure::Error;
const TEST_FILE_VALUE: &str = "Hello, World!";
fn main() -> Result<(), Error> {
let (fs, fs_handler) = fs_async();
fs.open("./examples/test.txt", |file, fs| {
fs.read_to_string(file, |value, fs| {
assert_eq!(TEST_FILE_VALUE, &value);
fs.println(value)?;
fs.close()
})
})?;
fs_handler.join()?;
Ok(())
}
测试
1
[async-io-demo] cargo run --example fs
这样写在逻辑上的确是对的,但是负责跑 callback
的 executor
线程其实被负责 io
的线程阻塞住了(result_receiver.recv()
)。那我们能不能在 executor
线程里跑一个事件循环,以达到不被 io
线程阻塞的目的呢?(即确定 result_receiver
中有 result
时,executor
才会进行 result_receiver.recv()
).
这就到了体现 mio
强大可拓展性的时候:注册用户态的事件队列。
把上面的代码稍加修改,就成了这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// src/fs_mio.rs
use crossbeam_channel::{unbounded, Sender, TryRecvError};
use std::fs::File;
use std::io::{Read};
use std::boxed::FnBox;
use std::thread;
use failure::Error;
use std::time::Duration;
use mio::*;
#[derive(Clone)]
pub struct Fs {
task_sender: Sender<Task>,
}
pub struct FsHandler {
io_worker: thread::JoinHandle<Result<(), Error>>,
executor: thread::JoinHandle<Result<(), Error>>,
}
const FS_TOKEN: Token = Token(0);
pub fn fs_async() -> (Fs, FsHandler) {
let (task_sender, task_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
let poll = Poll::new().unwrap();
let (registration, set_readiness) = Registration::new2();
poll.register(®istration, FS_TOKEN, Ready::readable(), PollOpt::oneshot()).unwrap();
let io_worker = std::thread::spawn(move || {
loop {
match task_receiver.recv() {
Ok(task) => {
match task {
Task::Println(ref string) => println!("{}", string),
Task::Open(path, callback, fs) => {
result_sender
.send(TaskResult::Open(File::open(path)?, callback, fs))?;
set_readiness.set_readiness(Ready::readable())?;
}
Task::ReadToString(mut file, callback, fs) => {
let mut value = String::new();
file.read_to_string(&mut value)?;
result_sender
.send(TaskResult::ReadToString(value, callback, fs))?;
set_readiness.set_readiness(Ready::readable())?;
}
Task::Exit => {
result_sender
.send(TaskResult::Exit)?;
set_readiness.set_readiness(Ready::readable())?;
break;
}
}
}
Err(_) => {
break;
}
}
}
Ok(())
});
let executor = thread::spawn(move || {
let mut events = Events::with_capacity(1024);
'outer: loop {
poll.poll(&mut events, Some(Duration::from_secs(1)))?;
for event in events.iter() {
match event.token() {
FS_TOKEN => {
loop {
match result_receiver.try_recv() {
Ok(result) => {
match result {
TaskResult::ReadToString(value, callback, fs) => callback.call_box((value, fs))?,
TaskResult::Open(file, callback, fs) => callback.call_box((file, fs))?,
TaskResult::Exit => break 'outer
}
}
Err(e) => {
match e {
TryRecvError::Empty => break,
TryRecvError::Disconnected => Err(e)?
}
}
}
}
poll.reregister(®istration, FS_TOKEN, Ready::readable(), PollOpt::oneshot())?;
}
_ => unreachable!()
}
}
};
Ok(())
});
(Fs { task_sender }, FsHandler { io_worker, executor })
}
impl Fs {
pub fn println(&self, string: String) -> Result<(), Error> {
Ok(self.task_sender.send(Task::Println(string))?)
}
pub fn open<F>(&self, path: &str, callback: F) -> Result<(), Error>
where F: FnOnce(File, Fs) -> Result<(), Error> + Sync + Send + 'static {
Ok(self.task_sender.send(Task::Open(path.to_string(), Box::new(callback), self.clone()))?)
}
pub fn read_to_string<F>(&self, file: File, callback: F) -> Result<(), Error>
where F: FnOnce(String, Fs) -> Result<(), Error> + Sync + Send + 'static {
Ok(self.task_sender.send(Task::ReadToString(file, Box::new(callback), self.clone()))?)
}
pub fn close(&self) -> Result<(), Error> {
Ok(self.task_sender.send(Task::Exit)?)
}
}
impl FsHandler {
pub fn join(self) -> Result<(), Error> {
self.io_worker.join().unwrap()?;
self.executor.join().unwrap()
}
}
type FileCallback = Box<FnBox(File, Fs) -> Result<(), Error> + Sync + Send>;
type StringCallback = Box<FnBox(String, Fs) -> Result<(), Error> + Sync + Send>;
pub enum Task {
Exit,
Println(String),
Open(String, FileCallback, Fs),
ReadToString(File, StringCallback, Fs),
}
pub enum TaskResult {
Exit,
Open(File, FileCallback, Fs),
ReadToString(String, StringCallback, Fs),
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// examples/fs-mio.rs
use asyncio::fs_mio::fs_async;
use failure::Error;
const TEST_FILE_VALUE: &str = "Hello, World!";
fn main() -> Result<(), Error> {
let (fs, fs_handler) = fs_async();
fs.open("./examples/test.txt", |file, fs| {
fs.read_to_string(file, |value, fs| {
assert_eq!(TEST_FILE_VALUE, &value);
fs.println(value)?;
fs.close()
})
})?;
fs_handler.join()?;
Ok(())
}
可以注意到,上面的代码发生的改变就是,executor
不再被 result_receiver.recv
阻塞,而变成了注册事件(registration
)后等待 Poll::poll
返回事件;只有等到了新的事件,才会进行 result_receiver.try_recv
。同时,io_worker
线程在 send result
之后会执行 set_readiness.set_readiness(Ready::readable())?;
,以通知 executor
线程对相应结果做处理。
这样的话,executor
就不会被 io worker
阻塞了,因为我们可以把所有的事件都注册到 executor
上,mio::Poll
会同时监听多个事件(比如把 fs
和 tcp
结合起来)。
测试
1
[async-io-demo] cargo run --example fs-mio
Callback is evil
既然文件 IO
的 executor
不再会被 io worker
线程阻塞了,那我们来试试让 fs
和 tcp
共用一个 poll
然后建立一个简单的文件服务器吧。
但可以先等等,因为我已经开始觉得写 callback
有点难受了 —— 如果我们还想处理错误的话,会觉得更难受,像这样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use asyncio::fs_mio::fs_async;
use failure::Error;
const TEST_FILE_VALUE: &str = "Hello, World!";
fn main() -> Result<(), Error> {
let (fs, fs_handler) = fs_async();
fs.open("./examples/test.txt",
|file, fs| {
fs.read_to_string(file,
|value, fs| {
assert_eq!(TEST_FILE_VALUE, &value);
fs.println(value,
|err| {
...
}
);
fs.close()
},
|err| {
...
}
)
},
|err| {
...
}
)?;
fs_handler.join()?;
Ok(())
}
而且对 rust
来说,更加艰难的是闭包中的生命周期问题(闭包几乎不能通过捕获来借用环境变量)。这就意味着,如果我要借用环境中的某个变量,我要么 clone
它(如果它实现了 Clone
的话),要么把它作为闭包参数传入(意味着你要根据需要改每一层回调函数的签名,这太屎了)。
考虑到各种原因,rust
最终选择用 coroutine
作为异步 IO
的 API
抽象。
coroutine
这里所说的 coroutine
是指基于 rust generator
的 stackless coroutine
而非早期被 rust
抛弃的 green thread(stackful coroutine)
。
generator
rust
大概在今年五月份引入了 generator
,但到现在还是 unstable 的 —— 虽说也没多少人用 stable(误
一个典型的斐波那契 generator
如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// examples/fab.rs
#![feature(generators, generator_trait)]
use std::ops::{Generator, GeneratorState};
fn main() {
let mut gen = fab(5);
loop {
match unsafe { gen.resume() } {
GeneratorState::Yielded(value) => println!("yield {}", value),
GeneratorState::Complete(ret) => {
println!("return {}", ret);
break;
}
}
}
}
fn fab(mut n: u64) -> impl Generator<Yield=u64, Return=u64> {
move || {
let mut last = 0u64;
let mut current = 1;
yield last;
while n > 0 {
yield current;
let tmp = last;
last = current;
current = tmp + last;
n -= 1;
}
return last;
}
}
由于 generator
的“中断特性”,我们很自然的可以想到,如果用 generator
搭配 mio
,给每个 generator
分配一个 token
,然后 poll mio
的事件循环,收到一个唤醒事件就 resume
相应的 generator
;每个 generator
在要阻塞的时候拿自己的 token
注册一个唤醒事件然后 yield
,不就实现了“同步代码”的异步 IO
吗?
这样看来原理上来说已经稳了,但 rust
异步 IO
的天空依旧漂浮着两朵乌云。
自引用
第一朵乌云和 rust
自身的内存管理机制有关。
如果你写出这样的 generator
1
2
3
4
5
6
7
8
fn self_ref_generator() -> impl Generator<Yield=u64, Return=()> {
|| {
let x: u64 = 1;
let ref_x: &u64 = &x;
yield 0;
yield *ref_x;
}
}
rust
一定会给你抛个错然后告诉你 “borrow may still be in use when generator yields”。编译器没有教你怎么修正可能会让你有些恐慌,去不存在的搜索引擎上查了查,你发现这和 generator
的实现有关。
前文中提到,rust generator
是 stackless
的,即它并不会保留一个完整的栈,而是根据不同的状态保留需要的变量。如果你把上面的代码改成
1
2
3
4
5
6
7
8
fn no_ref_generator() -> impl Generator<Yield=u64, Return=()> {
|| {
let x: u64 = 1;
let ref_x: &u64 = &x;
yield *ref_x;
yield 0;
}
}
在第一次 yield
结束之后,编译器会发现 generator
唯一需要保留的是字面量 0
,所以这段代码可以顺利编译通过。但是,对于前面的 generator
,第一次 yield
过后,编译器发现你需要同时保留 x
和它的引用 ref_x
,这样的话 generator
就会变成类似这样的结构(仅供参考):
1
2
3
4
5
6
7
8
9
enum SomeGenerator<'a> {
...
SomeState {
_yield: u64,
x: u64
ref_x: &'a u64
}
...
}
这就是 rust
中“臭名昭著” 的自引用,下面这段代码会发生什么呢
1
2
3
4
5
6
7
8
9
10
11
12
struct A<'a> {
b: u64,
ref_b: Option<&'a u64>
}
impl<'a> A<'a> {
fn new() -> Self {
let mut a = A{b: 1, ref_b: None};
a.ref_b = Some(&a.b);
a
}
}
你会发现它编译不过,当然这是很合理的,栈上的 a 变量拷贝出去之后其成员 b 的引用会失效,rust
的生命周期机制帮你规避了这个问题。但即使你改成这样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use std::borrow::{BorrowMut};
struct A<'a> {
b: u64,
ref_b: Option<&'a u64>
}
impl<'a> A<'a> {
fn boxed() -> Box<Self> {
let mut a = Box::new(A{b: 1, ref_b: None});
let mut_ref: &mut A = a.borrow_mut();
mut_ref.ref_b = Some(&mut_ref.b);
a
}
}
这样按道理来说是没问题的,因为 a 的实体已经在堆上了,即使你拷贝它在栈上的引用,也不会改变其成员 b 的地址,引用一直是有效的 —— 但问题是,你没法跟编译器解释这事,编译器认为函数里面的 &mut_ref.b
只能活到函数结束,这样含有这个引用的 a 自然也不能 move 出来。
那你可能会想,那我就在外面再取引用就好了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct A<'a> {
b: u64,
ref_b: Option<&'a u64>
}
impl<'a> A<'a> {
fn new() -> Self {
A{b: 1, ref_b: None}
}
}
fn main() {
let mut a = A::new();
a.ref_b = Some(&a.b);
}
这样的确没啥毛病,但是,你会发现自引用不仅阻止了 move,还阻止了你对 A 可变引用。。比如这样就编译不过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct A<'a> {
b: u64,
ref_b: Option<&'a u64>
}
impl<'a> A<'a> {
fn new() -> Self {
A{b: 1, ref_b: None}
}
fn mute(&mut self) {
}
}
fn main() {
let mut a = A::new();
a.ref_b = Some(&a.b);
a.mute();
}
但远古的 Future::poll
签名就长这样
1
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
而直到现在 Generator::resume
的签名还是这样
1
unsafe fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
这样的话自引用会导致 generator
无法实现 Generator
和 Future
在这种情况下,我们可以使用 NonNull
来避过编译器的检查
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::ptr::NonNull;
struct A {
b: u64,
ref_b: NonNull<u64>
}
impl A {
fn new() -> Self {
A{b: 1, ref_b: NonNull::dangling()}
}
}
fn main() {
let mut a = A::new();
a.ref_b = NonNull::from(&a.b);
}
这样的确没有了烦人的生命周期约束,但也意味着你要自己保证内存安全 —— 绝对不能 move,也不能对其可变引用使用 mem::replace
或 mem::swap
,这样非常不妙。
Pin
那有没有办法通过其它方式来保证能保证它不能被 move 或者取可变引用呢?这就是 pin
的应用场景了。pin
具体的内容可以看这篇 RFC,本文只是简要说明一下。
rust
默认给大部分类型实现了 trait std::marker::Unpin
,这只是一个标记,表示这个类型 move 是安全的,这时候,Pin<'a, T>
跟 &'a mut T
没有区别,你也可以安全地通过 Pin::new(&mut T)
和 Pin::as_mut(self: &mut Pin<T>)
相互转换。
但对于不能安全 move 的类型,比如上面的 A
,我们得先把它标记为 !Unpin
,安全的标记方法是给它一个 !Unpin
的成员,比如 Pinned
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#![feature(pin)]
use std::marker::{Pinned};
use std::ptr::NonNull;
struct A {
b: u64,
ref_b: NonNull<u64>,
_pin: Pinned,
}
impl A {
fn new() -> Self {
A {
b: 1,
ref_b: NonNull::dangling(),
_pin: Pinned,
}
}
}
fn main() {
let mut a = A::new();
let mut pinned = unsafe { Pin::new_unchecked(&mut a) };
let ref_b = NonNull::from(&pinned.b);
let mut_ref: Pin<&mut A> = pinned.as_mut();
unsafe {Pin::get_mut_unchecked(mut_ref).ref_b = ref_b};
let unmoved = pinned;
assert_eq!(unmoved.ref_b, NonNull::from(&unmoved.b));
}
从 !Unpin
的类型构建 Pin
总是 unsafe
的,它们通过 Pin::new_unchecked
和 Pin::get_mut_unchecked
相互转换。当然,我们在构建时是可以保证它是 safe
,我们只要完成这两个 unsafe
的操作,就可以保证:
- 永远不能
safe
地获得可变引用:Pin::get_mut_unchecked
是unsafe
的 - 永远不能
safe
地 move:因为Pin
只拥有可变引用,且由于Pin::get_mut_unchecked
是unsafe
的,你不能safe
地对其可变引用使用mem::replace
或mem::swap
当然,如果你不想在构建时使用 unsafe
或者想获得 a
的所有权以便在函数间传递,你可以使用 Box::pinned
从而把它分配在堆上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct A {
b: u64,
ref_b: NonNull<u64>,
_pin: Pinned,
}
impl A {
fn boxed() -> Pin<Box<Self>> {
let mut boxed = Box::pinned(A {
b: 1,
ref_b: NonNull::dangling(),
_pin: Pinned,
});
let ref_b = NonNull::from(&boxed.b);
let mut_ref: Pin<&mut A> = boxed.as_mut();
unsafe { Pin::get_mut_unchecked(mut_ref).ref_b = ref_b };
boxed
}
}
fn main() {
let boxed = A::boxed();
let unmoved = boxed;
assert_eq!(unmoved.ref_b, NonNull::from(&unmoved.b));
}
有了 Pin
之后,新版 Future
的定义就是这样的了
1
2
3
4
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output>;
}
合理的抽象
既然已经打算钦定了 coroutine
作为异步 IO
的 API
抽象,那应该把哪些东西加入标准库、哪些东西加入语法支持、哪些东西交给第三方实现呢?让开发者手动调用 unsafe
的 Generator::resume
终归不是很妙,也不好把 mio
作为唯一的底层异步 IO
实现(如果这样的话不如把 mio
也并入标准库)。
现在的 rust
提供了 async
的语法支持(以前是用过程宏的实现的)、await!
的标准库宏支持,标准库 std::future
的 trait Future
和 struct GenFuture
, 标准库 std::task
的 enum Poll<T>, struct LocalWaker, struct Waker
和 trait UnsafeWaker
。
你需要给你的 MyWaker
实现 trait UnsafeWaker
,用 mio
的话就用 SetReadiness
,unsafe fn wake(&self)
用 SetReadiness::set_readiness
实现。然后把 MyWaker
包在 Waker, LocalWaker
里面。
Poll<T>
Poll<T>
的定义为
1
2
3
4
pub enum Poll<T> {
Ready(T),
Pending,
}
await!
await!
宏只能在 async
函数或者块里面用,传入一个 Future
await!(future)
会被展开成
1
2
3
4
5
6
7
8
loop {
if let Poll::Ready(x) = ::future::poll_with_tls(unsafe{
Pin::new_unchecked(&mut future)
}) {
break x;
}
yield
}
::future::poll_with_tls
即 thread-local waker
,就是你传给这个 GenFuture::poll
的 LocalWaker
,
async
async
则会把 Generator
包装成 Future(GenFuture)
。
GenFuture
的相关定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct GenFuture<T: Generator<Yield = ()>>(T);
impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
set_task_waker(lw, || match unsafe { Pin::get_mut_unchecked(self).0.resume() } {
GeneratorState::Yielded(()) => Poll::Pending,
GeneratorState::Complete(x) => Poll::Ready(x),
})
}
}
pub fn from_generator<T: Generator<Yield = ()>>(x: T) -> impl Future<Output = T::Return> {
GenFuture(x)
}
这里可以看到,GenFuture
在每次调用 self.0.resume
之前会 set_task_waker
,通过一个 thread_local
的变量中转,从而 generator
里面的 future::poll
能通过 poll_with_tls
拿到这个 LocalWaker
。
所以,下面的代码
1
2
3
async fn async_recv(string_channel: Receiver<String>) -> String {
await!(string_channel.recv_future())
}
会被类似地展开为这样
1
2
3
4
5
6
7
8
9
10
11
12
13
fn async_recv(string_channel: Receiver<String>) -> impl Future<Output = T::Return> {
from_generator(move || {
let recv_future = string_channel.recv_future();
loop {
if let Poll::Ready(x) = ::future::poll_with_tls(unsafe{
Pin::new_unchecked(&mut recv_future)
}) {
break x;
}
yield
}
})
}
non-blocking coroutine
掌握了上文的基础知识后,我们就可以开始实践了。
coroutine 本身并不意味着“非阻塞”,你完全可以在两次 yield
之间调用阻塞 IO
的 API
从而导致阻塞。 非阻塞的关键在于,在将要阻塞的时候(比如某个 API
返回了 io::ErrorKind::WouldBlock
),在 GenFuture::poll
中用底层异步接口注册一个事件和唤醒回调(waker
)然后自身休眠(yield
),底层异步调度在特定事件发生的时候回调唤醒这个 Future
。
下面我参照 romio
的异步调度实现了 Executor
block_on, spawn, TcpListener
和 TcpStream
,代码较长,建议 clone
后用编辑器看。(请注意区分 Poll(mio::Poll)
与 task::Poll
以及 net::{TcpListener, TcpStream}(mio::net::{TcpListener, TcpStream})
与 TcpListener, TcpStream
)
Executor
Executor
中包含 mio::Poll
,main task waker
及用来管理 task
和 source
的 Slab
各一个。其本身并没有实现什么特别的方法,主要是初始化为 thread_local
的 EXECUTOR
供其它函数借用。
block_on
block_on
函数会阻塞当前线程,传入参数是一个 future: Future<Output=T>
,被称为 main task
;返回值类型是 T
。该函数一般在最外层被调用。
block_on
会引用 thread_local EXECUTOR
,主要逻辑是调用 mio::Poll::poll
来响应事件。block_on
把 0 - MAX_RESOURCE_NUM(1 << 31)
个 Token
分为三类。
-
main task token
收到
Token
为MAIN_TASK_TOKEN
的事件即表示需要唤醒main task
,执行main_task.poll
,返回task::Poll::Ready(T)
则block_on
函数返回。 -
task token
奇数
token
表示由spawn
函数分发的其它任务需要被唤醒,执行相应的task.poll
,token
和该事件在EXECUTOR.tasks
中的index
一一映射。 -
source token
偶数
token
表示由register_source
函数注册的source
需要被分发,执行相应source
的waker()
以唤醒分发它们的task
。
spawn
分发任务
TcpListener
包装了 mio::net::TcpListener
,accept
方法返回一个 Future
。
TcpStream
包装了 mio::net::TcpStream
, read
和 write
方法均返回 Future
。
echo-server
实现了 executor
之后,我们可以就写一个简单的 echo-server
了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// examples/async-echo
#![feature(async_await)]
#![feature(await_macro)]
#[macro_use]
extern crate log;
use asyncio::executor::{block_on, spawn, TcpListener};
use failure::Error;
fn main() -> Result<(), Error> {
env_logger::init();
block_on(
async {
let mut listener = TcpListener::bind(&"127.0.0.1:7878".parse()?)?;
info!("Listening on 127.0.0.1:7878");
while let Ok((mut stream, addr)) = await!(listener.accept()) {
info!("connection from {}", addr);
spawn(
async move {
let client_hello = await!(stream.read())?;
let read_length = client_hello.len();
let write_length =
await!(stream.write(client_hello))?;
assert_eq!(read_length, write_length);
stream.close();
Ok(())
},
)?;
};
Ok(())
},
)?
}
1
RUST_LOG=info cargo run --example async-echo
可以用 telnet
连上试试看。
后记
当然最后还留了一个 demo,就是把文件 IO
也封装为 coroutine
的非阻塞 IO
,实现在 src/fs_future.rs
中,这时可以运行本文开头给的 example 了。
1
RUST_LOG=info cargo run --example file-server
用 telnet
测试
1
2
3
4
5
6
7
[~] telnet 127.0.0.1 7878
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Please enter filename: examples/test.txt
Hello, World!
Connection closed by foreign host.
读者有兴趣的话可以看一下 src/fs_future.rs 中的实现,这里就不细说,接下来我们再谈谈现在 coroutine API
的不足。
我目前发现的主要问题就是不能在 Future::poll
中使用 try
,导致出现 Result
的地方只能 match
,希望之后会有比较好的解决方案(比如给 task::Poll<Result<R, E>>
实现 Try
)。
第二个问题是 Waker
最里面装的是 UnsafeWaker
的 NonNull
指针,当然我能理解 rust
团队有性能等其它方面的考虑,但如果用 mio
的 set_readiness
封装出 MyWaker
的话,clone
完全不需要 NonNull
,而且我在实际编码时因为这个出过空指针错误。。希望以后能提供一个更安全的选择。