Compare commits

..

1 Commits

Author SHA1 Message Date
Kerollmops
15321ce924 Replace the hand-made VecDequeue by a FutureUnordered 2025-11-12 09:12:37 +01:00
48 changed files with 147 additions and 535 deletions

View File

@@ -65,9 +65,9 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-14, windows-2022]
os: [macos-13, windows-2022]
include:
- os: macos-14
- os: macos-13
artifact_name: meilisearch
asset_name: meilisearch-macos-amd64
- os: windows-2022
@@ -90,7 +90,7 @@ jobs:
publish-macos-apple-silicon:
name: Publish binary for macOS silicon
runs-on: macos-14
runs-on: macos-13
needs: check-version
strategy:
matrix:

View File

@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-14, windows-2022]
os: [macos-13, windows-2022]
steps:
- uses: actions/checkout@v5
- name: Cache dependencies

57
Cargo.lock generated
View File

@@ -345,6 +345,12 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "allocator-api2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c583acf993cf4245c4acb0a2cc2ab1f9cc097de73411bb6d3647ff6af2b1013d"
[[package]]
name = "anes"
version = "0.1.6"
@@ -584,7 +590,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"bumpalo",
@@ -794,7 +800,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"time",
@@ -807,7 +813,7 @@ version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
dependencies = [
"allocator-api2",
"allocator-api2 0.2.21",
"serde",
]
@@ -817,7 +823,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ce682bdc86c2e25ef5cd95881d9d6a1902214eddf74cf9ffea88fe1464377e8"
dependencies = [
"allocator-api2",
"allocator-api2 0.2.21",
"bitpacking",
"bumpalo",
"hashbrown 0.15.5",
@@ -1784,7 +1790,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"big_s",
@@ -2027,7 +2033,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-store"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"tempfile",
"thiserror 2.0.16",
@@ -2049,7 +2055,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"insta",
"levenshtein_automata",
@@ -2077,7 +2083,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"criterion",
"serde_json",
@@ -2234,7 +2240,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2760,7 +2766,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
"allocator-api2",
"allocator-api2 0.2.21",
]
[[package]]
@@ -2769,7 +2775,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"allocator-api2 0.2.21",
"equivalent",
"foldhash",
"serde",
@@ -2922,12 +2928,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "1.7.0"
@@ -3194,7 +3194,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"backoff",
@@ -3212,6 +3212,7 @@ dependencies = [
"enum-iterator",
"file-store",
"flate2",
"futures",
"indexmap",
"insta",
"maplit",
@@ -3467,7 +3468,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"criterion",
"serde_json",
@@ -3986,7 +3987,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"insta",
"md5",
@@ -3997,7 +3998,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"actix-cors",
"actix-http",
@@ -4026,7 +4027,6 @@ dependencies = [
"futures",
"futures-util",
"hex",
"humantime",
"index-scheduler",
"indexmap",
"insta",
@@ -4095,7 +4095,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -4114,7 +4114,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"actix-web",
"anyhow",
@@ -4149,7 +4149,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"clap",
@@ -4183,8 +4183,9 @@ dependencies = [
[[package]]
name = "milli"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"allocator-api2 0.3.1",
"arroy",
"bbqueue",
"big_s",
@@ -4764,7 +4765,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "permissive-json-pointer"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"big_s",
"serde_json",
@@ -7886,7 +7887,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.26.0"
version = "1.25.0"
dependencies = [
"anyhow",
"build-info",

View File

@@ -23,7 +23,7 @@ members = [
]
[workspace.package]
version = "1.26.0"
version = "1.25.0"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -341,7 +341,6 @@ pub(crate) mod test {
prefix_search: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: std::marker::PhantomData,
};
settings.check()

View File

@@ -423,7 +423,6 @@ impl<T> From<v5::Settings<T>> for v6::Settings<v6::Unchecked> {
prefix_search: v6::Setting::NotSet,
chat: v6::Setting::NotSet,
vector_store: v6::Setting::NotSet,
execute_after_update: v6::Setting::NotSet,
_kind: std::marker::PhantomData,
}
}

View File

@@ -46,10 +46,11 @@ time = { version = "0.3.41", features = [
tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
backoff = { version = "0.4.0", features = ["tokio"] }
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }
futures = "0.3.31"
[dev-dependencies]
big_s = "1.0.2"

View File

@@ -306,18 +306,6 @@ fn create_or_open_index(
) -> Result<Index> {
let options = EnvOpenOptions::new();
let mut options = options.read_txn_without_tls();
let map_size = match std::env::var("MEILI_MAX_INDEX_SIZE") {
Ok(max_size) => {
let max_size = max_size.parse().unwrap();
map_size.min(max_size)
}
Err(VarError::NotPresent) => map_size,
Err(VarError::NotUnicode(e)) => {
panic!("Non unicode max index size in `MEILI_MAX_INDEX_SIZE`: {e:?}")
}
};
options.map_size(clamp_to_page_size(map_size));
// You can find more details about this experimental

View File

@@ -438,16 +438,13 @@ async fn multipart_stream_to_s3(
db_name: String,
reader: std::io::PipeReader,
) -> Result<(), Error> {
use std::collections::VecDeque;
use std::io;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::{os::fd::OwnedFd, path::PathBuf};
use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response};
use rusty_s3::actions::CreateMultipartUpload;
use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
use tokio::task::JoinHandle;
use bytes::BytesMut;
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use rusty_s3::S3Action as _;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
@@ -485,9 +482,7 @@ async fn multipart_stream_to_s3(
// We use this bumpalo for etags strings.
let bump = bumpalo::Bump::new();
let mut etags = Vec::<&str>::new();
let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
s3_max_in_flight_parts.get(),
);
let mut in_flight = FuturesUnordered::new();
// Part numbers start at 1 and cannot be larger than 10k
for part_number in 1u16.. {
@@ -501,8 +496,21 @@ async fn multipart_stream_to_s3(
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
let resp = join_and_map_error(handle).await?;
let (join_result, buffer): (
Result<reqwest::Result<reqwest::Response>, tokio::task::JoinError>,
bytes::Bytes,
) = in_flight.next().await.expect("At least one in flight request");
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
let mut buffer = match buffer.try_into_mut() {
@@ -520,6 +528,7 @@ async fn multipart_stream_to_s3(
while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable
use std::io;
reader.readable().await?;
match reader.try_read_buf(&mut buffer) {
@@ -558,11 +567,24 @@ async fn multipart_stream_to_s3(
}
})
});
in_flight.push_back((task, body));
// Wrap the task to return both the result and the buffer
let task_with_buffer = async move { (task.await, body) };
in_flight.push(task_with_buffer);
}
for (handle, _buffer) in in_flight {
let resp = join_and_map_error(handle).await?;
while let Some((join_result, _buffer)) = in_flight.next().await {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
}
@@ -583,17 +605,15 @@ async fn multipart_stream_to_s3(
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
Err(backoff::Error::Permanent(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}))
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
.await?;
.await
.map_err(Error::S3HttpError)?;
let status = resp.status();
let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
@@ -604,22 +624,6 @@ async fn multipart_stream_to_s3(
}
}
#[cfg(unix)]
async fn join_and_map_error(
join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
) -> Result<reqwest::Response> {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let request = join_handle.await.unwrap();
let resp = request.map_err(Error::S3HttpError)?;
match resp.error_for_status_ref() {
Ok(_) => Ok(resp),
Err(_) => Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}),
}
}
#[cfg(unix)]
fn extract_and_append_etag<'b>(
bump: &'b bumpalo::Bump,

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.25.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
----------------------------------------------------------------------
### Status:
enqueued [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
@@ -37,7 +37,7 @@ catto [1,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.25.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ doggo [2,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.25.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 25, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -43,7 +43,7 @@ doggo [2,3,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.25.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -48,8 +48,6 @@ pub fn upgrade_index_scheduler(
(1, 22, _) => 0,
(1, 23, _) => 0,
(1, 24, _) => 0,
(1, 25, _) => 0,
(1, 26, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)

View File

@@ -324,7 +324,6 @@ InvalidSettingsDisplayedAttributes , InvalidRequest , BAD_REQU
InvalidSettingsDistinctAttribute , InvalidRequest , BAD_REQUEST ;
InvalidSettingsProximityPrecision , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFacetSearch , InvalidRequest , BAD_REQUEST ;
InvalidSettingsexecuteAfterUpdate , InvalidRequest , BAD_REQUEST ;
InvalidSettingsPrefixSearch , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFaceting , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFilterableAttributes , InvalidRequest , BAD_REQUEST ;

View File

@@ -326,12 +326,6 @@ pub struct Settings<T> {
#[schema(value_type = Option<VectorStoreBackend>)]
pub vector_store: Setting<VectorStoreBackend>,
/// Function to execute after an update
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[deserr(default, error = DeserrJsonError<InvalidSettingsexecuteAfterUpdate>)]
#[schema(value_type = Option<String>, example = json!("doc.likes += 1"))]
pub execute_after_update: Setting<String>,
#[serde(skip)]
#[deserr(skip)]
pub _kind: PhantomData<T>,
@@ -401,7 +395,6 @@ impl Settings<Checked> {
prefix_search: Setting::Reset,
chat: Setting::Reset,
vector_store: Setting::Reset,
execute_after_update: Setting::Reset,
_kind: PhantomData,
}
}
@@ -430,7 +423,6 @@ impl Settings<Checked> {
prefix_search,
chat,
vector_store,
execute_after_update,
_kind,
} = self;
@@ -457,7 +449,6 @@ impl Settings<Checked> {
prefix_search,
vector_store,
chat,
execute_after_update,
_kind: PhantomData,
}
}
@@ -510,7 +501,6 @@ impl Settings<Unchecked> {
prefix_search: self.prefix_search,
chat: self.chat,
vector_store: self.vector_store,
execute_after_update: self.execute_after_update,
_kind: PhantomData,
}
}
@@ -592,10 +582,6 @@ impl Settings<Unchecked> {
prefix_search: other.prefix_search.or(self.prefix_search),
chat: other.chat.clone().or(self.chat.clone()),
vector_store: other.vector_store.or(self.vector_store),
execute_after_update: other
.execute_after_update
.clone()
.or(self.execute_after_update.clone()),
_kind: PhantomData,
}
}
@@ -636,7 +622,6 @@ pub fn apply_settings_to_builder(
prefix_search,
chat,
vector_store,
execute_after_update,
_kind,
} = settings;
@@ -860,14 +845,6 @@ pub fn apply_settings_to_builder(
Setting::Reset => builder.reset_vector_store(),
Setting::NotSet => (),
}
match execute_after_update {
Setting::Set(execute_after_update) => {
builder.set_execute_after_update(execute_after_update.clone())
}
Setting::Reset => builder.reset_execute_after_update(),
Setting::NotSet => (),
}
}
pub enum SecretPolicy {
@@ -967,13 +944,13 @@ pub fn settings(
.collect();
let vector_store = index.get_vector_store(rtxn)?;
let embedders = Setting::Set(embedders);
let search_cutoff_ms = index.search_cutoff(rtxn)?;
let localized_attributes_rules = index.localized_attributes_rules(rtxn)?;
let prefix_search = index.prefix_search(rtxn)?.map(PrefixSearchSettings::from);
let facet_search = index.facet_search(rtxn)?;
let chat = index.chat_config(rtxn).map(ChatSettings::from)?;
let execute_after_update = index.execute_after_update(rtxn)?;
let mut settings = Settings {
displayed_attributes: match displayed_attributes {
@@ -1018,10 +995,6 @@ pub fn settings(
Some(vector_store) => Setting::Set(vector_store),
None => Setting::Reset,
},
execute_after_update: match execute_after_update {
Some(function) => Setting::Set(function.to_string()),
None => Setting::NotSet,
},
_kind: PhantomData,
};
@@ -1252,7 +1225,6 @@ pub(crate) mod test {
prefix_search: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};
@@ -1286,7 +1258,7 @@ pub(crate) mod test {
prefix_search: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};

View File

@@ -117,7 +117,7 @@ secrecy = "0.10.3"
actix-web-lab = { version = "0.24.1", default-features = false }
urlencoding = "2.1.3"
backoff = { version = "0.4.0", features = ["tokio"] }
humantime = { version = "2.3.0", default-features = false }
[dev-dependencies]
actix-rt = "2.10.0"

View File

@@ -195,7 +195,7 @@ struct Infos {
experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: Option<u64>,
experimental_limit_batched_tasks_total_size: u64,
experimental_network: bool,
experimental_multimodal: bool,
experimental_chat_completions: bool,
@@ -359,7 +359,7 @@ impl Infos {
http_payload_size_limit,
experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size:
experimental_limit_batched_tasks_total_size.map(|size| size.as_u64()),
experimental_limit_batched_tasks_total_size.into(),
task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(),

View File

@@ -230,11 +230,7 @@ pub fn setup_meilisearch(
cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.map_or_else(
// By default, we use half of the available memory to determine the size of batched tasks
|| opt.indexer_options.max_indexing_memory.map_or(u64::MAX, |mem| mem.as_u64() / 2),
|size| size.as_u64(),
),
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.into(),
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT,
instance_features: opt.to_instance_features(),

View File

@@ -1,8 +1,7 @@
use lazy_static::lazy_static;
use prometheus::{
opts, register_gauge, register_gauge_vec, register_histogram_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, Gauge, GaugeVec, HistogramVec, IntCounterVec,
IntGauge, IntGaugeVec,
opts, register_gauge, register_histogram_vec, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
};
lazy_static! {
@@ -74,20 +73,6 @@ lazy_static! {
&["kind", "value"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE: GaugeVec = register_gauge_vec!(
opts!("meilisearch_batch_running_progress_trace", "The currently running progress trace"),
&["batch_uid", "step_name"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS: IntGaugeVec =
register_int_gauge_vec!(
opts!(
"meilisearch_last_finished_batches_progress_trace_ms",
"The last few batches progress trace in milliseconds"
),
&["batch_uid", "step_name"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_LAST_UPDATE: IntGauge =
register_int_gauge!(opts!("meilisearch_last_update", "Meilisearch Last Update"))
.expect("Can't create a metric");

View File

@@ -473,13 +473,11 @@ pub struct Opt {
#[serde(default = "default_limit_batched_tasks")]
pub experimental_max_number_of_batched_tasks: usize,
/// Experimentally controls the maximum total size, in bytes, of tasks that will be processed
/// simultaneously. When unspecified, defaults to half of the maximum indexing memory.
///
/// See: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE)]
#[serde(default)]
pub experimental_limit_batched_tasks_total_size: Option<Byte>,
/// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once,
/// see: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
#[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: Byte,
/// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
/// distinct embedder.
@@ -703,12 +701,10 @@ impl Opt {
MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
experimental_max_number_of_batched_tasks.to_string(),
);
if let Some(limit) = experimental_limit_batched_tasks_total_size {
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
limit.to_string(),
);
}
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
experimental_limit_batched_tasks_total_size.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
experimental_embedding_cache_entries.to_string(),
@@ -1277,6 +1273,10 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX
}
fn default_limit_batched_tasks_total_size() -> Byte {
Byte::from_u64(u64::MAX)
}
fn default_embedding_cache_entries() -> usize {
0
}

View File

@@ -1,14 +1,14 @@
use std::time::Duration;
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
use meilisearch_types::milli::TimeBudget;
use crate::search::{Personalize, SearchResult};
use meilisearch_types::{
error::{Code, ErrorCode, ResponseError},
milli::TimeBudget,
};
use rand::Rng;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info, warn};
use crate::search::{Personalize, SearchResult};
const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
const MAX_RETRIES: u32 = 10;

View File

@@ -498,17 +498,6 @@ make_setting_routes!(
camelcase_attr: "facetSearch",
analytics: FacetSearchAnalytics
},
{
route: "/execute-after-update",
update_verb: put,
value_type: String,
err_type: meilisearch_types::deserr::DeserrJsonError<
meilisearch_types::error::deserr_codes::InvalidSettingsexecuteAfterUpdate,
>,
attr: execute_after_update,
camelcase_attr: "executeAfterUpdate",
analytics: ExecuteAfterUpdateAnalytics
},
{
route: "/prefix-search",
update_verb: put,
@@ -630,9 +619,6 @@ pub async fn update_all(
new_settings.non_separator_tokens.as_ref().set(),
),
facet_search: FacetSearchAnalytics::new(new_settings.facet_search.as_ref().set()),
execute_after_update: ExecuteAfterUpdateAnalytics::new(
new_settings.execute_after_update.as_ref().set(),
),
prefix_search: PrefixSearchAnalytics::new(new_settings.prefix_search.as_ref().set()),
chat: ChatAnalytics::new(new_settings.chat.as_ref().set()),
vector_store: VectorStoreAnalytics::new(new_settings.vector_store.as_ref().set()),

View File

@@ -42,7 +42,6 @@ pub struct SettingsAnalytics {
pub prefix_search: PrefixSearchAnalytics,
pub chat: ChatAnalytics,
pub vector_store: VectorStoreAnalytics,
pub execute_after_update: ExecuteAfterUpdateAnalytics,
}
impl Aggregate for SettingsAnalytics {
@@ -198,9 +197,6 @@ impl Aggregate for SettingsAnalytics {
set: new.facet_search.set | self.facet_search.set,
value: new.facet_search.value.or(self.facet_search.value),
},
execute_after_update: ExecuteAfterUpdateAnalytics {
set: new.execute_after_update.set | self.execute_after_update.set,
},
prefix_search: PrefixSearchAnalytics {
set: new.prefix_search.set | self.prefix_search.set,
value: new.prefix_search.value.or(self.prefix_search.value),
@@ -673,21 +669,6 @@ impl FacetSearchAnalytics {
}
}
#[derive(Serialize, Default)]
pub struct ExecuteAfterUpdateAnalytics {
pub set: bool,
}
impl ExecuteAfterUpdateAnalytics {
pub fn new(distinct: Option<&String>) -> Self {
Self { set: distinct.is_some() }
}
pub fn into_settings(self) -> SettingsAnalytics {
SettingsAnalytics { execute_after_update: self, ..Default::default() }
}
}
#[derive(Serialize, Default)]
pub struct PrefixSearchAnalytics {
pub set: bool,

View File

@@ -4,7 +4,6 @@ use index_scheduler::{IndexScheduler, Query};
use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::actions;
use meilisearch_types::milli::progress::ProgressStepView;
use meilisearch_types::tasks::Status;
use prometheus::{Encoder, TextEncoder};
use time::OffsetDateTime;
@@ -39,12 +38,6 @@ pub fn configure(config: &mut web::ServiceConfig) {
# HELP meilisearch_db_size_bytes Meilisearch DB Size In Bytes
# TYPE meilisearch_db_size_bytes gauge
meilisearch_db_size_bytes 1130496
# HELP meilisearch_batch_running_progress_trace The currently running progress trace
# TYPE meilisearch_batch_running_progress_trace gauge
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="document"} 0.710618582519409
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="extracting word proximity"} 0.2222222222222222
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="indexing"} 0.6666666666666666
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="processing tasks"} 0
# HELP meilisearch_http_requests_total Meilisearch HTTP requests total
# TYPE meilisearch_http_requests_total counter
meilisearch_http_requests_total{method="GET",path="/metrics",status="400"} 1
@@ -68,13 +61,6 @@ meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="1
meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="+Inf"} 0
meilisearch_http_response_time_seconds_sum{method="GET",path="/metrics"} 0
meilisearch_http_response_time_seconds_count{method="GET",path="/metrics"} 0
# HELP meilisearch_last_finished_batches_progress_trace_ms The last few batches progress trace in milliseconds
# TYPE meilisearch_last_finished_batches_progress_trace_ms gauge
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks"} 19360
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes"} 368
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads"} 367
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads > payload"} 367
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > indexing"} 18970
# HELP meilisearch_index_count Meilisearch Index Count
# TYPE meilisearch_index_count gauge
meilisearch_index_count 1
@@ -162,46 +148,6 @@ pub async fn get_metrics(
}
}
// Fetch and expose the current progressing step
crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE.reset();
let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
&Query { statuses: Some(vec![Status::Processing]), ..Query::default() },
auth_filters,
)?;
if let Some(batch) = batches.into_iter().next() {
let batch_uid = batch.uid.to_string();
if let Some(progress) = batch.progress {
for ProgressStepView { current_step, finished, total } in progress.steps {
crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE
.with_label_values(&[batch_uid.as_str(), current_step.as_ref()])
// We return the completion ratio of the current step
.set(finished as f64 / total as f64);
}
}
}
crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS.reset();
let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
// Fetch the finished batches...
&Query { statuses: Some(vec![Status::Succeeded, Status::Failed]), ..Query::default() },
auth_filters,
)?;
// ...and get the last batch only.
if let Some(batch) = batches.into_iter().next() {
let batch_uid = batch.uid.to_string();
for (step_name, duration_str) in batch.stats.progress_trace {
let Some(duration_str) = duration_str.as_str() else { continue };
match humantime::parse_duration(duration_str) {
Ok(duration) => {
crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS
.with_label_values(&[&batch_uid, &step_name])
.set(duration.as_millis() as i64);
}
Err(e) => tracing::error!("Failed to parse duration: {e}"),
}
}
}
if let Some(last_update) = response.last_update {
crate::metrics::MEILISEARCH_LAST_UPDATE.set(last_update.unix_timestamp());
}

View File

@@ -18,9 +18,10 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::search::SearchMetadata;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
use crate::milli::vector::Embedding;
use crate::search::SearchMetadata;
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;

View File

@@ -137,60 +137,6 @@ static SIMPLE_SEARCH_DOCUMENTS: Lazy<Value> = Lazy::new(|| {
}])
});
static MANY_DOCS: Lazy<Value> = Lazy::new(|| {
json!([
{
"title": "Shazam!",
"desc": "a Captain Marvel ersatz",
"id": "1",
},
{
"title": "Captain Planet",
"desc": "He's not part of the Marvel Cinematic Universe",
"id": "2",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "3",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "4",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "5",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "6",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "7",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "8",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "9",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "10",
}])
});
#[actix_rt::test]
async fn simple_search() {
let server = Server::new_shared();
@@ -503,38 +449,6 @@ async fn simple_search_hf() {
snapshot!(response["semanticHitCount"], @"3");
}
#[actix_rt::test]
async fn issue_5976_missing_docs_hf() {
let server = Server::new_shared();
let index = index_with_documents_hf(server, &MANY_DOCS).await;
let (response, code) = index
.search_post(
json!({"q": "Wonder replacement", "hybrid": {"embedder": "default", "semanticRatio": 1.0}, "retrieveVectors": true}),
)
.await;
snapshot!(code, @"200 OK");
let are_empty: Vec<_> = response["hits"]
.as_array()
.unwrap()
.iter()
.map(|hit| hit["_vectors"]["default"]["embeddings"].as_array().unwrap().is_empty())
.collect();
snapshot!(json!(are_empty), @r###"
[
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
"###);
}
#[actix_rt::test]
async fn distribution_shift() {
let server = Server::new_shared();

View File

@@ -43,7 +43,7 @@ async fn version_too_old() {
std::fs::write(db_path.join("VERSION"), "1.11.9999").unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.26.0");
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.25.0");
}
#[actix_rt::test]
@@ -58,7 +58,7 @@ async fn version_requires_downgrade() {
std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.26.1 is higher than the Meilisearch version 1.26.0. Downgrade is not supported");
snapshot!(err, @"Database version 1.25.1 is higher than the Meilisearch version 1.25.0. Downgrade is not supported");
}
#[actix_rt::test]

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.26.0"
"upgradeTo": "v1.25.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -101,6 +101,7 @@ bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
steppe = { version = "0.4", default-features = false }
thread_local = "1.1.9"
allocator-api2 = "0.3.0"
rustc-hash = "2.1.1"
enum-iterator = "2.1.0"
bbqueue = { git = "https://github.com/meilisearch/bbqueue" }

View File

@@ -84,7 +84,6 @@ pub mod main_key {
pub const SEARCH_CUTOFF: &str = "search_cutoff";
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
pub const FACET_SEARCH: &str = "facet_search";
pub const EXECUTE_AFTER_UPDATE: &str = "execute-after-update";
pub const PREFIX_SEARCH: &str = "prefix_search";
pub const DOCUMENTS_STATS: &str = "documents_stats";
pub const DISABLED_TYPOS_TERMS: &str = "disabled_typos_terms";
@@ -1766,22 +1765,6 @@ impl Index {
self.main.remap_key_type::<Str>().delete(txn, main_key::CHAT)
}
pub fn execute_after_update<'t>(&self, txn: &'t RoTxn<'_>) -> heed::Result<Option<&'t str>> {
self.main.remap_types::<Str, Str>().get(txn, main_key::EXECUTE_AFTER_UPDATE)
}
pub(crate) fn put_execute_after_update(
&self,
txn: &mut RwTxn<'_>,
val: &str,
) -> heed::Result<()> {
self.main.remap_types::<Str, Str>().put(txn, main_key::EXECUTE_AFTER_UPDATE, &val)
}
pub(crate) fn delete_execute_after_update(&self, txn: &mut RwTxn<'_>) -> heed::Result<bool> {
self.main.remap_key_type::<Str>().delete(txn, main_key::EXECUTE_AFTER_UPDATE)
}
pub fn localized_attributes_rules(
&self,
rtxn: &RoTxn<'_>,

View File

@@ -470,71 +470,6 @@ impl<'doc> Versions<'doc> {
Ok(Some(Self::single(data)))
}
pub fn multiple_with_edits(
doc: Option<rhai::Map>,
mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
engine: &rhai::Engine,
edit_function: &rhai::AST,
doc_alloc: &'doc bumpalo::Bump,
) -> Result<Option<Option<Self>>> {
let Some(data) = versions.next() else { return Ok(None) };
let mut doc = doc.unwrap_or_default();
let mut data = data?;
for version in versions {
let version = version?;
for (field, value) in version {
data.insert(field, value);
}
let mut scope = rhai::Scope::new();
data.iter().for_each(|(k, v)| {
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
});
scope.push("doc", doc.clone());
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
match scope.get_value::<rhai::Map>("doc") {
Some(map) => {
for (key, value) in map {
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut vec, &value).unwrap();
let key = doc_alloc.alloc_str(key.as_str());
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
data.insert(key, raw_value);
}
}
// In case the deletes the document and it's not the last change
// we simply set the document to an empty one and await the next change.
None => (),
}
}
// We must also run the code after the last change
let mut scope = rhai::Scope::new();
data.iter().for_each(|(k, v)| {
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
});
scope.push("doc", doc);
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
match scope.get_value::<rhai::Map>("doc") {
Some(map) => {
for (key, value) in map {
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut vec, &value).unwrap();
let key = doc_alloc.alloc_str(key.as_str());
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
data.insert(key, raw_value);
}
Ok(Some(Some(Self::single(data))))
}
None => Ok(Some(None)),
}
}
pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
Self { data: version }
}

View File

@@ -18,7 +18,6 @@ use crate::documents::PrimaryKey;
use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::indexer::enterprise_edition::sharding::Shards;
use crate::update::new::indexer::update_by_function::obkv_to_rhaimap;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update};
@@ -163,16 +162,7 @@ impl<'pl> DocumentOperation<'pl> {
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
let docids_version_offsets = docids_version_offsets.into_bump_slice();
let engine = rhai::Engine::new();
// Make sure to correctly setup the engine and remove all settings
let ast = index.execute_after_update(rtxn)?.map(|f| engine.compile(f).unwrap());
let fidmap = index.fields_ids_map(rtxn)?;
Ok((
DocumentOperationChanges { docids_version_offsets, engine, ast, fidmap },
operations_stats,
primary_key,
))
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
}
}
@@ -445,15 +435,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
'pl: 'doc,
{
let (external_doc, payload_operations) = item;
payload_operations.merge(
&context.rtxn,
context.index,
&self.fidmap,
&self.engine,
self.ast.as_ref(),
external_doc,
&context.doc_alloc,
)
payload_operations.merge(external_doc, &context.doc_alloc)
}
fn len(&self) -> usize {
@@ -462,9 +444,6 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
}
pub struct DocumentOperationChanges<'pl> {
engine: rhai::Engine,
ast: Option<rhai::AST>,
fidmap: FieldsIdsMap,
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
}
@@ -527,14 +506,10 @@ impl<'pl> PayloadOperations<'pl> {
}
/// Returns only the most recent version of a document based on the updates from the payloads.
#[allow(clippy::too_many_arguments)]
///
/// This function is only meant to be used when doing a replacement and not an update.
fn merge<'doc>(
&self,
rtxn: &heed::RoTxn,
index: &Index,
fidmap: &FieldsIdsMap,
engine: &rhai::Engine,
ast: Option<&rhai::AST>,
external_doc: &'doc str,
doc_alloc: &'doc Bump,
) -> Result<Option<DocumentChange<'doc>>>
@@ -598,33 +573,9 @@ impl<'pl> PayloadOperations<'pl> {
Ok(document)
});
let versions = match ast {
Some(ast) => {
let doc = index
.documents
.get(rtxn, &self.docid)?
.map(|obkv| obkv_to_rhaimap(obkv, fidmap))
.transpose()?;
match Versions::multiple_with_edits(doc, versions, engine, ast, doc_alloc)?
{
Some(Some(versions)) => Some(versions),
Some(None) if self.is_new => return Ok(None),
Some(None) => {
return Ok(Some(DocumentChange::Deletion(
DocumentIdentifiers::create(self.docid, external_doc),
)));
}
None => None,
}
}
None => Versions::multiple(versions)?,
};
let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
let Some(versions) = versions else {
return Ok(None);
};
if self.is_new || ast.is_some() {
if self.is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(
self.docid,
external_doc,

View File

@@ -187,7 +187,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
}
}
pub fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
let map: Result<rhai::Map> = all_keys
.iter()

View File

@@ -2,7 +2,6 @@ use std::cell::RefCell;
use std::collections::BTreeSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write};
use std::iter;
use std::num::NonZeroUsize;
use hashbrown::HashMap;
use heed::types::{Bytes, DecodeIgnore, Str};
@@ -346,7 +345,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
indexes.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: NonZeroUsize::new(buffer.len()),
serialized_length: Some(buffer.len()),
});
file.write_all(&buffer)?;
}
@@ -372,7 +371,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
key_buffer.extend_from_slice(&pos.to_be_bytes());
match serialized_length {
Some(serialized_length) => {
buffer.resize(serialized_length.get(), 0);
buffer.resize(serialized_length, 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
@@ -427,7 +426,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
index.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: NonZeroUsize::new(buffer.len()),
serialized_length: Some(buffer.len()),
});
file.write_all(buffer)?;
}
@@ -453,7 +452,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
key_buffer.extend_from_slice(&pos.to_be_bytes());
match serialized_length {
Some(serialized_length) => {
buffer.resize(serialized_length.get(), 0);
buffer.resize(serialized_length, 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
@@ -476,7 +475,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
struct PrefixIntegerEntry<'a> {
prefix: &'a str,
pos: u16,
serialized_length: Option<NonZeroUsize>,
serialized_length: Option<usize>,
}
/// TODO doc

View File

@@ -202,7 +202,6 @@ pub struct Settings<'a, 't, 'i> {
facet_search: Setting<bool>,
chat: Setting<ChatSettings>,
vector_store: Setting<VectorStoreBackend>,
execute_after_update: Setting<String>,
}
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
@@ -243,7 +242,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
facet_search: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
execute_after_update: Setting::NotSet,
indexer_config,
}
}
@@ -490,14 +488,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.vector_store = Setting::Reset;
}
pub fn set_execute_after_update(&mut self, value: String) {
self.execute_after_update = Setting::Set(value);
}
pub fn reset_execute_after_update(&mut self) {
self.execute_after_update = Setting::Reset;
}
#[tracing::instrument(
level = "trace"
skip(self, progress_callback, should_abort, settings_diff, embedder_stats),
@@ -1069,18 +1059,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(changed)
}
fn update_execute_after_update(&mut self) -> Result<()> {
match self.execute_after_update.as_ref() {
Setting::Set(new) => {
self.index.put_execute_after_update(self.wtxn, &new).map_err(Into::into)
}
Setting::Reset => {
self.index.delete_execute_after_update(self.wtxn).map(drop).map_err(Into::into)
}
Setting::NotSet => Ok(()),
}
}
fn update_embedding_configs(&mut self) -> Result<BTreeMap<String, EmbedderAction>> {
match std::mem::take(&mut self.embedder_settings) {
Setting::Set(configs) => self.update_embedding_configs_set(configs),
@@ -1491,7 +1469,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.update_proximity_precision()?;
self.update_prefix_search()?;
self.update_facet_search()?;
self.update_execute_after_update()?;
self.update_localized_attributes_rules()?;
self.update_disabled_typos_terms()?;
self.update_chat_config()?;
@@ -1641,7 +1618,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
disable_on_numbers: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
execute_after_update: Setting::NotSet,
wtxn: _,
index: _,
indexer_config: _,

View File

@@ -899,7 +899,6 @@ fn test_correct_settings_init() {
disable_on_numbers,
chat,
vector_store,
execute_after_update,
} = settings;
assert!(matches!(searchable_fields, Setting::NotSet));
assert!(matches!(displayed_fields, Setting::NotSet));
@@ -930,7 +929,6 @@ fn test_correct_settings_init() {
assert!(matches!(disable_on_numbers, Setting::NotSet));
assert!(matches!(chat, Setting::NotSet));
assert!(matches!(vector_store, Setting::NotSet));
assert!(matches!(execute_after_update, Setting::NotSet));
})
.unwrap();
}

View File

@@ -43,7 +43,6 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&ToTargetNoOp { target: (1, 23, 0) },
&ToTargetNoOp { target: (1, 24, 0) },
&ToTargetNoOp { target: (1, 25, 0) },
&ToTargetNoOp { target: (1, 26, 0) },
// This is the last upgrade function, it will be called when the index is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@@ -80,7 +79,6 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 23, _) => function_index!(13),
(1, 24, _) => function_index!(14),
(1, 25, _) => function_index!(15),
(1, 26, _) => function_index!(16),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually
// considering dumpless upgrade.
(_major, _minor, _patch) => return None,

View File

@@ -112,12 +112,13 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
rendered: I,
unused_vectors_distribution: &C::ErrorMetadata,
) -> Result<()> {
if self.inputs.len() >= self.inputs.capacity() {
self.embed_chunks(unused_vectors_distribution)?;
if self.inputs.len() < self.inputs.capacity() {
self.inputs.push(rendered);
self.metadata.push(metadata);
return Ok(());
}
self.inputs.push(rendered);
self.metadata.push(metadata);
Ok(())
self.embed_chunks(unused_vectors_distribution)
}
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {