Skip to content

Commit 10a44bc

Browse files
Kixiron
authored and
Joshua Nelson
committed
Updated async dependencies
1 parent bdaf33b commit 10a44bc

File tree

10 files changed

+449
-665
lines changed

10 files changed

+449
-665
lines changed

Cargo.lock

+285-580
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+10-6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ log = "0.4"
1313
regex = "1"
1414
structopt = "0.3"
1515
crates-index-diff = "7"
16-
reqwest = "0.9"
16+
reqwest = { version = "0.10", features = ["blocking"] }
1717
semver = "0.9"
1818
slug = "=0.1.1"
1919
env_logger = "0.7"
@@ -27,11 +27,6 @@ toml = "0.5"
2727
html5ever = "0.22"
2828
schemamama = "0.3"
2929
schemamama_postgres = "0.2"
30-
rusoto_s3 = "0.40"
31-
rusoto_core = "0.40"
32-
rusoto_credential = "0.40"
33-
futures = "0.1"
34-
tokio = "0.1"
3530
systemstat = "0.1.4"
3631
prometheus = { version = "0.7.0", default-features = false }
3732
lazy_static = "1.0.0"
@@ -62,6 +57,15 @@ notify = "4.0.15"
6257
chrono = { version = "0.4.11", features = ["serde"] }
6358
time = "0.1" # TODO: Remove once `iron` is removed
6459

60+
# Communicating with S3
61+
rusoto_s3 = "0.43"
62+
rusoto_core = "0.43"
63+
rusoto_credential = "0.43"
64+
65+
# Async
66+
futures-util = "0.3"
67+
tokio = { version = "0.2", features = ["rt-threaded"] }
68+
6569
[target.'cfg(not(windows))'.dependencies]
6670
libc = "0.2"
6771

src/db/delete_crate.rs

+29-24
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use crate::storage::s3::{s3_client, S3_BUCKET_NAME};
1+
use crate::storage::s3::{s3_client, S3Backend, S3_BUCKET_NAME};
22
use failure::{Error, Fail};
33
use postgres::Connection;
4-
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};
4+
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3};
55

66
/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
77
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -22,8 +22,9 @@ pub fn delete_crate(conn: &Connection, name: &str) -> Result<(), Error> {
2222
};
2323

2424
delete_from_database(conn, name, crate_id)?;
25-
if let Some(s3) = s3_client() {
26-
delete_from_s3(&s3, name)?;
25+
if let Some(client) = s3_client() {
26+
let mut backend = S3Backend::new(client, S3_BUCKET_NAME);
27+
delete_from_s3(&mut backend, name)?
2728
}
2829

2930
Ok(())
@@ -68,24 +69,25 @@ fn delete_from_database(conn: &Connection, name: &str, crate_id: i32) -> Result<
6869
Ok(())
6970
}
7071

71-
fn delete_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
72+
fn delete_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
7273
for prefix in STORAGE_PATHS_TO_DELETE {
7374
delete_prefix_from_s3(s3, &format!("{}/{}/", prefix, name))?;
7475
}
76+
7577
Ok(())
7678
}
7779

78-
fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
80+
fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
7981
let mut continuation_token = None;
8082
loop {
81-
let list = s3
82-
.list_objects_v2(ListObjectsV2Request {
83-
bucket: S3_BUCKET_NAME.into(),
84-
prefix: Some(name.into()),
85-
continuation_token,
86-
..ListObjectsV2Request::default()
87-
})
88-
.sync()?;
83+
let list =
84+
s3.runtime_handle()
85+
.block_on(s3.client().list_objects_v2(ListObjectsV2Request {
86+
bucket: S3_BUCKET_NAME.into(),
87+
prefix: Some(name.into()),
88+
continuation_token,
89+
..ListObjectsV2Request::default()
90+
}))?;
8991

9092
let to_delete = list
9193
.contents
@@ -97,20 +99,23 @@ fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
9799
version_id: None,
98100
})
99101
.collect::<Vec<_>>();
100-
let resp = s3
101-
.delete_objects(DeleteObjectsRequest {
102-
bucket: S3_BUCKET_NAME.into(),
103-
delete: rusoto_s3::Delete {
104-
objects: to_delete,
105-
quiet: None,
106-
},
107-
..DeleteObjectsRequest::default()
108-
})
109-
.sync()?;
102+
103+
let resp =
104+
s3.runtime_handle()
105+
.block_on(s3.client().delete_objects(DeleteObjectsRequest {
106+
bucket: S3_BUCKET_NAME.into(),
107+
delete: rusoto_s3::Delete {
108+
objects: to_delete,
109+
quiet: None,
110+
},
111+
..DeleteObjectsRequest::default()
112+
}))?;
113+
110114
if let Some(errs) = resp.errors {
111115
for err in &errs {
112116
log::error!("error deleting file from s3: {:?}", err);
113117
}
118+
114119
failure::bail!("uploading to s3 failed");
115120
}
116121

src/index/api.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{error::Result, utils::MetadataPackage};
22
use chrono::{DateTime, Utc};
33
use failure::err_msg;
4-
use reqwest::{header::ACCEPT, Client};
4+
use reqwest::{blocking::Client, header::ACCEPT};
55
use semver::Version;
66
use serde_json::Value;
77
use std::io::Read;

src/storage/mod.rs

+15-6
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {
5959

6060
pub(crate) enum Storage<'a> {
6161
Database(DatabaseBackend<'a>),
62-
S3(S3Backend<'a>),
62+
S3(Box<S3Backend<'a>>),
6363
}
6464

6565
impl<'a> Storage<'a> {
@@ -70,16 +70,17 @@ impl<'a> Storage<'a> {
7070
DatabaseBackend::new(conn).into()
7171
}
7272
}
73-
pub(crate) fn get(&self, path: &str) -> Result<Blob, Error> {
73+
74+
pub(crate) fn get(&mut self, path: &str) -> Result<Blob, Error> {
7475
match self {
7576
Self::Database(db) => db.get(path),
7677
Self::S3(s3) => s3.get(path),
7778
}
7879
}
7980

80-
fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
81+
fn store_batch(&mut self, batch: Vec<Blob>, trans: &Transaction) -> Result<(), Error> {
8182
match self {
82-
Self::Database(db) => db.store_batch(batch, trans),
83+
Self::Database(db) => db.store_batch(&batch, trans),
8384
Self::S3(s3) => s3.store_batch(batch),
8485
}
8586
}
@@ -131,15 +132,18 @@ impl<'a> Storage<'a> {
131132
date_updated: Utc::now(),
132133
})
133134
});
135+
134136
loop {
135137
let batch: Vec<_> = blobs
136138
.by_ref()
137139
.take(MAX_CONCURRENT_UPLOADS)
138140
.collect::<Result<_, Error>>()?;
141+
139142
if batch.is_empty() {
140143
break;
141144
}
142-
self.store_batch(&batch, &trans)?;
145+
146+
self.store_batch(batch, &trans)?;
143147
}
144148

145149
trans.commit()?;
@@ -152,6 +156,7 @@ fn detect_mime(file_path: &Path) -> Result<&'static str, Error> {
152156
.first_raw()
153157
.map(|m| m)
154158
.unwrap_or("text/plain");
159+
155160
Ok(match mime {
156161
"text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
157162
match file_path.extension().and_then(OsStr::to_str) {
@@ -178,7 +183,7 @@ impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
178183

179184
impl<'a> From<S3Backend<'a>> for Storage<'a> {
180185
fn from(db: S3Backend<'a>) -> Self {
181-
Self::S3(db)
186+
Self::S3(Box::new(db))
182187
}
183188
}
184189

@@ -200,19 +205,23 @@ mod test {
200205
.prefix("docs.rs-upload-test")
201206
.tempdir()
202207
.unwrap();
208+
203209
for blob in blobs {
204210
let path = dir.path().join(&blob.path);
205211
if let Some(parent) = path.parent() {
206212
fs::create_dir_all(parent).unwrap();
207213
}
214+
208215
fs::write(path, &blob.content).expect("failed to write to file");
209216
}
217+
210218
wrapper(|env| {
211219
let db = env.db();
212220
let conn = db.conn();
213221
let mut backend = Storage::Database(DatabaseBackend::new(&conn));
214222
let stored_files = backend.store_all(&conn, "", dir.path()).unwrap();
215223
assert_eq!(stored_files.len(), blobs.len());
224+
216225
for blob in blobs {
217226
let name = Path::new(&blob.path);
218227
assert!(stored_files.contains_key(name));

src/storage/s3.rs

+67-29
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use super::Blob;
22
use chrono::{DateTime, NaiveDateTime, Utc};
33
use failure::Error;
4-
use futures::Future;
4+
use futures_util::{
5+
future::FutureExt,
6+
stream::{FuturesUnordered, StreamExt},
7+
};
58
use log::{error, warn};
69
use rusoto_core::region::Region;
710
use rusoto_credential::DefaultCredentialsProvider;
811
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
9-
use std::convert::TryInto;
10-
use std::io::Read;
11-
use tokio::runtime::Runtime;
12+
use std::{convert::TryInto, io::Read};
13+
use tokio::runtime::{Handle, Runtime};
1214

1315
#[cfg(test)]
1416
mod test;
@@ -32,15 +34,23 @@ impl<'a> S3Backend<'a> {
3234
}
3335
}
3436

35-
pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
37+
#[cfg(test)]
38+
pub(crate) fn with_runtime(client: S3Client, bucket: &'a str, runtime: Runtime) -> Self {
39+
Self {
40+
client,
41+
bucket,
42+
runtime,
43+
}
44+
}
45+
46+
pub(super) fn get(&mut self, path: &str) -> Result<Blob, Error> {
3647
let res = self
37-
.client
38-
.get_object(GetObjectRequest {
48+
.runtime
49+
.block_on(self.client.get_object(GetObjectRequest {
3950
bucket: self.bucket.to_string(),
4051
key: path.into(),
4152
..Default::default()
42-
})
43-
.sync()?;
53+
}))?;
4454

4555
let mut b = res.body.unwrap().into_blocking_read();
4656
let mut content = Vec::with_capacity(
@@ -60,14 +70,16 @@ impl<'a> S3Backend<'a> {
6070
})
6171
}
6272

63-
pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
64-
use futures::stream::FuturesUnordered;
65-
use futures::stream::Stream;
73+
pub(super) fn store_batch(&mut self, mut uploads: Vec<Blob>) -> Result<(), Error> {
6674
let mut attempts = 0;
6775

6876
loop {
69-
let mut futures = FuturesUnordered::new();
70-
for blob in batch {
77+
// `FuturesUnordered` is used because the order of execution doesn't
78+
// matter, we just want things to execute as fast as possible
79+
let futures = FuturesUnordered::new();
80+
81+
// Drain uploads, filling `futures` with upload requests
82+
for blob in uploads.drain(..) {
7183
futures.push(
7284
self.client
7385
.put_object(PutObjectRequest {
@@ -77,27 +89,53 @@ impl<'a> S3Backend<'a> {
7789
content_type: Some(blob.mime.clone()),
7890
..Default::default()
7991
})
80-
.inspect(|_| {
81-
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
92+
// Drop the value returned by `put_object` because we don't need it,
93+
// emit an error and replace the error values with the blob that failed
94+
// to upload so that we can retry failed uploads
95+
.map(|resp| match resp {
96+
Ok(..) => {
97+
// Increment the total uploaded files when a file is uploaded
98+
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
99+
100+
Ok(())
101+
}
102+
Err(err) => {
103+
error!("failed to upload file to s3: {:?}", err);
104+
Err(blob)
105+
}
82106
}),
83107
);
84108
}
85109
attempts += 1;
86110

87-
match self.runtime.block_on(futures.map(drop).collect()) {
88-
// this batch was successful, start another batch if there are still more files
89-
Ok(_) => break,
90-
Err(err) => {
91-
error!("failed to upload to s3: {:?}", err);
92-
// if a futures error occurs, retry the batch
93-
if attempts > 2 {
94-
panic!("failed to upload 3 times, exiting");
95-
}
96-
}
111+
// Collect all the failed uploads so that we can retry them
112+
uploads = self.runtime.block_on(
113+
futures
114+
.filter_map(|resp| async move { resp.err() })
115+
.collect(),
116+
);
117+
118+
// If there are no further uploads we were successful and can return
119+
if uploads.is_empty() {
120+
break;
121+
122+
// If more than three attempts to upload fail, return an error
123+
} else if attempts >= 3 {
124+
error!("failed to upload to s3, abandoning");
125+
failure::bail!("Failed to upload to s3 three times, abandoning");
97126
}
98127
}
128+
99129
Ok(())
100130
}
131+
132+
pub fn runtime_handle(&self) -> Handle {
133+
self.runtime.handle().clone()
134+
}
135+
136+
pub fn client(&self) -> &S3Client {
137+
&self.client
138+
}
101139
}
102140

103141
fn parse_timespec(mut raw: &str) -> Result<DateTime<Utc>, Error> {
@@ -142,7 +180,6 @@ pub(crate) mod tests {
142180
use super::*;
143181
use crate::test::*;
144182
use chrono::TimeZone;
145-
use std::slice;
146183

147184
#[test]
148185
fn test_parse_timespec() {
@@ -172,7 +209,7 @@ pub(crate) mod tests {
172209

173210
// Add a test file to the database
174211
let s3 = env.s3();
175-
s3.upload(slice::from_ref(&blob)).unwrap();
212+
s3.upload(vec![blob.clone()]).unwrap();
176213

177214
// Test that the proper file was returned
178215
s3.assert_blob(&blob, "dir/foo.txt");
@@ -207,10 +244,11 @@ pub(crate) mod tests {
207244
})
208245
.collect();
209246

210-
s3.upload(&blobs).unwrap();
247+
s3.upload(blobs.clone()).unwrap();
211248
for blob in &blobs {
212249
s3.assert_blob(blob, &blob.path);
213250
}
251+
214252
Ok(())
215253
})
216254
}

0 commit comments

Comments
 (0)