Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 794fa49

Browse files
committedJan 9, 2025·
Fetch concurrently for non-first-match index strategies
1 parent 15ec830 commit 794fa49

File tree

1 file changed

+83
-34
lines changed

1 file changed

+83
-34
lines changed
 

‎crates/uv-client/src/registry_client.rs

+83-34
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
use async_http_range_reader::AsyncHttpRangeReader;
2-
use futures::{FutureExt, TryStreamExt};
3-
use http::HeaderMap;
4-
use itertools::Either;
5-
use reqwest::{Client, Response, StatusCode};
6-
use reqwest_middleware::ClientWithMiddleware;
71
use std::collections::BTreeMap;
82
use std::fmt::Debug;
93
use std::path::PathBuf;
104
use std::str::FromStr;
115
use std::time::Duration;
6+
7+
use async_http_range_reader::AsyncHttpRangeReader;
8+
use futures::{FutureExt, StreamExt, TryStreamExt};
9+
use http::HeaderMap;
10+
use itertools::Either;
11+
use reqwest::{Client, Response, StatusCode};
12+
use reqwest_middleware::ClientWithMiddleware;
1213
use tracing::{info_span, instrument, trace, warn, Instrument};
1314
use url::Url;
1415

@@ -247,38 +248,86 @@ impl RegistryClient {
247248
}
248249

249250
let mut results = Vec::new();
250-
for index in it {
251-
match self.simple_single_index(package_name, index).await {
252-
Ok(metadata) => {
253-
results.push((index, metadata));
254-
255-
// If we're only using the first match, we can stop here.
256-
if self.index_strategy == IndexStrategy::FirstIndex {
257-
break;
258-
}
259-
}
260-
Err(err) => match err.into_kind() {
261-
// The package could not be found in the remote index.
262-
ErrorKind::WrappedReqwestError(url, err) => match err.status() {
263-
Some(StatusCode::NOT_FOUND) => {}
264-
Some(StatusCode::UNAUTHORIZED) => {
265-
capabilities.set_unauthorized(index.clone());
266-
}
267-
Some(StatusCode::FORBIDDEN) => {
268-
capabilities.set_forbidden(index.clone());
251+
252+
match self.index_strategy {
253+
// If we're searching for the first index that contains the package, fetch serially.
254+
IndexStrategy::FirstIndex => {
255+
for index in it {
256+
match self.simple_single_index(package_name, index).await {
257+
Ok(metadata) => {
258+
results.push((index, metadata));
259+
break;
269260
}
270-
_ => return Err(ErrorKind::WrappedReqwestError(url, err).into()),
271-
},
261+
Err(err) => match err.into_kind() {
262+
// The package could not be found in the remote index.
263+
ErrorKind::WrappedReqwestError(url, err) => match err.status() {
264+
Some(StatusCode::NOT_FOUND) => {}
265+
Some(StatusCode::UNAUTHORIZED) => {
266+
capabilities.set_unauthorized(index.clone());
267+
}
268+
Some(StatusCode::FORBIDDEN) => {
269+
capabilities.set_forbidden(index.clone());
270+
}
271+
_ => return Err(ErrorKind::WrappedReqwestError(url, err).into()),
272+
},
273+
274+
// The package is unavailable due to a lack of connectivity.
275+
ErrorKind::Offline(_) => {}
276+
277+
// The package could not be found in the local index.
278+
ErrorKind::FileNotFound(_) => {}
279+
280+
err => return Err(err.into()),
281+
},
282+
};
283+
}
284+
}
272285

273-
// The package is unavailable due to a lack of connectivity.
274-
ErrorKind::Offline(_) => {}
286+
// Otherwise, fetch concurrently.
287+
IndexStrategy::UnsafeBestMatch | IndexStrategy::UnsafeFirstMatch => {
288+
let fetches = futures::stream::iter(it)
289+
.map(|index| async move {
290+
match self.simple_single_index(package_name, index).await {
291+
Ok(metadata) => Ok(Some((index, metadata))),
292+
Err(err) => match err.into_kind() {
293+
// The package could not be found in the remote index.
294+
ErrorKind::WrappedReqwestError(url, err) => match err.status() {
295+
Some(StatusCode::NOT_FOUND) => Ok(None),
296+
Some(StatusCode::UNAUTHORIZED) => {
297+
capabilities.set_unauthorized(index.clone());
298+
Ok(None)
299+
}
300+
Some(StatusCode::FORBIDDEN) => {
301+
capabilities.set_forbidden(index.clone());
302+
Ok(None)
303+
}
304+
_ => Err(ErrorKind::WrappedReqwestError(url, err).into()),
305+
},
306+
307+
// The package is unavailable due to a lack of connectivity.
308+
ErrorKind::Offline(_) => Ok(None),
309+
310+
// The package could not be found in the local index.
311+
ErrorKind::FileNotFound(_) => Ok(None),
312+
313+
err => Err(err.into()),
314+
},
315+
}
316+
})
317+
.buffered(8);
275318

276-
// The package could not be found in the local index.
277-
ErrorKind::FileNotFound(_) => {}
319+
futures::pin_mut!(fetches);
278320

279-
other => return Err(other.into()),
280-
},
281-
};
321+
while let Some(result) = fetches.next().await {
322+
match result {
323+
Ok(Some((index, metadata))) => {
324+
results.push((index, metadata));
325+
}
326+
Ok(None) => continue,
327+
Err(err) => return Err(err),
328+
}
329+
}
330+
}
282331
}
283332

284333
if results.is_empty() {

0 commit comments

Comments
 (0)
Please sign in to comment.