mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Compare commits
1 Commits
prototype-
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15321ce924 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3212,6 +3212,7 @@ dependencies = [
|
||||
"enum-iterator",
|
||||
"file-store",
|
||||
"flate2",
|
||||
"futures",
|
||||
"indexmap",
|
||||
"insta",
|
||||
"maplit",
|
||||
|
||||
@@ -46,10 +46,11 @@ time = { version = "0.3.41", features = [
|
||||
tracing = "0.1.41"
|
||||
ureq = "2.12.1"
|
||||
uuid = { version = "1.17.0", features = ["serde", "v4"] }
|
||||
backoff = "0.4.0"
|
||||
backoff = { version = "0.4.0", features = ["tokio"] }
|
||||
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
|
||||
rusty-s3 = "0.8.1"
|
||||
tokio = { version = "1.47.1", features = ["full"] }
|
||||
futures = "0.3.31"
|
||||
|
||||
[dev-dependencies]
|
||||
big_s = "1.0.2"
|
||||
|
||||
@@ -438,13 +438,13 @@ async fn multipart_stream_to_s3(
|
||||
db_name: String,
|
||||
reader: std::io::PipeReader,
|
||||
) -> Result<(), Error> {
|
||||
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
|
||||
use std::{os::fd::OwnedFd, path::PathBuf};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use reqwest::{Client, Response};
|
||||
use bytes::BytesMut;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use reqwest::Client;
|
||||
use rusty_s3::S3Action as _;
|
||||
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
let reader = OwnedFd::from(reader);
|
||||
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
|
||||
@@ -482,9 +482,7 @@ async fn multipart_stream_to_s3(
|
||||
// We use this bumpalo for etags strings.
|
||||
let bump = bumpalo::Bump::new();
|
||||
let mut etags = Vec::<&str>::new();
|
||||
let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
|
||||
s3_max_in_flight_parts.get(),
|
||||
);
|
||||
let mut in_flight = FuturesUnordered::new();
|
||||
|
||||
// Part numbers start at 1 and cannot be larger than 10k
|
||||
for part_number in 1u16.. {
|
||||
@@ -498,8 +496,21 @@ async fn multipart_stream_to_s3(
|
||||
|
||||
// Wait for a buffer to be ready if there are in-flight parts that landed
|
||||
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
|
||||
let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
|
||||
let resp = join_and_map_error(handle).await?;
|
||||
let (join_result, buffer): (
|
||||
Result<reqwest::Result<reqwest::Response>, tokio::task::JoinError>,
|
||||
bytes::Bytes,
|
||||
) = in_flight.next().await.expect("At least one in flight request");
|
||||
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
|
||||
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
|
||||
let resp = match resp.error_for_status_ref() {
|
||||
Ok(_) => resp,
|
||||
Err(_) => {
|
||||
return Err(Error::S3Error {
|
||||
status: resp.status(),
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
|
||||
|
||||
let mut buffer = match buffer.try_into_mut() {
|
||||
@@ -556,11 +567,24 @@ async fn multipart_stream_to_s3(
|
||||
}
|
||||
})
|
||||
});
|
||||
in_flight.push_back((task, body));
|
||||
|
||||
// Wrap the task to return both the result and the buffer
|
||||
let task_with_buffer = async move { (task.await, body) };
|
||||
in_flight.push(task_with_buffer);
|
||||
}
|
||||
|
||||
for (handle, _buffer) in in_flight {
|
||||
let resp = join_and_map_error(handle).await?;
|
||||
while let Some((join_result, _buffer)) = in_flight.next().await {
|
||||
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
|
||||
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
|
||||
let resp = match resp.error_for_status_ref() {
|
||||
Ok(_) => resp,
|
||||
Err(_) => {
|
||||
return Err(Error::S3Error {
|
||||
status: resp.status(),
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
|
||||
}
|
||||
|
||||
@@ -600,22 +624,6 @@ async fn multipart_stream_to_s3(
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn join_and_map_error(
|
||||
join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
|
||||
) -> Result<reqwest::Response> {
|
||||
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
|
||||
let request = join_handle.await.unwrap();
|
||||
let resp = request.map_err(Error::S3HttpError)?;
|
||||
match resp.error_for_status_ref() {
|
||||
Ok(_) => Ok(resp),
|
||||
Err(_) => Err(Error::S3Error {
|
||||
status: resp.status(),
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn extract_and_append_etag<'b>(
|
||||
bump: &'b bumpalo::Bump,
|
||||
|
||||
Reference in New Issue
Block a user