# Production-Ready Accept Loop

A production-ready accept loop needs the following things:
1. Handling errors
2. Limiting the number of simultaneous connections to avoid denial-of-service
   (DoS) attacks

## Handling errors

There are two kinds of errors in an accept loop:
1. Per-connection errors. The system uses these to signal that a connection was
   in the queue but has been dropped by the peer. Subsequent connections may
   already be queued, so the next connection must be accepted immediately.
2. Resource shortages. When one of these is encountered, it doesn't make sense
   to accept the next socket immediately. But the listener stays active, so your
   server should try to accept a socket again later.

Here is an example of a per-connection error (printed in normal and debug mode):
```
Error: Connection reset by peer (os error 104)
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
```

And the following is the most common example of a resource shortage error:
```
Error: Too many open files (os error 24)
Error: Os { code: 24, kind: Other, message: "Too many open files" }
```

### Testing Application

To test your application for these errors, try the following (this works
on Unix-like systems only).

Lower the open file limit and start the application:
```
$ ulimit -n 100
$ cargo run --example your_app
   Compiling your_app v0.1.0 (/work)
    Finished dev [unoptimized + debuginfo] target(s) in 5.47s
     Running `target/debug/examples/your_app`
Server is listening on: http://127.0.0.1:1234
```
Then in another console run the [`wrk`] benchmark tool:
```
$ wrk -c 1000 http://127.0.0.1:1234
Running 10s test @ http://127.0.0.1:1234/
  2 threads and 1000 connections
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
```

The important things to check are:

1. The application doesn't crash on error (but it may log errors, see below).
2. It's possible to connect to the application again once the load is stopped
   (a few seconds after `wrk` finishes). This is what `telnet` does in the example
   above; make sure it prints `Connected to <hostname>`.
3. The `Too many open files` error is logged in the appropriate log. This requires
   setting the "maximum number of simultaneous connections" parameter (see below)
   of your application to a value greater than `100` for this example.
4. Check the CPU usage of the app while running the test. It should not occupy 100%
   of a single CPU core (it's unlikely that 1000 connections can exhaust the CPU in
   Rust, so if they do, error handling is probably not right).

#### Testing non-HTTP applications

If possible, use an appropriate benchmark tool and set a similar number of
connections. For example, `redis-benchmark` has a `-c` parameter for that, if
you are implementing the Redis protocol.
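
For instance, a run along these lines would put the server under similar
file-descriptor pressure (the address, port, and request count are placeholders
for this illustration; `-c` sets the number of parallel client connections):
```
$ redis-benchmark -h 127.0.0.1 -p 1234 -c 1000 -n 100000
```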

Alternatively, you can still use `wrk`, just make sure that the connection is not
closed immediately. If it is, put a temporary timeout before handing
the connection to the protocol handler, like this:

```rust,edition2018
# extern crate async_std;
# use std::time::Duration;
# use async_std::{
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# async fn connection_loop(_stream: TcpStream) {}
#
# async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
#     let listener = TcpListener::bind(addr).await?;
#     let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
    # let stream = stream?;
    task::spawn(async move {
        task::sleep(Duration::from_secs(10)).await; // 1
        connection_loop(stream).await;
    });
}
# Ok(())
# }
```

1. Make sure the sleep coroutine is inside the spawned task, not in the accept loop.

[`wrk`]: https://github.com/wg/wrk

### Handling Errors Manually

Here is what a basic accept loop could look like:

```rust,edition2018
# extern crate async_std;
# use std::{io, time::Duration};
# use async_std::{
#     net::{TcpListener, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# fn is_connection_error(e: &io::Error) -> bool {
#     e.kind() == io::ErrorKind::ConnectionReset
# }
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(result) = incoming.next().await {
        let stream = match result {
            Err(ref e) if is_connection_error(e) => continue, // 1
            Err(e) => {
                eprintln!("Error: {}. Pausing for 500ms.", e); // 3
                task::sleep(Duration::from_millis(500)).await; // 2
                continue;
            }
            Ok(s) => s,
        };
        // body
    }
    Ok(())
}
```

1. Ignore per-connection errors (one way to classify them is sketched below).
2. Sleep and continue on resource shortage.
3. It's important to log the message, because these errors commonly indicate a
   misconfiguration of the system and are helpful for the operations people
   running the application.
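
The `is_connection_error` check is left to the application; the standard library
does not provide it. A minimal sketch, assuming you classify errors by
`io::ErrorKind` (exactly which kinds count as per-connection errors is a judgment
call for your protocol), could look like this:

```rust,edition2018
use std::io;

// Errors that refer to a single dropped connection in the accept queue are
// treated as per-connection errors; everything else falls through to the
// resource-shortage branch of the loop above.
fn is_connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}
```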

Be sure to [test your application](#testing-application).


### External Crates

The crate [`async-listen`] has a helper to achieve this task:
```rust,edition2018
# extern crate async_std;
# extern crate async_listen;
# use std::{io, time::Duration};
# use async_std::{
#     net::{TcpListener, ToSocketAddrs},
#     prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, error_hint};

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener
        .incoming()
        .log_warnings(log_accept_error) // 1
        .handle_errors(Duration::from_millis(500));
    while let Some(socket) = incoming.next().await { // 2
        // body
    }
    Ok(())
}

fn log_accept_error(e: &io::Error) {
    eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
}
```

1. Logs resource shortages (`async-listen` calls them warnings). If you use the
   `log` crate or any other logging facility in your app, this should go to the
   log (see the sketch just below).
2. The stream yields sockets without the `Result` wrapper after `handle_errors`,
   because all errors have already been handled.
3. Together with the error we print a hint, which explains some errors for end
   users. For example, it recommends increasing the open file limit and gives
   a link.
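
For example, if you happen to use the `log` crate (an assumption; any logging
facility works), the callback could emit a warning instead of writing to stderr.
A minimal sketch:

```rust,edition2018
# extern crate async_listen;
# extern crate log;
use std::io;
use async_listen::error_hint;

fn log_accept_error(e: &io::Error) {
    // Goes to whatever logger the application has installed.
    log::warn!("Accept error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
}
```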

[`async-listen`]: https://crates.io/crates/async-listen/

Be sure to [test your application](#testing-application).

## Connections Limit

Even if you've applied everything described in the
[Handling Errors](#handling-errors) section, there is still a problem.

Let's imagine you have a server that needs to open a file to process a
client request. At some point, you might encounter the following situation:

1. There are as many client connections as the maximum number of file descriptors
   allowed for the application.
2. The listener gets a `Too many open files` error, so it sleeps.
3. Some client sends a request via a previously opened connection.
4. Opening a file to serve the request fails because of the same
   `Too many open files` error, until some other client drops a connection.

There are many more possible situations; this is just a small illustration of why
limiting the number of connections is very useful. Generally, it's one of the ways
to control the resources used by a server and to avoid some kinds of
denial-of-service (DoS) attacks.
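
To get a feel for the idea before reaching for a crate, here is a rough sketch
that tracks in-flight connections with an atomic counter and simply refuses
anything over an illustrative `MAX_CONN` threshold. This is only an
assumption-laden illustration, not the approach used below; the [`async-listen`]
helper shown next pauses accepting instead of refusing, which is usually
preferable:

```rust,edition2018
# extern crate async_std;
# use std::sync::{
#     atomic::{AtomicUsize, Ordering},
#     Arc,
# };
# use async_std::{
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
# async fn connection_loop(_stream: TcpStream) {}
#
const MAX_CONN: usize = 100; // illustrative limit; tune for your application

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    let active = Arc::new(AtomicUsize::new(0)); // connections currently in flight
    while let Some(stream) = incoming.next().await {
        let stream = match stream {
            Ok(s) => s,
            Err(_) => continue, // error handling as in the previous section
        };
        if active.load(Ordering::Acquire) >= MAX_CONN {
            drop(stream); // over the limit: refuse this connection outright
            continue;
        }
        active.fetch_add(1, Ordering::AcqRel);
        let active = Arc::clone(&active);
        task::spawn(async move {
            connection_loop(stream).await;
            active.fetch_sub(1, Ordering::AcqRel); // free the slot when done
        });
    }
    Ok(())
}
```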

### `async-listen` crate

Limiting the maximum number of simultaneous connections with [`async-listen`]
looks like the following:

```rust,edition2018
# extern crate async_std;
# extern crate async_listen;
# use std::{io, time::Duration};
# use async_std::{
#     net::{TcpListener, TcpStream, ToSocketAddrs},
#     prelude::*,
#     task,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, Token, error_hint};

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener
        .incoming()
        .log_warnings(log_accept_error)
        .handle_errors(Duration::from_millis(500)) // 1
        .backpressure(100);
    while let Some((token, socket)) = incoming.next().await { // 2
        task::spawn(async move {
            connection_loop(&token, socket).await; // 3
        });
    }
    Ok(())
}

async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
    // ...
}
# fn log_accept_error(e: &io::Error) {
#     eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
# }
```

1. We need to handle errors first, because the [`backpressure`] helper expects a
   stream of `TcpStream` rather than of `Result`.
2. The token yielded along with each socket is what the backpressure helper counts;
   as soon as a token is dropped, a new connection can be accepted.
3. We give the connection loop a reference to the token to tie the token's lifetime
   to the lifetime of the connection.
4. The token itself can be ignored in the function, hence `_token`.

[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure

Be sure to [test this behavior](#testing-application).