Skip to content

Commit c24a963

Browse files
ijacksonSergioBenitez
authored andcommitted
Add support for immediate chunk flushing, SSE.
Problem: To support Server-Side Events (SSE, aka JS EventSource) it is necessary for the server to keep open an HTTP request and dribble out data (the event stream) as it is generated. Currently, Rocket handles this poorly. It likes to fill in complete chunks. Also there is no way to force a flush of the underlying stream: in particular, there is a BufWriter underneath hyper. hyper would honour a flush request, but there is no way to send one. Options: Ideally the code which is producing the data would be able to explicitly designate when a flush should occur. Certainly it would not be acceptable to flush all the time for all readers. 1. Invent a new kind of Body (UnbufferedChunked) which translates the data from each Read::read() call into a single call to the stream write, and which always flushes. This would be a seriously invasive change. And it would mean that SSE systems with fast event streams might work poorly. 2. Invent a new kind of Body which doesn't use Read at all, and instead has a more sophisticated API. This would be super-invasive and heavyweight. 3. Find a way to encode the necessary information in the Read trait protocol. Chosen solution: It turns out that option 3 is quite easy. The read() call can return an io::Error. There are at least some errors that clearly ought not to occur here. An obvious one is ErrorKind::WouldBlock. Rocket expects the reader to block. WouldBlock is only applicable to nonblocking objects. And indeed the reader will generally want to return it (once) when it is about to block. We have the Stream treat io::Error with ErrorKind::WouldBlock, from its reader, as a request to flush. There are two effects: we stop trying to accumulate a full chunk, and we issue a flush call to the underlying writer (which, eventually, makes it all the way down into hyper and BufWriter). Implementation: We provide a method ReadExt::read_max_wfs which is like read_max but which handles the WouldBlock case specially. It tells its caller whether a flush was wanted. This is implemented by adding a new code to read_max_internal. with a boolean to control it. This seemed better than inventing a trait or something. (The other read_max call site is reading http headers in data.rs, and I think it wants to tread WouldBlock as an error.) Risks and downsides: Obviously this ad-hoc extension to the Read protocol is not particularly pretty. At least, people who aren't doing SSE (or similar) won't need it and can ignore it. If for some reason the data source is actually nonblocking, this new arrangement would spin, rather than calling the situation a fatal error. This possibility seems fairly remote, in production settings at least. To migitate this it might be possible for the loop in Rocket::issue_response to bomb out if it notices it is sending lots of consecutive empty chunks. It is possible that async Rocket will want to take a different approach entirely. But it will definitely need to solve this problem somehow, and naively it seems like the obvious transformation to eg the Tokio read trait would have the same API limitation and admit the same solution. (Having a flush occur every time the body stream future returns Pending would not be good for performance, I think.)
1 parent 7b1995c commit c24a963

File tree

8 files changed

+153
-14
lines changed

8 files changed

+153
-14
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ members = [
2525
"examples/testing",
2626
"examples/request_local_state",
2727
"examples/request_guard",
28+
"examples/sse",
2829
"examples/stream",
2930
"examples/json",
3031
"examples/msgpack",

core/lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ all-features = true
2020
[features]
2121
default = ["private-cookies"]
2222
tls = ["rocket_http/tls"]
23+
sse = []
2324
private-cookies = ["rocket_http/private-cookies"]
2425

2526
[dependencies]

core/lib/src/ext.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,35 @@
11
use std::io;
22

3-
pub trait ReadExt: io::Read {
4-
fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
5-
let start_len = buf.len();
6-
while !buf.is_empty() {
7-
match self.read(buf) {
8-
Ok(0) => break,
9-
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
10-
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
11-
Err(e) => return Err(e),
12-
}
3+
fn read_max_internal<T: io::Read>(reader: &mut T, mut buf: &mut [u8],
4+
wouldblock_flush_signalling: bool)
5+
-> io::Result<(usize, bool)> {
6+
let start_len = buf.len();
7+
let need_flush = loop {
8+
if buf.is_empty() { break false }
9+
match reader.read(buf) {
10+
Ok(0) => { break true }
11+
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
12+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
13+
Err(ref e) if (e.kind() == io::ErrorKind::WouldBlock &&
14+
wouldblock_flush_signalling) => { break true }
15+
Err(e) => return Err(e),
1316
}
17+
};
1418

15-
Ok(start_len - buf.len())
19+
Ok((start_len - buf.len(), need_flush))
20+
}
21+
22+
pub trait ReadExt: io::Read + Sized {
23+
fn read_max(&mut self, buf: &mut [u8]) -> io::Result<usize> {
24+
Ok(read_max_internal(self, buf, false)?.0)
25+
}
26+
27+
/// Tries to fill buf with data. Short reads can occur for EOF or
28+
/// flush requests. With SSE enabled, a flush request occurs if
29+
/// the underlying reader returns ErrorKind::Wouldblock
30+
fn read_max_wfs(&mut self, buf: &mut [u8])
31+
-> io::Result<(usize, bool)> {
32+
read_max_internal(self, buf, cfg!(feature="sse"))
1633
}
1734
}
1835

core/lib/src/response/response.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,12 @@ impl<'r> Response<'r> {
10181018
/// [DEFAULT_CHUNK_SIZE](::response::DEFAULT_CHUNK_SIZE). Use
10191019
/// [set_chunked_body](#method.set_chunked_body) for custom chunk sizes.
10201020
///
1021+
/// Normally, data will be buffered and sent only in complete
1022+
/// chunks. If you need timely transmission of available data,
1023+
/// rather than buffering, enable the `sse` feature and use the
1024+
/// `WouldBlock` technique described in
1025+
/// [Stream](::response::Stream).
1026+
///
10211027
/// # Example
10221028
///
10231029
/// ```rust

core/lib/src/response/stream.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,30 @@ impl<T: Read> Stream<T> {
2929
/// # #[allow(unused_variables)]
3030
/// let response = Stream::chunked(io::stdin(), 10);
3131
/// ```
32+
///
33+
/// # Buffering and blocking
34+
///
35+
/// Normally, data will be buffered and sent only in complete
36+
/// `chunk_size` chunks.
37+
///
38+
/// With the feature `sse` enabled, the `Read`er may signal that
39+
/// data sent so far should be transmitted in a timely fashion
40+
/// (e.g. it is responding to a Server-Side Events (JavaScript
41+
/// `EventSource`) request. To do this it should return an
42+
/// [io::Error](std::io::Error) of kind `WouldBlock` (which should
43+
/// not normally occur), after returning a collection of data.
44+
/// This will cause a flush of data seen so far, rather than being
45+
/// treated as an error.
46+
///
47+
/// Note that long-running responses may easily exhaust Rocket's
48+
/// thread pool, so consider increasing the number of threads.
49+
/// If doing SSE, also note the 'maximum open connections' browser
50+
/// limitation which is described in the
51+
/// [EventSource documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
52+
/// on the Mozilla Developer Network.
53+
///
54+
/// Without the `sse` feature, a `WouldBlock` error is treated
55+
/// as an actual error.
3256
pub fn chunked(reader: T, chunk_size: u64) -> Stream<T> {
3357
Stream(reader, chunk_size)
3458
}

core/lib/src/rocket.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,12 @@ impl Rocket {
159159
let mut buffer = vec![0; chunk_size as usize];
160160
let mut stream = hyp_res.start()?;
161161
loop {
162-
match body.read_max(&mut buffer)? {
163-
0 => break,
164-
n => stream.write_all(&buffer[..n])?,
162+
match body.read_max_wfs(&mut buffer)? {
163+
(0, _) => break,
164+
(n, f) => {
165+
stream.write_all(&buffer[..n])?;
166+
if f { stream.flush()? }
167+
},
165168
}
166169
}
167170

examples/sse/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "sse"
3+
version = "0.0.0"
4+
workspace = "../../"
5+
publish = false
6+
7+
[dependencies]
8+
rocket = { path = "../../core/lib", features = ["sse"] }

examples/sse/src/main.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#![feature(proc_macro_hygiene, decl_macro)]
2+
#[macro_use]
3+
extern crate rocket;
4+
5+
use rocket::http::ContentType;
6+
use rocket::response::Content;
7+
use rocket::response::Responder;
8+
use std::io::BufReader;
9+
use std::io::Read;
10+
use std::thread::sleep;
11+
use std::time::Duration;
12+
13+
#[get("/")]
14+
fn index<'r>() -> impl Responder<'r> {
15+
Content(
16+
ContentType::HTML,
17+
r##"
18+
<body>
19+
<h1>Hi!</h1>
20+
21+
<div id="spong">nothing yet</div>
22+
23+
</body>
24+
<script src="script.js"></script>
25+
"##,
26+
)
27+
}
28+
29+
#[get("/script.js")]
30+
fn script<'r>() -> impl Responder<'r> {
31+
Content(
32+
ContentType::JavaScript,
33+
r##"
34+
status_node = document.getElementById('spong');
35+
status_node.innerHTML = 'js-done'
36+
37+
es = new EventSource("updates");
38+
es.onmessage = function(event) {
39+
status_node.innerHTML = event.data;
40+
}
41+
"##,
42+
)
43+
}
44+
45+
const BUF_SIZE : usize = 4096;
46+
47+
type TestCounter = BufReader<TestCounterInner>;
48+
#[derive(Debug)]
49+
struct TestCounterInner {
50+
next: usize,
51+
}
52+
impl Read for TestCounterInner {
53+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
54+
sleep(Duration::from_millis(500));
55+
let data = format!("data: {}\n\n", self.next);
56+
self.next += 1;
57+
// `BufReader` won't call us unless its buffer is empty, and
58+
// then buf will be the whole of the buffer, ie of size
59+
// BUF_SIZE (due to the `with_capacity` call). So `data` is
60+
// definitely going to fit.
61+
buf[0..data.len()].copy_from_slice(data.as_bytes());
62+
Ok(buf.len())
63+
}
64+
}
65+
66+
#[get("/updates")]
67+
fn updates<'x>() -> impl Responder<'x> {
68+
let tc = TestCounterInner { next: 0 };
69+
let tc = BufReader::with_capacity(BUF_SIZE, tc);
70+
let ch = rocket::response::Stream::from(tc);
71+
let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8").unwrap();
72+
Content(ct, ch)
73+
}
74+
75+
fn main() {
76+
rocket::ignite()
77+
.mount("/", routes![index, script, updates,])
78+
.launch();
79+
}

0 commit comments

Comments
 (0)