Merge pull request #5509 from meilisearch/release-v1.14.0-tmp

Bring back changes from v1.14.0 to main
This commit is contained in:
Louis Dureuil
2025-04-14 13:59:23 +00:00
committed by GitHub
43 changed files with 1047 additions and 508 deletions

134
Cargo.lock generated
View File

@@ -258,7 +258,7 @@ version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"version_check", "version_check",
] ]
@@ -271,7 +271,7 @@ checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"const-random", "const-random",
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"version_check", "version_check",
"zerocopy", "zerocopy",
@@ -790,22 +790,20 @@ dependencies = [
[[package]] [[package]]
name = "bzip2" name = "bzip2"
version = "0.4.4" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" checksum = "49ecfb22d906f800d4fe833b6282cf4dc1c298f5057ca0b5445e5c209735ca47"
dependencies = [ dependencies = [
"bzip2-sys", "bzip2-sys",
"libc",
] ]
[[package]] [[package]]
name = "bzip2-sys" name = "bzip2-sys"
version = "0.1.11+1.0.8" version = "0.1.13+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14"
dependencies = [ dependencies = [
"cc", "cc",
"libc",
"pkg-config", "pkg-config",
] ]
@@ -1143,7 +1141,7 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"tiny-keccak", "tiny-keccak",
] ]
@@ -2216,10 +2214,24 @@ dependencies = [
"cfg-if", "cfg-if",
"js-sys", "js-sys",
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "getrandom"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi 0.13.3+wasi-0.2.2",
"wasm-bindgen",
"windows-targets 0.52.6",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.27.3" version = "0.27.3"
@@ -2733,6 +2745,7 @@ dependencies = [
"bincode", "bincode",
"bumpalo", "bumpalo",
"bumparaw-collections", "bumparaw-collections",
"byte-unit",
"convert_case 0.6.0", "convert_case 0.6.0",
"crossbeam-channel", "crossbeam-channel",
"csv", "csv",
@@ -2741,6 +2754,7 @@ dependencies = [
"enum-iterator", "enum-iterator",
"file-store", "file-store",
"flate2", "flate2",
"indexmap",
"insta", "insta",
"maplit", "maplit",
"meili-snap", "meili-snap",
@@ -2923,10 +2937,11 @@ dependencies = [
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.69" version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [ dependencies = [
"once_cell",
"wasm-bindgen", "wasm-bindgen",
] ]
@@ -3518,6 +3533,17 @@ dependencies = [
"crc", "crc",
] ]
[[package]]
name = "lzma-sys"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]] [[package]]
name = "macro_rules_attribute" name = "macro_rules_attribute"
version = "0.2.0" version = "0.2.0"
@@ -3656,7 +3682,7 @@ dependencies = [
"uuid", "uuid",
"wiremock", "wiremock",
"yaup", "yaup",
"zip 2.2.2", "zip 2.3.0",
] ]
[[package]] [[package]]
@@ -3882,7 +3908,7 @@ checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@@ -3893,7 +3919,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@@ -4670,7 +4696,7 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
] ]
[[package]] [[package]]
@@ -4762,7 +4788,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"redox_syscall 0.2.16", "redox_syscall 0.2.16",
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
@@ -4886,13 +4912,13 @@ dependencies = [
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.13" version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee" checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [ dependencies = [
"cc", "cc",
"cfg-if", "cfg-if",
"getrandom", "getrandom 0.2.15",
"libc", "libc",
"untrusted", "untrusted",
"windows-sys 0.52.0", "windows-sys 0.52.0",
@@ -5576,7 +5602,7 @@ checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"fastrand", "fastrand",
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.52.0",
@@ -5751,7 +5777,7 @@ dependencies = [
"aho-corasick", "aho-corasick",
"derive_builder 0.12.0", "derive_builder 0.12.0",
"esaxx-rs", "esaxx-rs",
"getrandom", "getrandom 0.2.15",
"itertools 0.12.1", "itertools 0.12.1",
"lazy_static", "lazy_static",
"log", "log",
@@ -6238,7 +6264,7 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"serde", "serde",
] ]
@@ -6335,24 +6361,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasi"
version = "0.2.92" version = "0.13.3+wasi-0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro", "wasm-bindgen-macro",
] ]
[[package]] [[package]]
name = "wasm-bindgen-backend" name = "wasm-bindgen-backend"
version = "0.2.92" version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"log", "log",
"once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.87", "syn 2.0.87",
@@ -6373,9 +6409,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.92" version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [ dependencies = [
"quote", "quote",
"wasm-bindgen-macro-support", "wasm-bindgen-macro-support",
@@ -6383,9 +6419,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro-support" name = "wasm-bindgen-macro-support"
version = "0.2.92" version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -6396,9 +6432,12 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-shared" name = "wasm-bindgen-shared"
version = "0.2.92" version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]] [[package]]
name = "wasm-streams" name = "wasm-streams"
@@ -6803,6 +6842,15 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "wit-bindgen-rt"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [
"bitflags 2.9.0",
]
[[package]] [[package]]
name = "write16" name = "write16"
version = "1.0.0" version = "1.0.0"
@@ -6858,6 +6906,15 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "xz2"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2"
dependencies = [
"lzma-sys",
]
[[package]] [[package]]
name = "yada" name = "yada"
version = "0.5.1" version = "0.5.1"
@@ -6999,9 +7056,9 @@ dependencies = [
[[package]] [[package]]
name = "zip" name = "zip"
version = "2.2.2" version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae9c1ea7b3a5e1f4b922ff856a129881167511563dc219869afe3787fc0c1a45" checksum = "84e9a772a54b54236b9b744aaaf8d7be01b4d6e99725523cb82cb32d1c81b1d7"
dependencies = [ dependencies = [
"aes", "aes",
"arbitrary", "arbitrary",
@@ -7012,15 +7069,16 @@ dependencies = [
"deflate64", "deflate64",
"displaydoc", "displaydoc",
"flate2", "flate2",
"getrandom 0.3.1",
"hmac", "hmac",
"indexmap", "indexmap",
"lzma-rs", "lzma-rs",
"memchr", "memchr",
"pbkdf2", "pbkdf2",
"rand",
"sha1", "sha1",
"thiserror 2.0.9", "thiserror 2.0.9",
"time", "time",
"xz2",
"zeroize", "zeroize",
"zopfli", "zopfli",
"zstd", "zstd",

View File

@@ -326,6 +326,7 @@ pub(crate) mod test {
index_uids: maplit::btreemap! { "doggo".to_string() => 1 }, index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
progress_trace: Default::default(), progress_trace: Default::default(),
write_channel_congestion: None, write_channel_congestion: None,
internal_database_sizes: Default::default(),
}, },
enqueued_at: Some(BatchEnqueuedAt { enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC), earliest: datetime!(2022-11-11 0:00 UTC),

View File

@@ -13,6 +13,7 @@ license.workspace = true
[dependencies] [dependencies]
anyhow = "1.0.95" anyhow = "1.0.95"
bincode = "1.3.3" bincode = "1.3.3"
byte-unit = "5.1.6"
bumpalo = "3.16.0" bumpalo = "3.16.0"
bumparaw-collections = "0.1.4" bumparaw-collections = "0.1.4"
convert_case = "0.6.0" convert_case = "0.6.0"
@@ -22,6 +23,7 @@ dump = { path = "../dump" }
enum-iterator = "2.1.0" enum-iterator = "2.1.0"
file-store = { path = "../file-store" } file-store = { path = "../file-store" }
flate2 = "1.0.35" flate2 = "1.0.35"
indexmap = "2.7.0"
meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.5" memmap2 = "0.9.5"

View File

@@ -344,6 +344,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch; let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
let stats = BatchStats { let stats = BatchStats {
progress_trace: Default::default(), progress_trace: Default::default(),
internal_database_sizes: Default::default(),
write_channel_congestion: None, write_channel_congestion: None,
..stats.clone() ..stats.clone()
}; };

View File

@@ -625,8 +625,8 @@ impl IndexScheduler {
task_id: Option<TaskId>, task_id: Option<TaskId>,
dry_run: bool, dry_run: bool,
) -> Result<Task> { ) -> Result<Task> {
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task // if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40 && (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40
{ {
return Err(Error::NoSpaceLeftInTaskQueue); return Err(Error::NoSpaceLeftInTaskQueue);

View File

@@ -64,6 +64,13 @@ make_enum_progress! {
} }
} }
// Progress steps reported while wrapping up an index operation:
// - `Committing` is reported when entering the "commit" step of the index write transaction,
// - `ComputingStats` is reported when the index stats are recomputed afterwards.
make_enum_progress! {
pub enum FinalizingIndexStep {
Committing,
ComputingStats,
}
}
make_enum_progress! { make_enum_progress! {
pub enum TaskCancelationProgress { pub enum TaskCancelationProgress {
RetrievingTasks, RetrievingTasks,

View File

@@ -292,8 +292,6 @@ impl Queue {
return Ok(task); return Ok(task);
} }
// Get rid of the mutability.
let task = task;
self.tasks.register(wtxn, &task)?; self.tasks.register(wtxn, &task)?;
Ok(task) Ok(task)

View File

@@ -364,7 +364,7 @@ fn test_task_queue_is_full() {
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// Even the task deletion that doesn't delete anything shouldn't be accepted // Even the task deletion and cancelation that don't delete anything should be refused
let result = index_scheduler let result = index_scheduler
.register( .register(
KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() }, KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
@@ -373,10 +373,39 @@ fn test_task_queue_is_full() {
) )
.unwrap_err(); .unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
let result = index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: RoaringBitmap::new() },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// But a task deletion that delete something should works // But a task cancelation that cancel something should work
index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: (0..100).collect() },
None,
false,
)
.unwrap();
handle.advance_one_successful_batch();
// But we should still be forbidden from enqueuing new tasks
let result = index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// And a task deletion that deletes something should work
index_scheduler index_scheduler
.register( .register(
KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }, KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },

View File

@@ -20,10 +20,12 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc; use std::sync::Arc;
use convert_case::{Case, Casing as _};
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::tasks::Status; use meilisearch_types::tasks::Status;
use process_batch::ProcessBatchInfo;
use rayon::current_num_threads; use rayon::current_num_threads;
use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@@ -223,16 +225,16 @@ impl IndexScheduler {
let mut stop_scheduler_forever = false; let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new(); let mut canceled = RoaringBitmap::new();
let mut congestion = None; let mut process_batch_info = ProcessBatchInfo::default();
match res { match res {
Ok((tasks, cong)) => { Ok((tasks, info)) => {
#[cfg(test)] #[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded); self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32); let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj); progress.update_progress(task_progress_obj);
congestion = cong; process_batch_info = info;
let mut success = 0; let mut success = 0;
let mut failure = 0; let mut failure = 0;
let mut canceled_by = None; let mut canceled_by = None;
@@ -350,6 +352,9 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch. // We must re-add the canceled task so they're part of the same batch.
ids |= canceled; ids |= canceled;
let ProcessBatchInfo { congestion, pre_commit_dabases_sizes, post_commit_dabases_sizes } =
process_batch_info;
processing_batch.stats.progress_trace = processing_batch.stats.progress_trace =
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect(); progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
processing_batch.stats.write_channel_congestion = congestion.map(|congestion| { processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
@@ -359,6 +364,33 @@ impl IndexScheduler {
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into()); congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info congestion_info
}); });
processing_batch.stats.internal_database_sizes = pre_commit_dabases_sizes
.iter()
.flat_map(|(dbname, pre_size)| {
post_commit_dabases_sizes
.get(dbname)
.map(|post_size| {
use byte_unit::{Byte, UnitType::Binary};
use std::cmp::Ordering::{Equal, Greater, Less};
let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
let diff_size = post_size.abs_diff(*pre_size) as u64;
let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
let sign = match post_size.cmp(pre_size) {
Equal => return None,
Greater => "+",
Less => "-",
};
Some((
dbname.to_case(Case::Camel),
format!("{post:#.2} ({sign}{diff:#.2})").into(),
))
})
.into_iter()
.flatten()
})
.collect();
if let Some(congestion) = congestion { if let Some(congestion) = congestion {
tracing::debug!( tracing::debug!(

View File

@@ -12,7 +12,7 @@ use roaring::RoaringBitmap;
use super::create_batch::Batch; use super::create_batch::Batch;
use crate::processing::{ use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress, UpdateIndexProgress,
}; };
@@ -22,6 +22,16 @@ use crate::utils::{
}; };
use crate::{Error, IndexScheduler, Result, TaskId}; use crate::{Error, IndexScheduler, Result, TaskId};
/// Side information produced while processing a batch, returned next to the
/// processed tasks.
///
/// The `Default` value (no congestion, empty size maps) is what the batch kinds
/// that do not apply an index operation return.
///
/// NOTE: `dabases` in the field names is a misspelling of `databases`; it is kept
/// as-is because the fields are destructured by name where the batch stats are built.
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. `None` when unavailable (settings update).
pub congestion: Option<ChannelCongestion>,
/// The sizes of the different databases, keyed by database name, as read from
/// the index write transaction before the index operation is applied.
pub pre_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
/// The sizes of the different databases, keyed by database name, after committing
/// the indexation. Holds a copy of the pre-commit sizes when the post-commit
/// sizes could not be computed.
pub post_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
}
impl IndexScheduler { impl IndexScheduler {
/// Apply the operation associated with the given batch. /// Apply the operation associated with the given batch.
/// ///
@@ -35,7 +45,7 @@ impl IndexScheduler {
batch: Batch, batch: Batch,
current_batch: &mut ProcessingBatch, current_batch: &mut ProcessingBatch,
progress: Progress, progress: Progress,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> { ) -> Result<(Vec<Task>, ProcessBatchInfo)> {
#[cfg(test)] #[cfg(test)]
{ {
self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?; self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@@ -76,7 +86,7 @@ impl IndexScheduler {
canceled_tasks.push(task); canceled_tasks.push(task);
Ok((canceled_tasks, None)) Ok((canceled_tasks, ProcessBatchInfo::default()))
} }
Batch::TaskDeletions(mut tasks) => { Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
@@ -115,14 +125,14 @@ impl IndexScheduler {
_ => unreachable!(), _ => unreachable!(),
} }
} }
Ok((tasks, None)) Ok((tasks, ProcessBatchInfo::default()))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
} }
Batch::SnapshotCreation(tasks) => self
.process_snapshot(progress, tasks)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::Dump(task) => self
.process_dump_creation(progress, task)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::IndexOperation { op, must_create_index } => { Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string(); let index_uid = op.index_uid().to_string();
let index = if must_create_index { let index = if must_create_index {
@@ -139,10 +149,12 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone()))); .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
let (tasks, congestion) = let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;
{ {
progress.update_progress(FinalizingIndexStep::Committing);
let span = tracing::trace_span!(target: "indexing::scheduler", "commit"); let span = tracing::trace_span!(target: "indexing::scheduler", "commit");
let _entered = span.enter(); let _entered = span.enter();
@@ -153,12 +165,15 @@ impl IndexScheduler {
// stats of the index. Since the tasks have already been processed and // stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail // this is a non-critical operation. If it fails, we should not fail
// the entire batch. // the entire batch.
let mut post_commit_dabases_sizes = None;
let res = || -> Result<()> { let res = || -> Result<()> {
progress.update_progress(FinalizingIndexStep::ComputingStats);
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn) let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
post_commit_dabases_sizes = Some(index.database_sizes(&index_rtxn)?);
wtxn.commit()?; wtxn.commit()?;
Ok(()) Ok(())
}(); }();
@@ -171,7 +186,16 @@ impl IndexScheduler {
), ),
} }
Ok((tasks, congestion)) let info = ProcessBatchInfo {
congestion,
// In case we fail to get the post-commit sizes, we decide
// that nothing changed and use the pre-commit sizes.
post_commit_dabases_sizes: post_commit_dabases_sizes
.unwrap_or_else(|| pre_commit_dabases_sizes.clone()),
pre_commit_dabases_sizes,
};
Ok((tasks, info))
} }
Batch::IndexCreation { index_uid, primary_key, task } => { Batch::IndexCreation { index_uid, primary_key, task } => {
progress.update_progress(CreateIndexProgress::CreatingTheIndex); progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@@ -239,7 +263,7 @@ impl IndexScheduler {
), ),
} }
Ok((vec![task], None)) Ok((vec![task], ProcessBatchInfo::default()))
} }
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => { Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
progress.update_progress(DeleteIndexProgress::DeletingTheIndex); progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@@ -273,7 +297,9 @@ impl IndexScheduler {
}; };
} }
Ok((tasks, None)) // Here we could also show that all the internal database sizes go to 0
// but it would mean opening the index and that's costly.
Ok((tasks, ProcessBatchInfo::default()))
} }
Batch::IndexSwap { mut task } => { Batch::IndexSwap { mut task } => {
progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap); progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@@ -321,7 +347,7 @@ impl IndexScheduler {
} }
wtxn.commit()?; wtxn.commit()?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
Ok((vec![task], None)) Ok((vec![task], ProcessBatchInfo::default()))
} }
Batch::UpgradeDatabase { mut tasks } => { Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@@ -351,7 +377,7 @@ impl IndexScheduler {
task.error = None; task.error = None;
} }
Ok((tasks, None)) Ok((tasks, ProcessBatchInfo::default()))
} }
} }
} }

View File

@@ -32,7 +32,7 @@ impl IndexScheduler {
index_wtxn: &mut RwTxn<'i>, index_wtxn: &mut RwTxn<'i>,
index: &'i Index, index: &'i Index,
operation: IndexOperation, operation: IndexOperation,
progress: Progress, progress: &Progress,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> { ) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now(); let started_processing_at = std::time::Instant::now();
@@ -186,7 +186,7 @@ impl IndexScheduler {
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&progress, progress,
) )
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?, .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
); );
@@ -307,7 +307,7 @@ impl IndexScheduler {
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&progress, progress,
) )
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
); );
@@ -465,7 +465,7 @@ impl IndexScheduler {
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&progress, progress,
) )
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
); );
@@ -520,7 +520,7 @@ impl IndexScheduler {
index_uid: index_uid.clone(), index_uid: index_uid.clone(),
tasks: cleared_tasks, tasks: cleared_tasks,
}, },
progress.clone(), progress,
)?; )?;
let (settings_tasks, _congestion) = self.apply_index_operation( let (settings_tasks, _congestion) = self.apply_index_operation(

View File

@@ -64,4 +64,6 @@ pub struct BatchStats {
pub progress_trace: serde_json::Map<String, serde_json::Value>, pub progress_trace: serde_json::Map<String, serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>, pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
} }

View File

@@ -454,7 +454,10 @@ impl ErrorCode for milli::Error {
} }
UserError::CriterionError(_) => Code::InvalidSettingsRankingRules, UserError::CriterionError(_) => Code::InvalidSettingsRankingRules,
UserError::InvalidGeoField { .. } => Code::InvalidDocumentGeoField, UserError::InvalidGeoField { .. } => Code::InvalidDocumentGeoField,
UserError::InvalidVectorDimensions { .. } => Code::InvalidVectorDimensions, UserError::InvalidVectorDimensions { .. }
| UserError::InvalidIndexingVectorDimensions { .. } => {
Code::InvalidVectorDimensions
}
UserError::InvalidVectorsMapType { .. } UserError::InvalidVectorsMapType { .. }
| UserError::InvalidVectorsEmbedderConf { .. } => Code::InvalidVectorsType, | UserError::InvalidVectorsEmbedderConf { .. } => Code::InvalidVectorsType,
UserError::TooManyVectors(_, _) => Code::TooManyVectors, UserError::TooManyVectors(_, _) => Code::TooManyVectors,

View File

@@ -30,11 +30,7 @@ actix-web = { version = "4.9.0", default-features = false, features = [
anyhow = { version = "1.0.95", features = ["backtrace"] } anyhow = { version = "1.0.95", features = ["backtrace"] }
async-trait = "0.1.85" async-trait = "0.1.85"
bstr = "1.11.3" bstr = "1.11.3"
byte-unit = { version = "5.1.6", default-features = false, features = [ byte-unit = { version = "5.1.6", features = ["serde"] }
"std",
"byte",
"serde",
] }
bytes = "1.9.0" bytes = "1.9.0"
clap = { version = "4.5.24", features = ["derive", "env"] } clap = { version = "4.5.24", features = ["derive", "env"] }
crossbeam-channel = "0.5.15" crossbeam-channel = "0.5.15"
@@ -140,7 +136,7 @@ reqwest = { version = "0.12.12", features = [
sha-1 = { version = "0.10.1", optional = true } sha-1 = { version = "0.10.1", optional = true }
static-files = { version = "0.2.4", optional = true } static-files = { version = "0.2.4", optional = true }
tempfile = { version = "3.15.0", optional = true } tempfile = { version = "3.15.0", optional = true }
zip = { version = "2.2.2", optional = true } zip = { version = "2.3.0", optional = true }
[features] [features]
default = ["meilisearch-types/all-tokenizations", "mini-dashboard"] default = ["meilisearch-types/all-tokenizations", "mini-dashboard"]
@@ -170,5 +166,5 @@ german = ["meilisearch-types/german"]
turkish = ["meilisearch-types/turkish"] turkish = ["meilisearch-types/turkish"]
[package.metadata.mini-dashboard] [package.metadata.mini-dashboard]
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.18/build.zip" assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.19/build.zip"
sha1 = "b408a30dcb6e20cddb0c153c23385bcac4c8e912" sha1 = "7974430d5277c97f67cf6e95eec6faaac2788834"

View File

@@ -329,7 +329,8 @@ impl Infos {
http_addr: http_addr != default_http_addr(), http_addr: http_addr != default_http_addr(),
http_payload_size_limit, http_payload_size_limit,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size, experimental_limit_batched_tasks_total_size:
experimental_limit_batched_tasks_total_size.into(),
task_queue_webhook: task_webhook_url.is_some(), task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(), task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(), log_level: log_level.to_string(),

View File

@@ -228,7 +228,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
cleanup_enabled: !opt.experimental_replication_parameters, cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.into(),
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features: opt.to_instance_features(), instance_features: opt.to_instance_features(),

View File

@@ -445,7 +445,7 @@ pub struct Opt {
/// see: <https://github.com/orgs/meilisearch/discussions/801> /// see: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())] #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
#[serde(default = "default_limit_batched_tasks_total_size")] #[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: u64, pub experimental_limit_batched_tasks_total_size: Byte,
/// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each /// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
/// distinct embedder. /// distinct embedder.
@@ -968,8 +968,8 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX usize::MAX
} }
fn default_limit_batched_tasks_total_size() -> u64 { fn default_limit_batched_tasks_total_size() -> Byte {
u64::MAX Byte::from_u64(u64::MAX)
} }
fn default_embedding_cache_entries() -> usize { fn default_embedding_cache_entries() -> usize {

View File

@@ -518,7 +518,7 @@ impl From<index_scheduler::IndexStats> for IndexStats {
.inner_stats .inner_stats
.number_of_documents .number_of_documents
.unwrap_or(stats.inner_stats.documents_database_stats.number_of_entries()), .unwrap_or(stats.inner_stats.documents_database_stats.number_of_entries()),
raw_document_db_size: stats.inner_stats.documents_database_stats.total_value_size(), raw_document_db_size: stats.inner_stats.documents_database_stats.total_size(),
avg_document_size: stats.inner_stats.documents_database_stats.average_value_size(), avg_document_size: stats.inner_stats.documents_database_stats.average_value_size(),
is_indexing: stats.is_indexing, is_indexing: stats.is_indexing,
number_of_embeddings: stats.inner_stats.number_of_embeddings, number_of_embeddings: stats.inner_stats.number_of_embeddings,

View File

@@ -281,7 +281,8 @@ async fn test_summarized_document_addition_or_update() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r###" @r###"
{ {
@@ -303,7 +304,8 @@ async fn test_summarized_document_addition_or_update() {
"test": 1 "test": 1
}, },
"progressTrace": "[progressTrace]", "progressTrace": "[progressTrace]",
"writeChannelCongestion": "[writeChannelCongestion]" "writeChannelCongestion": "[writeChannelCongestion]",
"internalDatabaseSizes": "[internalDatabaseSizes]"
}, },
"duration": "[duration]", "duration": "[duration]",
"startedAt": "[date]", "startedAt": "[date]",
@@ -322,7 +324,8 @@ async fn test_summarized_document_addition_or_update() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r###" @r###"
{ {
@@ -407,7 +410,8 @@ async fn test_summarized_delete_documents_by_batch() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r###" @r###"
{ {
@@ -495,7 +499,8 @@ async fn test_summarized_delete_documents_by_filter() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r###" @r###"
{ {
@@ -537,7 +542,8 @@ async fn test_summarized_delete_documents_by_filter() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r#" @r#"
{ {
@@ -623,7 +629,8 @@ async fn test_summarized_delete_document_by_id() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r#" @r#"
{ {
@@ -679,7 +686,8 @@ async fn test_summarized_settings_update() {
".startedAt" => "[date]", ".startedAt" => "[date]",
".finishedAt" => "[date]", ".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]", ".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]" ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
}, },
@r###" @r###"
{ {

View File

@@ -1897,11 +1897,11 @@ async fn update_documents_with_geo_field() {
}, },
{ {
"id": "3", "id": "3",
"_geo": { "lat": 1, "lng": 1 }, "_geo": { "lat": 3, "lng": 0 },
}, },
{ {
"id": "4", "id": "4",
"_geo": { "lat": "1", "lng": "1" }, "_geo": { "lat": "4", "lng": "0" },
}, },
]); ]);
@@ -1928,9 +1928,7 @@ async fn update_documents_with_geo_field() {
} }
"###); "###);
let (response, code) = index let (response, code) = index.search_post(json!({"sort": ["_geoPoint(10,0):asc"]})).await;
.search_post(json!({"sort": ["_geoPoint(50.629973371633746,3.0569447399419567):desc"]}))
.await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
// we are expecting docs 4 and 3 first as they have geo // we are expecting docs 4 and 3 first as they have geo
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }), snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }),
@@ -1940,18 +1938,18 @@ async fn update_documents_with_geo_field() {
{ {
"id": "4", "id": "4",
"_geo": { "_geo": {
"lat": "1", "lat": "4",
"lng": "1" "lng": "0"
}, },
"_geoDistance": 5522018 "_geoDistance": 667170
}, },
{ {
"id": "3", "id": "3",
"_geo": { "_geo": {
"lat": 1, "lat": 3,
"lng": 1 "lng": 0
}, },
"_geoDistance": 5522018 "_geoDistance": 778364
}, },
{ {
"id": "1" "id": "1"
@@ -1969,10 +1967,13 @@ async fn update_documents_with_geo_field() {
} }
"###); "###);
let updated_documents = json!([{ let updated_documents = json!([
"id": "3", {
"doggo": "kefir", "id": "3",
}]); "doggo": "kefir",
"_geo": { "lat": 5, "lng": 0 },
}
]);
let (task, _status_code) = index.update_documents(updated_documents, None).await; let (task, _status_code) = index.update_documents(updated_documents, None).await;
let response = index.wait_task(task.uid()).await; let response = index.wait_task(task.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }), snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@@ -2012,16 +2013,16 @@ async fn update_documents_with_geo_field() {
{ {
"id": "3", "id": "3",
"_geo": { "_geo": {
"lat": 1, "lat": 5,
"lng": 1 "lng": 0
}, },
"doggo": "kefir" "doggo": "kefir"
}, },
{ {
"id": "4", "id": "4",
"_geo": { "_geo": {
"lat": "1", "lat": "4",
"lng": "1" "lng": "0"
} }
} }
], ],
@@ -2031,31 +2032,29 @@ async fn update_documents_with_geo_field() {
} }
"###); "###);
let (response, code) = index let (response, code) = index.search_post(json!({"sort": ["_geoPoint(10,0):asc"]})).await;
.search_post(json!({"sort": ["_geoPoint(50.629973371633746,3.0569447399419567):desc"]}))
.await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
// the search response should not have changed: we are expecting docs 4 and 3 first as they have geo // the search response should not have changed: we are expecting docs 4 and 3 first as they have geo
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }), snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }),
@r###" @r###"
{ {
"hits": [ "hits": [
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
},
"_geoDistance": 5522018
},
{ {
"id": "3", "id": "3",
"_geo": { "_geo": {
"lat": 1, "lat": 5,
"lng": 1 "lng": 0
}, },
"doggo": "kefir", "doggo": "kefir",
"_geoDistance": 5522018 "_geoDistance": 555975
},
{
"id": "4",
"_geo": {
"lat": "4",
"lng": "0"
},
"_geoDistance": 667170
}, },
{ {
"id": "1" "id": "1"

View File

@@ -157,11 +157,14 @@ async fn delete_document_by_filter() {
index.wait_task(task.uid()).await.succeeded(); index.wait_task(task.uid()).await.succeeded();
let (stats, _) = index.stats().await; let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 4, "numberOfDocuments": 4,
"rawDocumentDbSize": 42, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 10, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -208,11 +211,14 @@ async fn delete_document_by_filter() {
"###); "###);
let (stats, _) = index.stats().await; let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 16, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 8, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -278,11 +284,14 @@ async fn delete_document_by_filter() {
"###); "###);
let (stats, _) = index.stats().await; let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 1, "numberOfDocuments": 1,
"rawDocumentDbSize": 12, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 12, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,

View File

@@ -28,12 +28,15 @@ async fn import_dump_v1_movie_raw() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -185,12 +188,15 @@ async fn import_dump_v1_movie_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -355,12 +361,15 @@ async fn import_dump_v1_rubygems_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 8606, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 162, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -522,12 +531,15 @@ async fn import_dump_v2_movie_raw() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -679,12 +691,15 @@ async fn import_dump_v2_movie_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -846,12 +861,15 @@ async fn import_dump_v2_rubygems_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 8606, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 162, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1010,12 +1028,15 @@ async fn import_dump_v3_movie_raw() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1167,12 +1188,15 @@ async fn import_dump_v3_movie_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1334,12 +1358,15 @@ async fn import_dump_v3_rubygems_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 8606, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 162, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1498,12 +1525,15 @@ async fn import_dump_v4_movie_raw() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1655,12 +1685,15 @@ async fn import_dump_v4_movie_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 21965, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 414, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1822,12 +1855,15 @@ async fn import_dump_v4_rubygems_with_settings() {
let (stats, code) = index.stats().await; let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 53, "numberOfDocuments": 53,
"rawDocumentDbSize": 8606, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 162, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -1994,11 +2030,14 @@ async fn import_dump_v5() {
let (stats, code) = index1.stats().await; let (stats, code) = index1.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 10, "numberOfDocuments": 10,
"rawDocumentDbSize": 6782, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 678, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -2031,12 +2070,15 @@ async fn import_dump_v5() {
let (stats, code) = index2.stats().await; let (stats, code) = index2.stats().await;
snapshot!(code, @"200 OK"); snapshot!(code, @"200 OK");
snapshot!( snapshot!(
json_string!(stats), json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###" @r###"
{ {
"numberOfDocuments": 10, "numberOfDocuments": 10,
"rawDocumentDbSize": 6782, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 678, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -2237,6 +2279,7 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
".results[0].duration" => "[date]", ".results[0].duration" => "[date]",
".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.progressTrace" => "[progressTrace]",
".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]",
".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]",
}), name: "batches"); }), name: "batches");
let (indexes, code) = server.list_indexes(None, None).await; let (indexes, code) = server.list_indexes(None, None).await;

View File

@@ -1783,6 +1783,146 @@ async fn test_nested_fields() {
.await; .await;
} }
#[actix_rt::test]
async fn test_typo_settings() {
let documents = json!([
{
"id": 0,
"title": "The zeroth document",
},
{
"id": 1,
"title": "The first document",
"nested": {
"object": "field",
"machin": "bidule",
},
},
{
"id": 2,
"title": "The second document",
"nested": [
"array",
{
"object": "field",
},
{
"prout": "truc",
"machin": "lol",
},
],
},
{
"id": 3,
"title": "The third document",
"nested": "I lied",
},
]);
test_settings_documents_indexing_swapping_and_search(
&documents,
&json!({
"searchableAttributes": ["title", "nested.object", "nested.machin"],
"typoTolerance": {
"enabled": true,
"disableOnAttributes": ["title"]
}
}),
&json!({"q": "document"}),
|response, code| {
assert_eq!(code, 200, "{}", response);
snapshot!(json_string!(response["hits"]), @r###"
[
{
"id": 0,
"title": "The zeroth document"
},
{
"id": 1,
"title": "The first document",
"nested": {
"object": "field",
"machin": "bidule"
}
},
{
"id": 2,
"title": "The second document",
"nested": [
"array",
{
"object": "field"
},
{
"prout": "truc",
"machin": "lol"
}
]
},
{
"id": 3,
"title": "The third document",
"nested": "I lied"
}
]
"###);
},
)
.await;
// Test prefix search
test_settings_documents_indexing_swapping_and_search(
&documents,
&json!({
"searchableAttributes": ["title", "nested.object", "nested.machin"],
"typoTolerance": {
"enabled": true,
"disableOnAttributes": ["title"]
}
}),
&json!({"q": "docume"}),
|response, code| {
assert_eq!(code, 200, "{}", response);
snapshot!(json_string!(response["hits"]), @r###"
[
{
"id": 0,
"title": "The zeroth document"
},
{
"id": 1,
"title": "The first document",
"nested": {
"object": "field",
"machin": "bidule"
}
},
{
"id": 2,
"title": "The second document",
"nested": [
"array",
{
"object": "field"
},
{
"prout": "truc",
"machin": "lol"
}
]
},
{
"id": 3,
"title": "The third document",
"nested": "I lied"
}
]
"###);
},
)
.await;
}
/// Modifying facets with different casing should work correctly /// Modifying facets with different casing should work correctly
#[actix_rt::test] #[actix_rt::test]
async fn change_facet_casing() { async fn change_facet_casing() {

View File

@@ -110,11 +110,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 27, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 5, "numberOfEmbeddings": 5,
"numberOfEmbeddedDocuments": 2, "numberOfEmbeddedDocuments": 2,
@@ -135,11 +138,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 27, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 3, "numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 2, "numberOfEmbeddedDocuments": 2,
@@ -160,11 +166,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 27, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 2, "numberOfEmbeddings": 2,
"numberOfEmbeddedDocuments": 2, "numberOfEmbeddedDocuments": 2,
@@ -186,11 +195,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 27, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 2, "numberOfEmbeddings": 2,
"numberOfEmbeddedDocuments": 1, "numberOfEmbeddedDocuments": 1,
@@ -236,11 +248,14 @@ async fn add_remove_embedded_documents() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 27, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 5, "numberOfEmbeddings": 5,
"numberOfEmbeddedDocuments": 2, "numberOfEmbeddedDocuments": 2,
@@ -257,11 +272,14 @@ async fn add_remove_embedded_documents() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 1, "numberOfDocuments": 1,
"rawDocumentDbSize": 13, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 13, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 3, "numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 1, "numberOfEmbeddedDocuments": 1,
@@ -290,11 +308,14 @@ async fn update_embedder_settings() {
index.wait_task(response.uid()).await.succeeded(); index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 108, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 54, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -326,11 +347,14 @@ async fn update_embedder_settings() {
server.wait_task(response.uid()).await.succeeded(); server.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await; let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{ {
"numberOfDocuments": 2, "numberOfDocuments": 2,
"rawDocumentDbSize": 108, "rawDocumentDbSize": "[size]",
"avgDocumentSize": 54, "avgDocumentSize": "[size]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 3, "numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 2, "numberOfEmbeddedDocuments": 2,

View File

@@ -133,7 +133,9 @@ async fn check_the_index_scheduler(server: &Server) {
let (stats, _) = server.stats().await; let (stats, _) = server.stats().await;
assert_json_snapshot!(stats, { assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]", ".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]" ".usedDatabaseSize" => "[bytes]",
".indexes.kefir.rawDocumentDbSize" => "[bytes]",
".indexes.kefir.avgDocumentSize" => "[bytes]",
}, },
@r###" @r###"
{ {
@@ -143,8 +145,8 @@ async fn check_the_index_scheduler(server: &Server) {
"indexes": { "indexes": {
"kefir": { "kefir": {
"numberOfDocuments": 1, "numberOfDocuments": 1,
"rawDocumentDbSize": 109, "rawDocumentDbSize": "[bytes]",
"avgDocumentSize": 109, "avgDocumentSize": "[bytes]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -193,31 +195,33 @@ async fn check_the_index_scheduler(server: &Server) {
// Tests all the batches query parameters // Tests all the batches query parameters
let (batches, _) = server.batches_filter("uids=10").await; let (batches, _) = server.batches_filter("uids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
let (batches, _) = server.batches_filter("batchUids=10").await; let (batches, _) = server.batches_filter("batchUids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
let (batches, _) = server.batches_filter("statuses=canceled").await; let (batches, _) = server.batches_filter("statuses=canceled").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
// types has already been tested above to retrieve the upgrade database // types has already been tested above to retrieve the upgrade database
let (batches, _) = server.batches_filter("canceledBy=19").await; let (batches, _) = server.batches_filter("canceledBy=19").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19");
let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41");
let (stats, _) = server.stats().await; let (stats, _) = server.stats().await;
assert_json_snapshot!(stats, { assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]", ".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]" ".usedDatabaseSize" => "[bytes]",
".indexes.kefir.rawDocumentDbSize" => "[bytes]",
".indexes.kefir.avgDocumentSize" => "[bytes]",
}, },
@r###" @r###"
{ {
@@ -227,8 +231,8 @@ async fn check_the_index_scheduler(server: &Server) {
"indexes": { "indexes": {
"kefir": { "kefir": {
"numberOfDocuments": 1, "numberOfDocuments": 1,
"rawDocumentDbSize": 109, "rawDocumentDbSize": "[bytes]",
"avgDocumentSize": 109, "avgDocumentSize": "[bytes]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,
@@ -245,11 +249,14 @@ async fn check_the_index_scheduler(server: &Server) {
"###); "###);
let index = server.index("kefir"); let index = server.index("kefir");
let (stats, _) = index.stats().await; let (stats, _) = index.stats().await;
snapshot!(stats, @r###" snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[bytes]",
".avgDocumentSize" => "[bytes]",
}), @r###"
{ {
"numberOfDocuments": 1, "numberOfDocuments": 1,
"rawDocumentDbSize": 109, "rawDocumentDbSize": "[bytes]",
"avgDocumentSize": 109, "avgDocumentSize": "[bytes]",
"isIndexing": false, "isIndexing": false,
"numberOfEmbeddings": 0, "numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0, "numberOfEmbeddedDocuments": 0,

View File

@@ -188,7 +188,7 @@ async fn user_provide_mismatched_embedding_dimension() {
let (value, code) = index.add_documents(documents, None).await; let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
let task = index.wait_task(value.uid()).await; let task = index.wait_task(value.uid()).await;
snapshot!(task, @r#" snapshot!(task, @r###"
{ {
"uid": "[uid]", "uid": "[uid]",
"batchUid": "[batch_uid]", "batchUid": "[batch_uid]",
@@ -201,7 +201,7 @@ async fn user_provide_mismatched_embedding_dimension() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Index `doggo`: Invalid vector dimensions: expected: `3`, found: `2`.", "message": "Index `doggo`: Invalid vector dimensions in document with id `0` in `._vectors.manual`.\n - note: embedding #0 has dimensions 2\n - note: embedder `manual` requires 3",
"code": "invalid_vector_dimensions", "code": "invalid_vector_dimensions",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vector_dimensions" "link": "https://docs.meilisearch.com/errors#invalid_vector_dimensions"
@@ -211,46 +211,36 @@ async fn user_provide_mismatched_embedding_dimension() {
"startedAt": "[date]", "startedAt": "[date]",
"finishedAt": "[date]" "finishedAt": "[date]"
} }
"#); "###);
// FIXME: /!\ Case where number of embeddings is divisor of `dimensions` would still pass
let new_document = json!([ let new_document = json!([
{"id": 0, "name": "kefir", "_vectors": { "manual": [[0, 0], [1, 1], [2, 2]] }}, {"id": 0, "name": "kefir", "_vectors": { "manual": [[0, 0], [1, 1], [2, 2]] }},
]); ]);
let (response, code) = index.add_documents(new_document, None).await; let (response, code) = index.add_documents(new_document, None).await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
index.wait_task(response.uid()).await.succeeded(); let task = index.wait_task(response.uid()).await;
let (documents, _code) = index snapshot!(task, @r###"
.get_all_documents(GetAllDocumentsOptions { retrieve_vectors: true, ..Default::default() })
.await;
snapshot!(json_string!(documents), @r###"
{ {
"results": [ "uid": "[uid]",
{ "batchUid": "[batch_uid]",
"id": 0, "indexUid": "doggo",
"name": "kefir", "status": "failed",
"_vectors": { "type": "documentAdditionOrUpdate",
"manual": { "canceledBy": null,
"embeddings": [ "details": {
[ "receivedDocuments": 1,
0.0, "indexedDocuments": 0
0.0, },
1.0 "error": {
], "message": "Index `doggo`: Invalid vector dimensions in document with id `0` in `._vectors.manual`.\n - note: embedding #0 has dimensions 2\n - note: embedder `manual` requires 3",
[ "code": "invalid_vector_dimensions",
1.0, "type": "invalid_request",
2.0, "link": "https://docs.meilisearch.com/errors#invalid_vector_dimensions"
2.0 },
] "duration": "[duration]",
], "enqueuedAt": "[date]",
"regenerate": false "startedAt": "[date]",
} "finishedAt": "[date]"
}
}
],
"offset": 0,
"limit": 20,
"total": 1
} }
"###); "###);
} }

View File

@@ -1,8 +1,13 @@
use heed::types::Bytes; use std::mem;
use heed::Database; use heed::Database;
use heed::DatabaseStat;
use heed::RoTxn; use heed::RoTxn;
use heed::Unspecified;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::BEU32;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// The stats of a database. /// The stats of a database.
@@ -20,58 +25,24 @@ impl DatabaseStats {
/// ///
/// This function iterates over the whole database and computes the stats. /// This function iterates over the whole database and computes the stats.
/// It is not efficient and should be cached somewhere. /// It is not efficient and should be cached somewhere.
pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> heed::Result<Self> { pub(crate) fn new(
let mut database_stats = database: Database<BEU32, Unspecified>,
Self { number_of_entries: 0, total_key_size: 0, total_value_size: 0 }; rtxn: &RoTxn<'_>,
) -> heed::Result<Self> {
let DatabaseStat { page_size, depth: _, branch_pages, leaf_pages, overflow_pages, entries } =
database.stat(rtxn)?;
let mut iter = database.iter(rtxn)?; // We first take the total size without overflow pages as the overflow pages contains the values and only that.
while let Some((key, value)) = iter.next().transpose()? { let total_size = (branch_pages + leaf_pages + overflow_pages) * page_size as usize;
let key_size = key.len() as u64; // We compute an estimated size for the keys.
let value_size = value.len() as u64; let total_key_size = entries * (mem::size_of::<u32>() + 4);
database_stats.total_key_size += key_size; let total_value_size = total_size - total_key_size;
database_stats.total_value_size += value_size;
}
database_stats.number_of_entries = database.len(rtxn)?; Ok(Self {
number_of_entries: entries as u64,
Ok(database_stats) total_key_size: total_key_size as u64,
} total_value_size: total_value_size as u64,
})
/// Recomputes the stats of the database and returns the new stats.
///
/// This function is used to update the stats of the database when some keys are modified.
/// It is more efficient than the `new` function because it does not iterate over the whole database but only the modified keys comparing the before and after states.
pub(crate) fn recompute<I, K>(
mut stats: Self,
database: Database<Bytes, Bytes>,
before_rtxn: &RoTxn<'_>,
after_rtxn: &RoTxn<'_>,
modified_keys: I,
) -> heed::Result<Self>
where
I: IntoIterator<Item = K>,
K: AsRef<[u8]>,
{
for key in modified_keys {
let key = key.as_ref();
if let Some(value) = database.get(after_rtxn, key)? {
let key_size = key.len() as u64;
let value_size = value.len() as u64;
stats.total_key_size = stats.total_key_size.saturating_add(key_size);
stats.total_value_size = stats.total_value_size.saturating_add(value_size);
}
if let Some(value) = database.get(before_rtxn, key)? {
let key_size = key.len() as u64;
let value_size = value.len() as u64;
stats.total_key_size = stats.total_key_size.saturating_sub(key_size);
stats.total_value_size = stats.total_value_size.saturating_sub(value_size);
}
}
stats.number_of_entries = database.len(after_rtxn)?;
Ok(stats)
} }
pub fn average_key_size(&self) -> u64 { pub fn average_key_size(&self) -> u64 {
@@ -86,6 +57,10 @@ impl DatabaseStats {
self.number_of_entries self.number_of_entries
} }
pub fn total_size(&self) -> u64 {
self.total_key_size + self.total_value_size
}
pub fn total_key_size(&self) -> u64 { pub fn total_key_size(&self) -> u64 {
self.total_key_size self.total_key_size
} }

View File

@@ -154,6 +154,14 @@ and can not be more than 511 bytes.", .document_id.to_string()
InvalidGeoField(#[from] Box<GeoError>), InvalidGeoField(#[from] Box<GeoError>),
#[error("Invalid vector dimensions: expected: `{}`, found: `{}`.", .expected, .found)] #[error("Invalid vector dimensions: expected: `{}`, found: `{}`.", .expected, .found)]
InvalidVectorDimensions { expected: usize, found: usize }, InvalidVectorDimensions { expected: usize, found: usize },
#[error("Invalid vector dimensions in document with id `{document_id}` in `._vectors.{embedder_name}`.\n - note: embedding #{embedding_index} has dimensions {found}\n - note: embedder `{embedder_name}` requires {expected}")]
InvalidIndexingVectorDimensions {
embedder_name: String,
document_id: String,
embedding_index: usize,
expected: usize,
found: usize,
},
#[error("The `_vectors` field in the document with id: `{document_id}` is not an object. Was expecting an object with a key for each embedder with manually provided vectors, but instead got `{value}`")] #[error("The `_vectors` field in the document with id: `{document_id}` is not an object. Was expecting an object with a key for each embedder with manually provided vectors, but instead got `{value}`")]
InvalidVectorsMapType { document_id: String, value: Value }, InvalidVectorsMapType { document_id: String, value: Value },
#[error("Bad embedder configuration in the document with id: `{document_id}`. {error}")] #[error("Bad embedder configuration in the document with id: `{document_id}`. {error}")]

View File

@@ -3,8 +3,9 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::path::Path; use std::path::Path;
use heed::{types::*, WithoutTls}; use heed::{types::*, DatabaseStat, WithoutTls};
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified}; use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use indexmap::IndexMap;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use rstar::RTree; use rstar::RTree;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -410,38 +411,6 @@ impl Index {
Ok(count.unwrap_or_default()) Ok(count.unwrap_or_default())
} }
/// Updates the stats of the documents database based on the previous stats and the modified docids.
pub fn update_documents_stats(
&self,
wtxn: &mut RwTxn<'_>,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> {
let before_rtxn = self.read_txn()?;
let document_stats = match self.documents_stats(&before_rtxn)? {
Some(before_stats) => DatabaseStats::recompute(
before_stats,
self.documents.remap_types(),
&before_rtxn,
wtxn,
modified_docids.iter().map(|docid| docid.to_be_bytes()),
)?,
None => {
// This should never happen when there are already documents in the index, the documents stats should be present.
// If it happens, it means that the index was not properly initialized/upgraded.
debug_assert_eq!(
self.documents.len(&before_rtxn)?,
0,
"The documents stats should be present when there are documents in the index"
);
tracing::warn!("No documents stats found, creating new ones");
DatabaseStats::new(self.documents.remap_types(), &*wtxn)?
}
};
self.put_documents_stats(wtxn, document_stats)?;
Ok(())
}
/// Writes the stats of the documents database. /// Writes the stats of the documents database.
pub fn put_documents_stats( pub fn put_documents_stats(
&self, &self,
@@ -1755,6 +1724,122 @@ impl Index {
} }
Ok(stats) Ok(stats)
} }
/// Tells whether `word` has an entry in this index.
///
/// The word is looked up in `word_docids` first and, only if it is absent there,
/// in `exact_word_docids`. Both lookups remap the data type to `DecodeIgnore`,
/// so only the presence of the key is checked.
///
/// # Arguments
///
/// * `rtxn`: The read transaction used for both lookups.
/// * `word`: The word to look for.
///
/// # Errors
///
/// Propagates any error returned by the database lookups.
pub fn contains_word(&self, rtxn: &RoTxn<'_>, word: &str) -> Result<bool> {
    let in_word_docids =
        self.word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some();
    if in_word_docids {
        return Ok(true);
    }

    let in_exact_word_docids =
        self.exact_word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some();
    Ok(in_exact_word_docids)
}
/// Returns the size in bytes of each database of the index, as seen by `rtxn`.
///
/// The size of a database is the number of pages it uses (branch, leaf and overflow
/// pages) multiplied by the page size. The map is keyed by the database name and its
/// entries are inserted in the order in which the databases are listed below.
///
/// # Errors
///
/// Returns the first error raised while fetching the statistics of a database.
pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> heed::Result<IndexMap<&'static str, usize>> {
    // The pattern names every field of the index (no `..`), so a newly added
    // field has to be handled here for this function to compile.
    let Self {
        env: _,
        main,
        external_documents_ids,
        word_docids,
        exact_word_docids,
        word_prefix_docids,
        exact_word_prefix_docids,
        word_pair_proximity_docids,
        word_position_docids,
        word_fid_docids,
        word_prefix_position_docids,
        word_prefix_fid_docids,
        field_id_word_count_docids,
        facet_id_f64_docids,
        facet_id_string_docids,
        facet_id_normalized_string_strings,
        facet_id_string_fst,
        facet_id_exists_docids,
        facet_id_is_null_docids,
        facet_id_is_empty_docids,
        field_id_docid_facet_f64s,
        field_id_docid_facet_strings,
        vector_arroy,
        embedder_category_id,
        documents,
    } = self;

    /// Converts the page counts of a database into a size in bytes.
    fn compute_size(stats: DatabaseStat) -> usize {
        let used_pages = stats.branch_pages + stats.leaf_pages + stats.overflow_pages;
        used_pages * stats.page_size as usize
    }

    let mut sizes = IndexMap::new();

    // The databases do not share a single type, so a local macro is used to
    // repeat the same "stat, then insert" step; the key is the binding's name.
    macro_rules! record_sizes {
        ($($database:ident),+ $(,)?) => {
            $(
                let size = compute_size($database.stat(rtxn)?);
                sizes.insert(stringify!($database), size);
            )+
        };
    }

    record_sizes!(
        main,
        external_documents_ids,
        word_docids,
        exact_word_docids,
        word_prefix_docids,
        exact_word_prefix_docids,
        word_pair_proximity_docids,
        word_position_docids,
        word_fid_docids,
        word_prefix_position_docids,
        word_prefix_fid_docids,
        field_id_word_count_docids,
        facet_id_f64_docids,
        facet_id_string_docids,
        facet_id_normalized_string_strings,
        facet_id_string_fst,
        facet_id_exists_docids,
        facet_id_is_null_docids,
        facet_id_is_empty_docids,
        field_id_docid_facet_f64s,
        field_id_docid_facet_strings,
        vector_arroy,
        embedder_category_id,
        documents,
    );

    Ok(sizes)
}
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]

View File

@@ -190,8 +190,18 @@ macro_rules! make_atomic_progress {
}; };
} }
make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); make_atomic_progress!(Document alias AtomicDocumentStep => "document");
make_atomic_progress!(Payload alias AtomicPayloadStep => "payload" ); make_atomic_progress!(Payload alias AtomicPayloadStep => "payload");
// Declares the public `MergingWordCache` enum through the `make_enum_progress!` macro.
// NOTE(review): presumably each variant is a progress step reported while the
// corresponding word cache is merged — confirm against the macro definition and
// the places where the variants are used.
make_enum_progress! {
pub enum MergingWordCache {
WordDocids,
WordFieldIdDocids,
ExactWordDocids,
WordPositionDocids,
FieldIdWordCountDocids,
}
}
#[derive(Debug, Serialize, Clone, ToSchema)] #[derive(Debug, Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]

View File

@@ -173,16 +173,18 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
ranking_rule_scores.push(ScoreDetails::Skipped); ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold // remove candidates from the universe without adding them to result if their score is below the threshold
if let Some(ranking_score_threshold) = ranking_score_threshold { let is_below_threshold =
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter()); ranking_score_threshold.is_some_and(|ranking_score_threshold| {
if current_score < ranking_score_threshold { let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
all_candidates -= bucket | &ranking_rule_universes[cur_ranking_rule_index]; current_score < ranking_score_threshold
back!(); });
continue;
}
}
maybe_add_to_results!(bucket); if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop(); ranking_rule_scores.pop();
@@ -237,23 +239,24 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
); );
// remove candidates from the universe without adding them to result if their score is below the threshold // remove candidates from the universe without adding them to result if their score is below the threshold
if let Some(ranking_score_threshold) = ranking_score_threshold { let is_below_threshold = ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter()); let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
if current_score < ranking_score_threshold { current_score < ranking_score_threshold
all_candidates -= });
next_bucket.candidates | &ranking_rule_universes[cur_ranking_rule_index];
back!();
continue;
}
}
ranking_rule_universes[cur_ranking_rule_index] -= &next_bucket.candidates; ranking_rule_universes[cur_ranking_rule_index] -= &next_bucket.candidates;
if cur_ranking_rule_index == ranking_rules_len - 1 if cur_ranking_rule_index == ranking_rules_len - 1
|| (scoring_strategy == ScoringStrategy::Skip && next_bucket.candidates.len() <= 1) || (scoring_strategy == ScoringStrategy::Skip && next_bucket.candidates.len() <= 1)
|| cur_offset + (next_bucket.candidates.len() as usize) < from || cur_offset + (next_bucket.candidates.len() as usize) < from
|| is_below_threshold
{ {
maybe_add_to_results!(next_bucket.candidates); if is_below_threshold {
all_candidates -= &next_bucket.candidates;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(next_bucket.candidates);
}
ranking_rule_scores.pop(); ranking_rule_scores.pop();
continue; continue;
} }

View File

@@ -1,10 +1,12 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::ops::ControlFlow; use std::ops::ControlFlow;
use fst::automaton::Str; use fst::automaton::Str;
use fst::{Automaton, IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
use heed::types::DecodeIgnore; use heed::types::DecodeIgnore;
use itertools::{merge_join_by, EitherOrBoth};
use super::{OneTypoTerm, Phrase, QueryTerm, ZeroTypoTerm}; use super::{OneTypoTerm, Phrase, QueryTerm, ZeroTypoTerm};
use crate::search::fst_utils::{Complement, Intersection, StartsWith, Union}; use crate::search::fst_utils::{Complement, Intersection, StartsWith, Union};
@@ -16,16 +18,10 @@ use crate::{Result, MAX_WORD_LENGTH};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NumberOfTypos { pub enum NumberOfTypos {
Zero,
One, One,
Two, Two,
} }
pub enum ZeroOrOneTypo {
Zero,
One,
}
impl Interned<QueryTerm> { impl Interned<QueryTerm> {
pub fn compute_fully_if_needed(self, ctx: &mut SearchContext<'_>) -> Result<()> { pub fn compute_fully_if_needed(self, ctx: &mut SearchContext<'_>) -> Result<()> {
let s = ctx.term_interner.get_mut(self); let s = ctx.term_interner.get_mut(self);
@@ -47,34 +43,45 @@ impl Interned<QueryTerm> {
} }
fn find_zero_typo_prefix_derivations( fn find_zero_typo_prefix_derivations(
ctx: &mut SearchContext<'_>,
word_interned: Interned<String>, word_interned: Interned<String>,
fst: fst::Set<Cow<'_, [u8]>>,
word_interner: &mut DedupInterner<String>,
mut visit: impl FnMut(Interned<String>) -> Result<ControlFlow<()>>, mut visit: impl FnMut(Interned<String>) -> Result<ControlFlow<()>>,
) -> Result<()> { ) -> Result<()> {
let word = word_interner.get(word_interned).to_owned(); let word = ctx.word_interner.get(word_interned).to_owned();
let word = word.as_str(); let word = word.as_str();
let prefix = Str::new(word).starts_with();
let mut stream = fst.search(prefix).into_stream();
while let Some(derived_word) = stream.next() { let words =
let derived_word = std::str::from_utf8(derived_word)?.to_owned(); ctx.index.word_docids.remap_data_type::<DecodeIgnore>().prefix_iter(ctx.txn, word)?;
let derived_word_interned = word_interner.insert(derived_word); let exact_words =
if derived_word_interned != word_interned { ctx.index.exact_word_docids.remap_data_type::<DecodeIgnore>().prefix_iter(ctx.txn, word)?;
let cf = visit(derived_word_interned)?;
if cf.is_break() { for eob in merge_join_by(words, exact_words, |lhs, rhs| match (lhs, rhs) {
break; (Ok((word, _)), Ok((exact_word, _))) => word.cmp(exact_word),
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
}) {
match eob {
EitherOrBoth::Both(kv, _) | EitherOrBoth::Left(kv) | EitherOrBoth::Right(kv) => {
let (derived_word, _) = kv?;
let derived_word = derived_word.to_string();
let derived_word_interned = ctx.word_interner.insert(derived_word);
if derived_word_interned != word_interned {
let cf = visit(derived_word_interned)?;
if cf.is_break() {
break;
}
}
} }
} }
} }
Ok(()) Ok(())
} }
fn find_zero_one_typo_derivations( fn find_one_typo_derivations(
ctx: &mut SearchContext<'_>, ctx: &mut SearchContext<'_>,
word_interned: Interned<String>, word_interned: Interned<String>,
is_prefix: bool, is_prefix: bool,
mut visit: impl FnMut(Interned<String>, ZeroOrOneTypo) -> Result<ControlFlow<()>>, mut visit: impl FnMut(Interned<String>) -> Result<ControlFlow<()>>,
) -> Result<()> { ) -> Result<()> {
let fst = ctx.get_words_fst()?; let fst = ctx.get_words_fst()?;
let word = ctx.word_interner.get(word_interned).to_owned(); let word = ctx.word_interner.get(word_interned).to_owned();
@@ -89,16 +96,9 @@ fn find_zero_one_typo_derivations(
let derived_word = ctx.word_interner.insert(derived_word.to_owned()); let derived_word = ctx.word_interner.insert(derived_word.to_owned());
let d = dfa.distance(state.1); let d = dfa.distance(state.1);
match d.to_u8() { match d.to_u8() {
0 => { 0 => (),
if derived_word != word_interned {
let cf = visit(derived_word, ZeroOrOneTypo::Zero)?;
if cf.is_break() {
break;
}
}
}
1 => { 1 => {
let cf = visit(derived_word, ZeroOrOneTypo::One)?; let cf = visit(derived_word)?;
if cf.is_break() { if cf.is_break() {
break; break;
} }
@@ -111,7 +111,7 @@ fn find_zero_one_typo_derivations(
Ok(()) Ok(())
} }
fn find_zero_one_two_typo_derivations( fn find_one_two_typo_derivations(
word_interned: Interned<String>, word_interned: Interned<String>,
is_prefix: bool, is_prefix: bool,
fst: fst::Set<Cow<'_, [u8]>>, fst: fst::Set<Cow<'_, [u8]>>,
@@ -144,14 +144,7 @@ fn find_zero_one_two_typo_derivations(
// correct distance // correct distance
let d = second_dfa.distance((state.1).0); let d = second_dfa.distance((state.1).0);
match d.to_u8() { match d.to_u8() {
0 => { 0 => (),
if derived_word_interned != word_interned {
let cf = visit(derived_word_interned, NumberOfTypos::Zero)?;
if cf.is_break() {
break;
}
}
}
1 => { 1 => {
let cf = visit(derived_word_interned, NumberOfTypos::One)?; let cf = visit(derived_word_interned, NumberOfTypos::One)?;
if cf.is_break() { if cf.is_break() {
@@ -194,8 +187,6 @@ pub fn partially_initialized_term_from_word(
}); });
} }
let fst = ctx.index.words_fst(ctx.txn)?;
let use_prefix_db = is_prefix let use_prefix_db = is_prefix
&& (ctx && (ctx
.index .index
@@ -215,24 +206,19 @@ pub fn partially_initialized_term_from_word(
let mut zero_typo = None; let mut zero_typo = None;
let mut prefix_of = BTreeSet::new(); let mut prefix_of = BTreeSet::new();
if fst.contains(word) || ctx.index.exact_word_docids.get(ctx.txn, word)?.is_some() { if ctx.index.contains_word(ctx.txn, word)? {
zero_typo = Some(word_interned); zero_typo = Some(word_interned);
} }
if is_prefix && use_prefix_db.is_none() { if is_prefix && use_prefix_db.is_none() {
find_zero_typo_prefix_derivations( find_zero_typo_prefix_derivations(ctx, word_interned, |derived_word| {
word_interned, if prefix_of.len() < limits::MAX_PREFIX_COUNT {
fst, prefix_of.insert(derived_word);
&mut ctx.word_interner, Ok(ControlFlow::Continue(()))
|derived_word| { } else {
if prefix_of.len() < limits::MAX_PREFIX_COUNT { Ok(ControlFlow::Break(()))
prefix_of.insert(derived_word); }
Ok(ControlFlow::Continue(())) })?;
} else {
Ok(ControlFlow::Break(()))
}
},
)?;
} }
let synonyms = ctx.index.synonyms(ctx.txn)?; let synonyms = ctx.index.synonyms(ctx.txn)?;
let mut synonym_word_count = 0; let mut synonym_word_count = 0;
@@ -295,18 +281,13 @@ impl Interned<QueryTerm> {
let mut one_typo_words = BTreeSet::new(); let mut one_typo_words = BTreeSet::new();
if *max_nbr_typos > 0 { if *max_nbr_typos > 0 {
find_zero_one_typo_derivations(ctx, original, is_prefix, |derived_word, nbr_typos| { find_one_typo_derivations(ctx, original, is_prefix, |derived_word| {
match nbr_typos { if one_typo_words.len() < limits::MAX_ONE_TYPO_COUNT {
ZeroOrOneTypo::Zero => {} one_typo_words.insert(derived_word);
ZeroOrOneTypo::One => { Ok(ControlFlow::Continue(()))
if one_typo_words.len() < limits::MAX_ONE_TYPO_COUNT { } else {
one_typo_words.insert(derived_word); Ok(ControlFlow::Break(()))
} else {
return Ok(ControlFlow::Break(()));
}
}
} }
Ok(ControlFlow::Continue(()))
})?; })?;
} }
@@ -357,7 +338,7 @@ impl Interned<QueryTerm> {
let mut two_typo_words = BTreeSet::new(); let mut two_typo_words = BTreeSet::new();
if *max_nbr_typos > 0 { if *max_nbr_typos > 0 {
find_zero_one_two_typo_derivations( find_one_two_typo_derivations(
*original, *original,
*is_prefix, *is_prefix,
ctx.index.words_fst(ctx.txn)?, ctx.index.words_fst(ctx.txn)?,
@@ -370,7 +351,6 @@ impl Interned<QueryTerm> {
return Ok(ControlFlow::Break(())); return Ok(ControlFlow::Break(()));
} }
match nbr_typos { match nbr_typos {
NumberOfTypos::Zero => {}
NumberOfTypos::One => { NumberOfTypos::One => {
if one_typo_words.len() < limits::MAX_ONE_TYPO_COUNT { if one_typo_words.len() < limits::MAX_ONE_TYPO_COUNT {
one_typo_words.insert(derived_word); one_typo_words.insert(derived_word);

View File

@@ -28,6 +28,7 @@ pub use self::helpers::*;
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use super::facet::clear_facet_levels_based_on_settings_diff; use super::facet::clear_facet_levels_based_on_settings_diff;
use super::new::StdResult; use super::new::StdResult;
use crate::database_stats::DatabaseStats;
use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError}; use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings}; use crate::index::{PrefixSearch, PrefixSettings};
@@ -476,7 +477,8 @@ where
if !settings_diff.settings_update_only { if !settings_diff.settings_update_only {
// Update the stats of the documents database when there is a document update. // Update the stats of the documents database when there is a document update.
self.index.update_documents_stats(self.wtxn, modified_docids)?; let stats = DatabaseStats::new(self.index.documents.remap_data_type(), self.wtxn)?;
self.index.put_documents_stats(self.wtxn, stats)?;
} }
// We write the field distribution into the main database // We write the field distribution into the main database
self.index.put_field_distribution(self.wtxn, &field_distribution)?; self.index.put_field_distribution(self.wtxn, &field_distribution)?;

View File

@@ -1,5 +1,6 @@
use bumpalo::Bump; use bumpalo::Bump;
use heed::RoTxn; use heed::RoTxn;
use serde_json::Value;
use super::document::{ use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions, Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
@@ -10,7 +11,7 @@ use super::vector_document::{
use crate::attribute_patterns::PatternMatch; use crate::attribute_patterns::PatternMatch;
use crate::documents::FieldIdMapper; use crate::documents::FieldIdMapper;
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::{DocumentId, Index, Result}; use crate::{DocumentId, Index, InternalError, Result};
pub enum DocumentChange<'doc> { pub enum DocumentChange<'doc> {
Deletion(Deletion<'doc>), Deletion(Deletion<'doc>),
@@ -243,6 +244,29 @@ impl<'doc> Update<'doc> {
Ok(has_deleted_fields) Ok(has_deleted_fields)
} }
/// Tells whether this update changes the document's geo field.
///
/// The geo field of the current version of the document (loaded through `rtxn`,
/// `index` and `mapper`) is compared with the geo field found among the changed
/// fields of this update:
/// - present on both sides: both raw payloads are parsed into `serde_json::Value`s
///   and the parsed values are compared;
/// - absent on both sides: returns `false`;
/// - present on exactly one side: returns `true`.
///
/// # Errors
///
/// Propagates any failure from loading the current document or from reading
/// either geo field, and reports JSON parsing failures as
/// `InternalError::SerdeJson`.
pub fn has_changed_for_geo_fields<'t, Mapper: FieldIdMapper>(
    &self,
    rtxn: &'t RoTxn,
    index: &'t Index,
    mapper: &'t Mapper,
) -> Result<bool> {
    let stored_document = self.current(rtxn, index, mapper)?;
    let stored_geo = stored_document.geo_field()?;
    let incoming_geo = self.only_changed_fields().geo_field()?;

    let (stored_raw, incoming_raw) = match (stored_geo, incoming_geo) {
        (Some(stored_raw), Some(incoming_raw)) => (stored_raw, incoming_raw),
        // No geo field before and none in the update: nothing to compare.
        (None, None) => return Ok(false),
        // The geo field exists on exactly one side.
        _ => return Ok(true),
    };

    // Compare the parsed JSON values rather than the raw text.
    let stored_value: Value =
        serde_json::from_str(stored_raw.get()).map_err(InternalError::SerdeJson)?;
    let incoming_value: Value =
        serde_json::from_str(incoming_raw.get()).map_err(InternalError::SerdeJson)?;
    Ok(stored_value != incoming_value)
}
pub fn only_changed_vectors( pub fn only_changed_vectors(
&self, &self,
doc_alloc: &'doc Bump, doc_alloc: &'doc Bump,

View File

@@ -117,7 +117,7 @@ impl FacetedDocidsExtractor {
}, },
), ),
DocumentChange::Update(inner) => { DocumentChange::Update(inner) => {
if !inner.has_changed_for_fields( let has_changed = inner.has_changed_for_fields(
&mut |field_name| { &mut |field_name| {
match_faceted_field( match_faceted_field(
field_name, field_name,
@@ -130,7 +130,10 @@ impl FacetedDocidsExtractor {
rtxn, rtxn,
index, index,
context.db_fields_ids_map, context.db_fields_ids_map,
)? { )?;
let has_changed_for_geo_fields =
inner.has_changed_for_geo_fields(rtxn, index, context.db_fields_ids_map)?;
if !has_changed && !has_changed_for_geo_fields {
return Ok(()); return Ok(());
} }

View File

@@ -121,6 +121,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
// do we have set embeddings? // do we have set embeddings?
if let Some(embeddings) = new_vectors.embeddings { if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors( chunks.set_vectors(
update.external_document_id(),
update.docid(), update.docid(),
embeddings embeddings
.into_vec(&context.doc_alloc, embedder_name) .into_vec(&context.doc_alloc, embedder_name)
@@ -128,7 +129,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
document_id: update.external_document_id().to_string(), document_id: update.external_document_id().to_string(),
error: error.to_string(), error: error.to_string(),
})?, })?,
); )?;
} else if new_vectors.regenerate { } else if new_vectors.regenerate {
let new_rendered = prompt.render_document( let new_rendered = prompt.render_document(
update.external_document_id(), update.external_document_id(),
@@ -209,6 +210,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
chunks.set_regenerate(insertion.docid(), new_vectors.regenerate); chunks.set_regenerate(insertion.docid(), new_vectors.regenerate);
if let Some(embeddings) = new_vectors.embeddings { if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors( chunks.set_vectors(
insertion.external_document_id(),
insertion.docid(), insertion.docid(),
embeddings embeddings
.into_vec(&context.doc_alloc, embedder_name) .into_vec(&context.doc_alloc, embedder_name)
@@ -218,7 +220,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
.to_string(), .to_string(),
error: error.to_string(), error: error.to_string(),
})?, })?,
); )?;
} else if new_vectors.regenerate { } else if new_vectors.regenerate {
let rendered = prompt.render_document( let rendered = prompt.render_document(
insertion.external_document_id(), insertion.external_document_id(),
@@ -273,6 +275,7 @@ struct Chunks<'a, 'b, 'extractor> {
embedder: &'a Embedder, embedder: &'a Embedder,
embedder_id: u8, embedder_id: u8,
embedder_name: &'a str, embedder_name: &'a str,
dimensions: usize,
prompt: &'a Prompt, prompt: &'a Prompt,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>, user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
@@ -297,6 +300,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint(); let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
let texts = BVec::with_capacity_in(capacity, doc_alloc); let texts = BVec::with_capacity_in(capacity, doc_alloc);
let ids = BVec::with_capacity_in(capacity, doc_alloc); let ids = BVec::with_capacity_in(capacity, doc_alloc);
let dimensions = embedder.dimensions();
Self { Self {
texts, texts,
ids, ids,
@@ -309,6 +313,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
embedder_name, embedder_name,
user_provided, user_provided,
has_manual_generation: None, has_manual_generation: None,
dimensions,
} }
} }
@@ -490,7 +495,25 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
} }
} }
fn set_vectors(&self, docid: DocumentId, embeddings: Vec<Embedding>) { fn set_vectors(
&self,
external_docid: &'a str,
docid: DocumentId,
embeddings: Vec<Embedding>,
) -> Result<()> {
for (embedding_index, embedding) in embeddings.iter().enumerate() {
if embedding.len() != self.dimensions {
return Err(UserError::InvalidIndexingVectorDimensions {
expected: self.dimensions,
found: embedding.len(),
embedder_name: self.embedder_name.to_string(),
document_id: external_docid.to_string(),
embedding_index,
}
.into());
}
}
self.sender.set_vectors(docid, self.embedder_id, embeddings).unwrap(); self.sender.set_vectors(docid, self.embedder_id, embeddings).unwrap();
Ok(())
} }
} }

View File

@@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta; use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext}; use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::merger::merge_and_send_rtree;
@@ -96,6 +97,7 @@ where
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingFacetCaches);
facet_field_ids_delta = merge_and_send_facet_docids( facet_field_ids_delta = merge_and_send_facet_docids(
caches, caches,
@@ -117,7 +119,6 @@ where
} = { } = {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
WordDocidsExtractors::run_extraction( WordDocidsExtractors::run_extraction(
document_changes, document_changes,
indexing_context, indexing_context,
@@ -126,9 +127,13 @@ where
)? )?
}; };
indexing_context.progress.update_progress(IndexingStep::MergingWordCaches);
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordDocids);
merge_and_send_docids( merge_and_send_docids(
word_docids, word_docids,
index.word_docids.remap_types(), index.word_docids.remap_types(),
@@ -142,6 +147,8 @@ where
let span = let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordFieldIdDocids);
merge_and_send_docids( merge_and_send_docids(
word_fid_docids, word_fid_docids,
index.word_fid_docids.remap_types(), index.word_fid_docids.remap_types(),
@@ -155,6 +162,8 @@ where
let span = let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::ExactWordDocids);
merge_and_send_docids( merge_and_send_docids(
exact_word_docids, exact_word_docids,
index.exact_word_docids.remap_types(), index.exact_word_docids.remap_types(),
@@ -168,6 +177,8 @@ where
let span = let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordPositionDocids);
merge_and_send_docids( merge_and_send_docids(
word_position_docids, word_position_docids,
index.word_position_docids.remap_types(), index.word_position_docids.remap_types(),
@@ -181,6 +192,8 @@ where
let span = let span =
tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::FieldIdWordCountDocids);
merge_and_send_docids( merge_and_send_docids(
fid_word_count_docids, fid_word_count_docids,
index.field_id_word_count_docids.remap_types(), index.field_id_word_count_docids.remap_types(),
@@ -210,6 +223,7 @@ where
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter(); let _entered = span.enter();
indexing_context.progress.update_progress(IndexingStep::MergingWordProximity);
merge_and_send_docids( merge_and_send_docids(
caches, caches,

View File

@@ -234,7 +234,6 @@ where
embedders, embedders,
field_distribution, field_distribution,
document_ids, document_ids,
modified_docids,
)?; )?;
Ok(congestion) Ok(congestion)

View File

@@ -7,12 +7,13 @@ use itertools::{merge_join_by, EitherOrBoth};
use super::document_changes::IndexingContext; use super::document_changes::IndexingContext;
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental; use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::facet_search_builder::FacetSearchBuilder; use crate::update::new::facet_search_builder::FacetSearchBuilder;
use crate::update::new::merger::FacetFieldIdDelta; use crate::update::new::merger::FacetFieldIdDelta;
use crate::update::new::steps::IndexingStep; use crate::update::new::steps::{IndexingStep, PostProcessingFacets, PostProcessingWords};
use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use crate::update::new::words_prefix_docids::{ use crate::update::new::words_prefix_docids::{
compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids,
@@ -33,11 +34,23 @@ where
{ {
let index = indexing_context.index; let index = indexing_context.index;
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
compute_facet_level_database(index, wtxn, facet_field_ids_delta, &mut global_fields_ids_map)?; compute_facet_level_database(
compute_facet_search_database(index, wtxn, global_fields_ids_map)?; index,
wtxn,
facet_field_ids_delta,
&mut global_fields_ids_map,
indexing_context.progress,
)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? {
compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?; compute_prefix_database(
index,
wtxn,
prefix_delta,
indexing_context.grenad_parameters,
indexing_context.progress,
)?;
}; };
Ok(()) Ok(())
} }
@@ -48,21 +61,32 @@ fn compute_prefix_database(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
prefix_delta: PrefixDelta, prefix_delta: PrefixDelta,
grenad_parameters: &GrenadParameters, grenad_parameters: &GrenadParameters,
progress: &Progress,
) -> Result<()> { ) -> Result<()> {
let PrefixDelta { modified, deleted } = prefix_delta; let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
progress.update_progress(PostProcessingWords::WordPrefixDocids);
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute exact word prefix docids
progress.update_progress(PostProcessingWords::ExactWordPrefixDocids);
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix fid docids
progress.update_progress(PostProcessingWords::WordPrefixFieldIdDocids);
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix position docids
progress.update_progress(PostProcessingWords::WordPrefixPositionDocids);
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing")] #[tracing::instrument(level = "trace", skip_all, target = "indexing")]
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> { fn compute_word_fst(
index: &Index,
wtxn: &mut RwTxn,
progress: &Progress,
) -> Result<Option<PrefixDelta>> {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingWords::WordFst);
let words_fst = index.words_fst(&rtxn)?; let words_fst = index.words_fst(&rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?; let prefix_settings = index.prefix_settings(&rtxn)?;
@@ -112,8 +136,10 @@ fn compute_facet_search_database(
index: &Index, index: &Index,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
global_fields_ids_map: GlobalFieldsIdsMap, global_fields_ids_map: GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> { ) -> Result<()> {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingFacets::FacetSearch);
// if the facet search is not enabled, we can skip the rest of the function // if the facet search is not enabled, we can skip the rest of the function
if !index.facet_search(wtxn)? { if !index.facet_search(wtxn)? {
@@ -171,10 +197,16 @@ fn compute_facet_level_database(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
mut facet_field_ids_delta: FacetFieldIdsDelta, mut facet_field_ids_delta: FacetFieldIdsDelta,
global_fields_ids_map: &mut GlobalFieldsIdsMap, global_fields_ids_map: &mut GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> { ) -> Result<()> {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?; let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?;
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() { let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
// skip field ids that should not be facet leveled // skip field ids that should not be facet leveled
let Some(metadata) = global_fields_ids_map.metadata(fid) else { let Some(metadata) = global_fields_ids_map.metadata(fid) else {
continue; continue;
@@ -187,11 +219,13 @@ fn compute_facet_level_database(
let _entered = span.enter(); let _entered = span.enter();
match delta { match delta {
FacetFieldIdDelta::Bulk => { FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::StringsBulk);
tracing::debug!(%fid, "bulk string facet processing"); tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String) FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)? .execute(wtxn)?
} }
FacetFieldIdDelta::Incremental(delta_data) => { FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::StringsIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing"); tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new( FacetsUpdateIncremental::new(
index, index,
@@ -207,16 +241,22 @@ fn compute_facet_level_database(
} }
} }
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() { let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter(); let _entered = span.enter();
match delta { match delta {
FacetFieldIdDelta::Bulk => { FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::NumbersBulk);
tracing::debug!(%fid, "bulk number facet processing"); tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number) FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)? .execute(wtxn)?
} }
FacetFieldIdDelta::Incremental(delta_data) => { FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::NumbersIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing"); tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new( FacetsUpdateIncremental::new(
index, index,

View File

@@ -7,6 +7,7 @@ use rand::SeedableRng as _;
use time::OffsetDateTime; use time::OffsetDateTime;
use super::super::channel::*; use super::super::channel::*;
use crate::database_stats::DatabaseStats;
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
@@ -142,7 +143,6 @@ pub(super) fn update_index(
embedders: EmbeddingConfigs, embedders: EmbeddingConfigs,
field_distribution: std::collections::BTreeMap<String, u64>, field_distribution: std::collections::BTreeMap<String, u64>,
document_ids: roaring::RoaringBitmap, document_ids: roaring::RoaringBitmap,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> { ) -> Result<()> {
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
if let Some(new_primary_key) = new_primary_key { if let Some(new_primary_key) = new_primary_key {
@@ -153,7 +153,8 @@ pub(super) fn update_index(
index.put_field_distribution(wtxn, &field_distribution)?; index.put_field_distribution(wtxn, &field_distribution)?;
index.put_documents_ids(wtxn, &document_ids)?; index.put_documents_ids(wtxn, &document_ids)?;
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
index.update_documents_stats(wtxn, modified_docids)?; let stats = DatabaseStats::new(index.documents.remap_data_type(), wtxn)?;
index.put_documents_stats(wtxn, stats)?;
Ok(()) Ok(())
} }

View File

@@ -82,14 +82,8 @@ where
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(&rtxn, key)?; let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => docids_sender.write(key, &bitmap),
docids_sender.write(key, &bitmap)?; Operation::Delete => docids_sender.delete(key),
Ok(())
}
Operation::Delete => {
docids_sender.delete(key)?;
Ok(())
}
Operation::Ignore => Ok(()), Operation::Ignore => Ok(()),
} }
}) })
@@ -130,7 +124,6 @@ pub fn merge_and_send_facet_docids<'extractor>(
Operation::Ignore => Ok(()), Operation::Ignore => Ok(()),
} }
})?; })?;
Ok(facet_field_ids_delta) Ok(facet_field_ids_delta)
}) })
.reduce( .reduce(

View File

@@ -1,52 +1,42 @@
use std::borrow::Cow; use crate::make_enum_progress;
use enum_iterator::Sequence; make_enum_progress! {
pub enum IndexingStep {
use crate::progress::Step; PreparingPayloads,
ExtractingDocuments,
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] ExtractingFacets,
#[repr(u8)] ExtractingWords,
pub enum IndexingStep { ExtractingWordProximity,
PreparingPayloads, ExtractingEmbeddings,
ExtractingDocuments, MergingFacetCaches,
ExtractingFacets, MergingWordCaches,
ExtractingWords, MergingWordProximity,
ExtractingWordProximity, WritingGeoPoints,
ExtractingEmbeddings, WaitingForDatabaseWrites,
WritingGeoPoints, WaitingForExtractors,
WaitingForDatabaseWrites, WritingEmbeddingsToDatabase,
WaitingForExtractors, PostProcessingFacets,
WritingEmbeddingsToDatabase, PostProcessingWords,
PostProcessingFacets, Finalizing,
PostProcessingWords, }
Finalizing, }
}
make_enum_progress! {
impl Step for IndexingStep { pub enum PostProcessingFacets {
fn name(&self) -> Cow<'static, str> { StringsBulk,
match self { StringsIncremental,
IndexingStep::PreparingPayloads => "preparing update file", NumbersBulk,
IndexingStep::ExtractingDocuments => "extracting documents", NumbersIncremental,
IndexingStep::ExtractingFacets => "extracting facets", FacetSearch,
IndexingStep::ExtractingWords => "extracting words", }
IndexingStep::ExtractingWordProximity => "extracting word proximity", }
IndexingStep::ExtractingEmbeddings => "extracting embeddings",
IndexingStep::WritingGeoPoints => "writing geo points", make_enum_progress! {
IndexingStep::WaitingForDatabaseWrites => "waiting for database writes", pub enum PostProcessingWords {
IndexingStep::WaitingForExtractors => "waiting for extractors", WordFst,
IndexingStep::WritingEmbeddingsToDatabase => "writing embeddings to database", WordPrefixDocids,
IndexingStep::PostProcessingFacets => "post-processing facets", ExactWordPrefixDocids,
IndexingStep::PostProcessingWords => "post-processing words", WordPrefixFieldIdDocids,
IndexingStep::Finalizing => "finalizing", WordPrefixPositionDocids,
}
.into()
}
fn current(&self) -> u32 {
*self as u32
}
fn total(&self) -> u32 {
Self::CARDINALITY as u32
} }
} }

View File

@@ -1331,8 +1331,21 @@ impl InnerIndexSettingsDiff {
let cache_exact_attributes = old_settings.exact_attributes != new_settings.exact_attributes; let cache_exact_attributes = old_settings.exact_attributes != new_settings.exact_attributes;
let cache_user_defined_searchables = old_settings.user_defined_searchable_attributes // Check if any searchable field has been added or removed from the list,
!= new_settings.user_defined_searchable_attributes; // Changing the order should not be considered as a change for reindexing.
let cache_user_defined_searchables = match (
&old_settings.user_defined_searchable_attributes,
&new_settings.user_defined_searchable_attributes,
) {
(Some(old), Some(new)) => {
let old: BTreeSet<_> = old.iter().collect();
let new: BTreeSet<_> = new.iter().collect();
old != new
}
(None, None) => false,
_otherwise => true,
};
// if the user-defined searchables changed, then we need to reindex prompts. // if the user-defined searchables changed, then we need to reindex prompts.
if cache_user_defined_searchables { if cache_user_defined_searchables {