Compare commits

...

32 Commits

Author SHA1 Message Date
Clément Renault
d5a5372aba Only provide the last batch info 2025-11-20 12:02:29 +01:00
Kerollmops
a8d55562e9 Expose the three last batches timings 2025-11-03 16:01:05 +01:00
Kerollmops
40d649ec9e Update utoipa 2025-11-03 15:53:14 +01:00
Kerollmops
c272ac8204 Reset metrics values to keep current steps only 2025-11-03 15:41:54 +01:00
Kerollmops
e18c677f0e Expose the step currently running on the metrics route 2025-11-03 15:28:58 +01:00
Kerollmops
84a288da57 Simplify the auth filters 2025-11-03 15:11:28 +01:00
Kerollmops
cbfc325b56 Expose the metrics for the last finished batch and not the processing
one
2025-11-03 15:10:23 +01:00
Kerollmops
ea640b076e Expose batch progress traces on the metrics route 2025-10-24 14:36:21 +02:00
Clément Renault
6df196034e Merge pull request #5950 from meilisearch/update-version-v1.24.0
Update version to v1.24.0
2025-10-20 11:17:15 +00:00
Clément Renault
a63762737c Upgrade index scheduler 2025-10-20 12:22:27 +02:00
Clément Renault
77394bd4b9 Update insta tests 2025-10-20 10:54:16 +02:00
Clément Renault
cb87201c8b Fix dumpless upgrade and do nothing 2025-10-20 10:42:35 +02:00
Clément Renault
1a9c38794f Bump version to v1.24.0 2025-10-20 10:38:48 +02:00
Clément Renault
34233efb63 Merge pull request #5946 from meilisearch/fix-compaction-issues
Improve compaction behaviors
2025-10-16 15:42:38 +00:00
Clément Renault
af0608ebd6 Continue to the next index if index doesn't exists 2025-10-16 16:39:51 +02:00
Clément Renault
8c7e5c094e Improve the task batch stopped message 2025-10-16 16:39:50 +02:00
Clément Renault
c064737137 Remove duplicated logic in auto batching of tasks 2025-10-16 16:33:20 +02:00
Clément Renault
1d188a7ad3 Make the compaction tasks a priority over the export ones 2025-10-16 13:01:23 +02:00
Clément Renault
66a6b65716 Merge pull request #5945 from meilisearch/search-cutoff-vector-store
Search cutoff vector store
2025-10-16 09:43:20 +00:00
Louis Dureuil
326652a399 Update hannoy 2025-10-16 10:34:54 +02:00
Louis Dureuil
59316e8d5a add unit test 2025-10-16 10:34:20 +02:00
Louis Dureuil
76d7f20c87 fix snap 2025-10-16 10:34:19 +02:00
Louis Dureuil
380b2797a5 Share the same budget for all queries of a given index in federated search 2025-10-16 10:34:19 +02:00
Clémentine
1dd58f9bec Merge pull request #5866 from PedroTroller/build/alpine3.22
Bump Dockerfile alpine version to 3.22
2025-10-16 07:22:43 +00:00
Kerollmops
ddc76ad0dc Delete the leftover compaction files from canceled operations 2025-10-15 16:49:25 +02:00
Kerollmops
ffacf1c002 Introduce the new IndexMapper index path method 2025-10-15 16:49:25 +02:00
Kerollmops
5a49b93b77 Use constant tempfile name to reuse tempfile 2025-10-15 16:49:25 +02:00
Louis Dureuil
918a6eaec9 Implement for vector store ranking rule 2025-10-15 16:31:47 +02:00
Louis Dureuil
1e6ce70e3e "Uninteresting" ranking rule implementations 2025-10-15 16:31:47 +02:00
Louis Dureuil
b418054ee4 Change bucket_sort logic to pass the time budget and allow for retrieving non-blocking buckets 2025-10-15 16:31:47 +02:00
Louis Dureuil
58f30e9d8a Change RankingRule trait to account for budget 2025-10-15 16:31:46 +02:00
PedroTroller
9f4dcd04e9 Bump alpine version to 3.22 2025-09-18 17:08:36 +02:00
39 changed files with 801 additions and 172 deletions

45
Cargo.lock generated
View File

@@ -589,7 +589,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"bumpalo",
@@ -799,7 +799,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"time",
@@ -1829,7 +1829,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"big_s",
@@ -2072,7 +2072,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-store"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"tempfile",
"thiserror 2.0.16",
@@ -2094,7 +2094,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"insta",
"levenshtein_automata",
@@ -2122,7 +2122,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"criterion",
"serde_json",
@@ -2279,7 +2279,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2758,9 +2758,9 @@ dependencies = [
[[package]]
name = "hannoy"
version = "0.0.9-nested-rtxns"
version = "0.0.9-nested-rtxns-2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc5a945b92b063e677d658cfcc7cb6dec2502fe44631f017084938f14d6ce30e"
checksum = "06eda090938d9dcd568c8c2a5de383047ed9191578ebf4a342d2975d16e621f2"
dependencies = [
"bytemuck",
"byteorder",
@@ -2967,6 +2967,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "1.7.0"
@@ -3233,7 +3239,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"backoff",
@@ -3487,7 +3493,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"criterion",
"serde_json",
@@ -3996,7 +4002,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"insta",
"md5",
@@ -4007,7 +4013,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"actix-cors",
"actix-http",
@@ -4036,6 +4042,7 @@ dependencies = [
"futures",
"futures-util",
"hex",
"humantime",
"index-scheduler",
"indexmap",
"insta",
@@ -4104,7 +4111,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -4123,7 +4130,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"actix-web",
"anyhow",
@@ -4158,7 +4165,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"clap",
@@ -4192,7 +4199,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"allocator-api2 0.3.1",
"arroy",
@@ -4773,7 +4780,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "permissive-json-pointer"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"big_s",
"serde_json",
@@ -7820,7 +7827,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.23.0"
version = "1.24.0"
dependencies = [
"anyhow",
"build-info",

View File

@@ -23,7 +23,7 @@ members = [
]
[workspace.package]
version = "1.23.0"
version = "1.24.0"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -1,5 +1,5 @@
# Compile
FROM rust:1.89-alpine3.20 AS compiler
FROM rust:1.89-alpine3.22 AS compiler
RUN apk add -q --no-cache build-base openssl-dev
@@ -20,7 +20,7 @@ RUN set -eux; \
cargo build --release -p meilisearch -p meilitool
# Run
FROM alpine:3.20
FROM alpine:3.22
LABEL org.opencontainers.image.source="https://github.com/meilisearch/meilisearch"
ENV MEILI_HTTP_ADDR 0.0.0.0:7700

View File

@@ -199,7 +199,7 @@ impl IndexMapper {
let uuid = Uuid::new_v4();
self.index_mapping.put(&mut wtxn, name, &uuid)?;
let index_path = self.base_path.join(uuid.to_string());
let index_path = self.index_path(uuid);
fs::create_dir_all(&index_path)?;
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
@@ -286,7 +286,7 @@ impl IndexMapper {
};
let index_map = self.index_map.clone();
let index_path = self.base_path.join(uuid.to_string());
let index_path = self.index_path(uuid);
let index_name = name.to_string();
thread::Builder::new()
.name(String::from("index_deleter"))
@@ -408,7 +408,7 @@ impl IndexMapper {
} else {
continue;
};
let index_path = self.base_path.join(uuid.to_string());
let index_path = self.index_path(uuid);
// take the lock to reopen the environment.
reopen
.reopen(&mut self.index_map.write().unwrap(), &index_path)
@@ -425,7 +425,7 @@ impl IndexMapper {
// if it's not already there.
match index_map.get(&uuid) {
Missing => {
let index_path = self.base_path.join(uuid.to_string());
let index_path = self.index_path(uuid);
break index_map
.create(
@@ -452,6 +452,14 @@ impl IndexMapper {
Ok(index)
}
/// Returns the path of the index.
///
/// The folder located at this path is containing the data.mdb,
/// the lock.mdb and an optional data.mdb.cpy file.
pub fn index_path(&self, uuid: Uuid) -> PathBuf {
self.base_path.join(uuid.to_string())
}
pub fn rollback_index(
&self,
rtxn: &RoTxn,
@@ -492,7 +500,7 @@ impl IndexMapper {
};
}
let index_path = self.base_path.join(uuid.to_string());
let index_path = self.index_path(uuid);
Index::rollback(milli::heed::EnvOpenOptions::new().read_txn_without_tls(), index_path, to)
.map_err(|err| crate::Error::from_milli(err, Some(name.to_string())))
}

View File

@@ -75,6 +75,7 @@ make_enum_progress! {
pub enum TaskCancelationProgress {
RetrievingTasks,
CancelingUpgrade,
CleaningCompactionLeftover,
UpdatingTasks,
}
}

View File

@@ -25,7 +25,6 @@ enum AutobatchKind {
IndexDeletion,
IndexUpdate,
IndexSwap,
IndexCompaction,
}
impl AutobatchKind {
@@ -69,14 +68,14 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,
KindWithContent::IndexCompaction { .. } => AutobatchKind::IndexCompaction,
KindWithContent::TaskCancelation { .. }
KindWithContent::IndexCompaction { .. }
| KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::Export { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => {
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
panic!("The autobatcher should never be called with tasks with special priority or that don't apply to an index.")
}
}
}
@@ -120,9 +119,6 @@ pub enum BatchKind {
IndexSwap {
id: TaskId,
},
IndexCompaction {
id: TaskId,
},
}
impl BatchKind {
@@ -188,13 +184,6 @@ impl BatchKind {
)),
false,
),
K::IndexCompaction => (
Break((
BatchKind::IndexCompaction { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
K::DocumentImport { allow_index_creation, primary_key: pk }
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
@@ -300,7 +289,7 @@ impl BatchKind {
match (self, autobatch_kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::IndexCompaction) => {
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => {
Break((this, BatchStopReason::TaskCannotBeBatched { kind, id }))
},
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
@@ -497,7 +486,6 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::IndexCompaction { .. }
| BatchKind::DocumentEdition { .. },
_,
) => {

View File

@@ -437,12 +437,6 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
Ok(Some(Batch::IndexSwap { task }))
}
BatchKind::IndexCompaction { id } => {
let mut task =
self.queue.tasks.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
Ok(Some(Batch::IndexCompaction { index_uid, task }))
}
}
}
@@ -525,17 +519,33 @@ impl IndexScheduler {
return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
}
// 3. we batch the export.
// 3. we get the next task to compact
let to_compact = self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)? & enqueued;
if let Some(task_id) = to_compact.min() {
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::IndexCompaction,
id: task_id,
});
let index_uid =
task.index_uid().expect("Compaction task must have an index uid").to_owned();
return Ok(Some((Batch::IndexCompaction { index_uid, task }, current_batch)));
}
// 4. we batch the export.
let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued;
if !to_export.is_empty() {
let task_id = to_export.iter().next().expect("There must be at least one export task");
let mut task = self.queue.tasks.get_task(rtxn, task_id)?.unwrap();
current_batch.processing([&mut task]);
current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export });
current_batch
.reason(BatchStopReason::TaskCannotBeBatched { kind: Kind::Export, id: task_id });
return Ok(Some((Batch::Export { task }, current_batch)));
}
// 4. we batch the snapshot.
// 5. we batch the snapshot.
let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
if !to_snapshot.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
@@ -545,7 +555,7 @@ impl IndexScheduler {
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
}
// 5. we batch the dumps.
// 6. we batch the dumps.
let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
if let Some(to_dump) = to_dump.min() {
let mut task =
@@ -558,7 +568,7 @@ impl IndexScheduler {
return Ok(Some((Batch::Dump(task), current_batch)));
}
// 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
// 7. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

View File

@@ -1,5 +1,6 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::io::{Seek, SeekFrom};
use std::fs::{remove_file, File};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
@@ -13,7 +14,7 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
use tempfile::PersistError;
use tempfile::{PersistError, TempPath};
use time::OffsetDateTime;
use super::create_batch::Batch;
@@ -28,6 +29,9 @@ use crate::utils::{
};
use crate::{Error, IndexScheduler, Result, TaskId};
/// The name of the copy of the data.mdb file used during compaction.
const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy";
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. None when unavailable: settings update.
@@ -558,11 +562,12 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.to_string(), index.clone())));
progress.update_progress(IndexCompaction::CreateTemporaryFile);
let pre_size = std::fs::metadata(index.path().join("data.mdb"))?.len();
let mut file = tempfile::Builder::new()
.suffix("data.")
.prefix(".mdb.cpy")
.tempfile_in(index.path())?;
let src_path = index.path().join("data.mdb");
let pre_size = std::fs::metadata(&src_path)?.len();
let dst_path = TempPath::from_path(index.path().join(DATA_MDB_COPY_NAME));
let file = File::create(&dst_path)?;
let mut file = tempfile::NamedTempFile::from_parts(file, dst_path);
// 3. We copy the index data to the temporary file
progress.update_progress(IndexCompaction::CopyAndCompactTheIndex);
@@ -574,7 +579,7 @@ impl IndexScheduler {
// 4. We replace the index data file with the temporary file
progress.update_progress(IndexCompaction::PersistTheCompactedIndex);
match file.persist(index.path().join("data.mdb")) {
match file.persist(src_path) {
Ok(file) => file.sync_all()?,
// TODO see if we have a _resource busy_ error and probably handle this by:
// 1. closing the index, 2. replacing and 3. reopening it
@@ -910,9 +915,10 @@ impl IndexScheduler {
let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
// 0. Check if any upgrade task was matched.
// 0. Check if any upgrade or compaction tasks were matched.
// If so, we cancel all the failed or enqueued upgrade tasks.
let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?;
let compaction_tasks = &self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)?;
let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks);
if is_canceling_upgrade {
let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?;
@@ -977,7 +983,33 @@ impl IndexScheduler {
}
}
// 3. We now have a list of tasks to cancel, cancel them
// 3. If we are cancelling a compaction task, remove the tempfiles after incomplete compactions
for compaction_task in &tasks_to_cancel & compaction_tasks {
progress.update_progress(TaskCancelationProgress::CleaningCompactionLeftover);
let task = self.queue.tasks.get_task(rtxn, compaction_task)?.unwrap();
let Some(Details::IndexCompaction {
index_uid,
pre_compaction_size: _,
post_compaction_size: _,
}) = task.details
else {
unreachable!("wrong details for compaction task {compaction_task}")
};
let index_path = match self.index_mapper.index_mapping.get(rtxn, &index_uid)? {
Some(index_uuid) => self.index_mapper.index_path(index_uuid),
None => continue,
};
if let Err(e) = remove_file(index_path.join(DATA_MDB_COPY_NAME)) {
match e.kind() {
ErrorKind::NotFound => (),
_ => return Err(Error::IoError(e)),
}
}
}
// 4. We now have a list of tasks to cancel, cancel them
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
progress.update_progress(progress_obj);

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.23.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
----------------------------------------------------------------------
### Status:
enqueued [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
@@ -37,7 +37,7 @@ catto [1,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.23.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ doggo [2,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.23.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 23, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -43,7 +43,7 @@ doggo [2,3,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.23.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -47,6 +47,7 @@ pub fn upgrade_index_scheduler(
(1, 21, _) => 0,
(1, 22, _) => 0,
(1, 23, _) => 0,
(1, 24, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)

View File

@@ -117,7 +117,7 @@ secrecy = "0.10.3"
actix-web-lab = { version = "0.24.1", default-features = false }
urlencoding = "2.1.3"
backoff = { version = "0.4.0", features = ["tokio"] }
humantime = { version = "2.3.0", default-features = false }
[dev-dependencies]
actix-rt = "2.10.0"

View File

@@ -1,7 +1,8 @@
use lazy_static::lazy_static;
use prometheus::{
opts, register_gauge, register_histogram_vec, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
opts, register_gauge, register_gauge_vec, register_histogram_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, Gauge, GaugeVec, HistogramVec, IntCounterVec,
IntGauge, IntGaugeVec,
};
lazy_static! {
@@ -73,6 +74,20 @@ lazy_static! {
&["kind", "value"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE: GaugeVec = register_gauge_vec!(
opts!("meilisearch_batch_running_progress_trace", "The currently running progress trace"),
&["batch_uid", "step_name"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS: IntGaugeVec =
register_int_gauge_vec!(
opts!(
"meilisearch_last_finished_batches_progress_trace_ms",
"The last few batches progress trace in milliseconds"
),
&["batch_uid", "step_name"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_LAST_UPDATE: IntGauge =
register_int_gauge!(opts!("meilisearch_last_update", "Meilisearch Last Update"))
.expect("Can't create a metric");

View File

@@ -4,6 +4,7 @@ use index_scheduler::{IndexScheduler, Query};
use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::actions;
use meilisearch_types::milli::progress::ProgressStepView;
use meilisearch_types::tasks::Status;
use prometheus::{Encoder, TextEncoder};
use time::OffsetDateTime;
@@ -38,6 +39,12 @@ pub fn configure(config: &mut web::ServiceConfig) {
# HELP meilisearch_db_size_bytes Meilisearch DB Size In Bytes
# TYPE meilisearch_db_size_bytes gauge
meilisearch_db_size_bytes 1130496
# HELP meilisearch_batch_running_progress_trace The currently running progress trace
# TYPE meilisearch_batch_running_progress_trace gauge
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="document"} 0.710618582519409
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="extracting word proximity"} 0.2222222222222222
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="indexing"} 0.6666666666666666
meilisearch_batch_running_progress_trace{batch_uid="0",step_name="processing tasks"} 0
# HELP meilisearch_http_requests_total Meilisearch HTTP requests total
# TYPE meilisearch_http_requests_total counter
meilisearch_http_requests_total{method="GET",path="/metrics",status="400"} 1
@@ -61,6 +68,13 @@ meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="1
meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="+Inf"} 0
meilisearch_http_response_time_seconds_sum{method="GET",path="/metrics"} 0
meilisearch_http_response_time_seconds_count{method="GET",path="/metrics"} 0
# HELP meilisearch_last_finished_batches_progress_trace_ms The last few batches progress trace in milliseconds
# TYPE meilisearch_last_finished_batches_progress_trace_ms gauge
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks"} 19360
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes"} 368
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads"} 367
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads > payload"} 367
meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > indexing"} 18970
# HELP meilisearch_index_count Meilisearch Index Count
# TYPE meilisearch_index_count gauge
meilisearch_index_count 1
@@ -148,6 +162,46 @@ pub async fn get_metrics(
}
}
// Fetch and expose the current progressing step
crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE.reset();
let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
&Query { statuses: Some(vec![Status::Processing]), ..Query::default() },
auth_filters,
)?;
if let Some(batch) = batches.into_iter().next() {
let batch_uid = batch.uid.to_string();
if let Some(progress) = batch.progress {
for ProgressStepView { current_step, finished, total } in progress.steps {
crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE
.with_label_values(&[batch_uid.as_str(), current_step.as_ref()])
// We return the completion ratio of the current step
.set(finished as f64 / total as f64);
}
}
}
crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS.reset();
let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
// Fetch the finished batches...
&Query { statuses: Some(vec![Status::Succeeded, Status::Failed]), ..Query::default() },
auth_filters,
)?;
// ...and get the last batch only.
if let Some(batch) = batches.into_iter().next() {
let batch_uid = batch.uid.to_string();
for (step_name, duration_str) in batch.stats.progress_trace {
let Some(duration_str) = duration_str.as_str() else { continue };
match humantime::parse_duration(duration_str) {
Ok(duration) => {
crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS
.with_label_values(&[&batch_uid, &step_name])
.set(duration.as_millis() as i64);
}
Err(e) => tracing::error!("Failed to parse duration: {e}"),
}
}
}
if let Some(last_update) = response.last_update {
crate::metrics::MEILISEARCH_LAST_UPDATE.set(last_update.unix_timestamp());
}

View File

@@ -20,10 +20,10 @@ use tokio::task::JoinHandle;
use uuid::Uuid;
use super::super::ranking_rules::{self, RankingRules};
use super::super::SearchMetadata;
use super::super::{
compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, HitMaker,
HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex,
HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchMetadata, SearchQuery,
SearchQueryWithIndex,
};
use super::proxy::{proxy_search, ProxySearchError, ProxySearchParams};
use super::types::{
@@ -870,6 +870,12 @@ impl SearchByIndex {
return Err(error);
}
let mut results_by_query = Vec::with_capacity(queries.len());
// all queries for an index share the same budget
let time_budget = match cutoff {
Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)),
None => TimeBudget::default(),
};
for QueryByIndex { query, weight, query_index } in queries {
// use an immediately invoked lambda to capture the result without returning from the function
@@ -939,17 +945,13 @@ impl SearchByIndex {
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors);
let time_budget = match cutoff {
Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)),
None => TimeBudget::default(),
};
let (mut search, _is_finite_pagination, _max_total_hits, _offset) = prepare_search(
&index,
&rtxn,
&query,
&search_kind,
time_budget,
// clones of `TimeBudget` share the budget rather than restart it
time_budget.clone(),
params.features,
)?;

View File

@@ -43,7 +43,7 @@ async fn version_too_old() {
std::fs::write(db_path.join("VERSION"), "1.11.9999").unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.23.0");
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.24.0");
}
#[actix_rt::test]
@@ -58,7 +58,7 @@ async fn version_requires_downgrade() {
std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.23.1 is higher than the Meilisearch version 1.23.0. Downgrade is not supported");
snapshot!(err, @"Database version 1.24.1 is higher than the Meilisearch version 1.24.0. Downgrade is not supported");
}
#[actix_rt::test]

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.23.0"
"upgradeTo": "v1.24.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -90,7 +90,7 @@ rhai = { version = "1.22.2", features = [
"sync",
] }
arroy = "0.6.4-nested-rtxns"
hannoy = { version = "0.0.9-nested-rtxns", features = ["arroy"] }
hannoy = { version = "0.0.9-nested-rtxns-2", features = ["arroy"] }
rand = "0.8.5"
tracing = "0.1.41"
ureq = { version = "2.12.1", features = ["json"] }

View File

@@ -97,7 +97,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger.start_iteration_ranking_rule(0, ranking_rules[0].as_ref(), query, universe);
ranking_rules[0].start_iteration(ctx, logger, universe, query)?;
ranking_rules[0].start_iteration(ctx, logger, universe, query, &time_budget)?;
let mut ranking_rule_scores: Vec<ScoreDetails> = vec![];
@@ -168,42 +168,6 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
};
while valid_docids.len() < max_len_to_evaluate {
if time_budget.exceeded() {
loop {
let bucket = std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
// The universe for this bucket is zero, so we don't need to sort
// anything, just go back to the parent ranking rule.
if ranking_rule_universes[cur_ranking_rule_index].is_empty()
@@ -216,14 +180,63 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
continue;
}
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)?
else {
back!();
continue;
let next_bucket = if time_budget.exceeded() {
match ranking_rules[cur_ranking_rule_index].non_blocking_next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)? {
std::task::Poll::Ready(bucket) => bucket,
std::task::Poll::Pending => {
loop {
let bucket =
std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score =
ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
}
} else {
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
&time_budget,
)?
else {
back!();
continue;
};
next_bucket
};
ranking_rule_scores.push(next_bucket.score);
@@ -275,6 +288,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger,
&next_bucket.candidates,
&next_bucket.query,
&time_budget,
)?;
}

View File

@@ -6,7 +6,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput};
use crate::score_details::{self, ScoreDetails};
use crate::search::new::query_graph::QueryNodeData;
use crate::search::new::query_term::ExactTerm;
use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger};
use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger, TimeBudget};
/// A ranking rule that produces 3 disjoint buckets:
///
@@ -35,6 +35,7 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute {
_logger: &mut dyn SearchLogger<QueryGraph>,
universe: &roaring::RoaringBitmap,
query: &QueryGraph,
_time_budget: &TimeBudget,
) -> Result<()> {
self.state = State::start_iteration(ctx, universe, query)?;
Ok(())
@@ -46,6 +47,7 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute {
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<QueryGraph>,
universe: &roaring::RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<QueryGraph>>> {
let state = std::mem::take(&mut self.state);
let (state, output) = State::next(state, universe);

View File

@@ -7,7 +7,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait
use crate::documents::geo_sort::{fill_cache, next_bucket};
use crate::documents::{GeoSortParameter, GeoSortStrategy};
use crate::score_details::{self, ScoreDetails};
use crate::{GeoPoint, Result, SearchContext, SearchLogger};
use crate::{GeoPoint, Result, SearchContext, SearchLogger, TimeBudget};
pub struct GeoSort<Q: RankingRuleQueryTrait> {
query: Option<Q>,
@@ -84,6 +84,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
query: &Q,
_time_budget: &TimeBudget,
) -> Result<()> {
assert!(self.query.is_none());
@@ -110,6 +111,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();

View File

@@ -53,7 +53,7 @@ use super::{QueryGraph, RankingRule, RankingRuleOutput, SearchContext};
use crate::score_details::Rank;
use crate::search::new::query_term::LocatedQueryTermSubset;
use crate::search::new::ranking_rule_graph::PathVisitor;
use crate::{Result, TermsMatchingStrategy};
use crate::{Result, TermsMatchingStrategy, TimeBudget};
pub type Words = GraphBasedRankingRule<WordsGraph>;
impl GraphBasedRankingRule<WordsGraph> {
@@ -135,6 +135,7 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase
_logger: &mut dyn SearchLogger<QueryGraph>,
_universe: &RoaringBitmap,
query_graph: &QueryGraph,
_time_budget: &TimeBudget,
) -> Result<()> {
// the `next_max_cost` is the successor integer to the maximum cost of the paths in the graph.
//
@@ -217,6 +218,7 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<QueryGraph>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<QueryGraph>>> {
// Will crash if `next_bucket` is called before `start_iteration` or after `end_iteration`,
// should never happen

View File

@@ -1,9 +1,11 @@
use std::task::Poll;
use roaring::RoaringBitmap;
use super::logger::SearchLogger;
use super::{QueryGraph, SearchContext};
use crate::score_details::ScoreDetails;
use crate::Result;
use crate::{Result, TimeBudget};
/// An internal trait implemented by only [`PlaceholderQuery`] and [`QueryGraph`]
pub trait RankingRuleQueryTrait: Sized + Clone + 'static {}
@@ -28,12 +30,15 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> {
/// buckets using [`next_bucket`](RankingRule::next_bucket).
///
/// The given universe is the universe that will be given to [`next_bucket`](RankingRule::next_bucket).
///
/// If this function may take a long time, it should check the `time_budget` and return early if exceeded.
fn start_iteration(
&mut self,
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
query: &Query,
time_budget: &TimeBudget,
) -> Result<()>;
/// Return the next bucket of this ranking rule.
@@ -43,13 +48,31 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> {
/// The universe given as argument is either:
/// - a subset of the universe given to the previous call to [`next_bucket`](RankingRule::next_bucket); OR
/// - the universe given to [`start_iteration`](RankingRule::start_iteration)
///
/// If this function may take a long time, it should check the `time_budget` and return early if exceeded.
fn next_bucket(
&mut self,
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Query>>>;
/// Return the next bucket of this ranking rule, if doing so can be done without blocking
///
/// Even if the time budget is exceeded, when getting the next bucket is a fast operation, this should return `true`
/// to allow Meilisearch to collect the results.
///
/// Default implementation conservatively returns that it would block.
fn non_blocking_next_bucket(
&mut self,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Query>,
_universe: &RoaringBitmap,
) -> Result<Poll<RankingRuleOutput<Query>>> {
Ok(Poll::Pending)
}
/// Finish iterating over the buckets, which yields control to the parent ranking rule
/// The next call to this ranking rule, if any, will be [`start_iteration`](RankingRule::start_iteration).
fn end_iteration(

View File

@@ -7,7 +7,7 @@ use crate::heed_codec::facet::{FacetGroupKeyCodec, OrderedF64Codec};
use crate::heed_codec::{BytesRefCodec, StrRefCodec};
use crate::score_details::{self, ScoreDetails};
use crate::search::facet::{ascending_facet_sort, descending_facet_sort};
use crate::{FieldId, Index, Result};
use crate::{FieldId, Index, Result, TimeBudget};
pub trait RankingRuleOutputIter<'ctx, Query> {
fn next_bucket(&mut self) -> Result<Option<RankingRuleOutput<Query>>>;
@@ -96,6 +96,7 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx,
_logger: &mut dyn SearchLogger<Query>,
parent_candidates: &RoaringBitmap,
parent_query: &Query,
_time_budget: &TimeBudget,
) -> Result<()> {
let iter: RankingRuleOutputIterWrapper<'ctx, Query> = match self.field_id {
Some(field_id) => {
@@ -194,6 +195,7 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Query>>> {
let iter = self.iter.as_mut().unwrap();
if let Some(mut bucket) = iter.next_bucket()? {

View File

@@ -3,12 +3,17 @@
//! 2. A test that ensure the filters are affectively applied even with a cutoff of 0
//! 3. A test that ensure the cutoff works well with the ranking scores
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use meili_snap::snapshot;
use crate::index::tests::TempIndex;
use crate::score_details::{ScoreDetails, ScoringStrategy};
use crate::update::Setting;
use crate::vector::settings::EmbeddingSettings;
use crate::vector::{Embedder, EmbedderOptions};
use crate::{Criterion, Filter, FilterableAttributesRule, Search, TimeBudget};
fn create_index() -> TempIndex {
@@ -361,9 +366,8 @@ fn degraded_search_and_score_details() {
]
"###);
// After SIX loop iteration. The words ranking rule gave us a new bucket.
// Since we reached the limit we were able to early exit without checking the typo ranking rule.
search.time_budget(TimeBudget::max().with_stop_after(6));
// After FIVE loop iterations. The words ranking rule gave us a new bucket.
search.time_budget(TimeBudget::max().with_stop_after(5));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
@@ -424,4 +428,399 @@ fn degraded_search_and_score_details() {
],
]
"###);
// After SIX loop iterations.
// we finished
search.time_budget(TimeBudget::max().with_stop_after(6));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [4, 1, 0, 3]
Scores: 1.0000 0.9167 0.8333 0.6667
Score Details:
[
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 0,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 1,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 2,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 2,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 0,
max_typo_count: 2,
},
),
],
]
"###);
}
#[test]
fn degraded_search_and_score_details_vector() {
let index = create_index();
index
.add_documents(documents!([
{
"id": 4,
"text": "hella puppo kefir",
"_vectors": {
"default": [0.1, 0.1]
}
},
{
"id": 3,
"text": "hella puppy kefir",
"_vectors": {
"default": [-0.1, 0.1]
}
},
{
"id": 2,
"text": "hello",
"_vectors": {
"default": [0.1, -0.1]
}
},
{
"id": 1,
"text": "hello puppy",
"_vectors": {
"default": [-0.1, -0.1]
}
},
{
"id": 0,
"text": "hello puppy kefir",
"_vectors": {
"default": null
}
},
]))
.unwrap();
index
.update_settings(|settings| {
let mut embedders = BTreeMap::new();
embedders.insert(
"default".into(),
Setting::Set(EmbeddingSettings {
source: Setting::Set(crate::vector::settings::EmbedderSource::UserProvided),
dimensions: Setting::Set(2),
..Default::default()
}),
);
settings.set_embedder_settings(embedders);
settings.set_vector_store(crate::vector::VectorStoreBackend::Hannoy);
})
.unwrap();
let rtxn = index.read_txn().unwrap();
let mut search = Search::new(&rtxn, &index);
let embedder = Arc::new(
Embedder::new(
EmbedderOptions::UserProvided(crate::vector::embedder::manual::EmbedderOptions {
dimensions: 2,
distribution: None,
}),
0,
)
.unwrap(),
);
search.semantic("default".into(), embedder, false, Some(vec![1., -1.]), None);
search.limit(4);
search.scoring_strategy(ScoringStrategy::Detailed);
search.time_budget(TimeBudget::max());
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
// Do ONE loop iteration. Not much can be deduced, almost everyone matched the words first bucket.
search.time_budget(TimeBudget::max().with_stop_after(1));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [0, 1, 2, 3]
Scores: 0.5000 0.0000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Skipped,
],
[
Skipped,
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(2));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [0, 1, 2, 3]
Scores: 0.5000 0.0000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
[
Skipped,
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(3));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 1, 3]
Scores: 1.0000 0.5000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(4));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(5));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
}

View File

@@ -1,4 +1,5 @@
use std::iter::FromIterator;
use std::task::Poll;
use std::time::Instant;
use roaring::RoaringBitmap;
@@ -7,7 +8,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait
use super::VectorStoreStats;
use crate::score_details::{self, ScoreDetails};
use crate::vector::{DistributionShift, Embedder, VectorStore};
use crate::{DocumentId, Result, SearchContext, SearchLogger};
use crate::{DocumentId, Result, SearchContext, SearchLogger, TimeBudget};
pub struct VectorSort<Q: RankingRuleQueryTrait> {
query: Option<Q>,
@@ -52,6 +53,7 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
&mut self,
ctx: &mut SearchContext<'_>,
vector_candidates: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<()> {
let target = &self.target;
let backend = ctx.index.get_vector_store(ctx.txn)?.unwrap_or_default();
@@ -59,7 +61,13 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
let before = Instant::now();
let reader =
VectorStore::new(backend, ctx.index.vector_store, self.embedder_index, self.quantized);
let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
let results = reader.nns_by_vector(
ctx.txn,
target,
self.limit,
Some(vector_candidates),
time_budget,
)?;
self.cached_sorted_docids = results.into_iter();
*ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {
total_time: before.elapsed(),
@@ -69,6 +77,20 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
Ok(())
}
fn next_result(&mut self, vector_candidates: &RoaringBitmap) -> Option<(DocumentId, f32)> {
for (docid, distance) in self.cached_sorted_docids.by_ref() {
if vector_candidates.contains(docid) {
let score = 1.0 - distance;
let score = self
.distribution_shift
.map(|distribution| distribution.shift(score))
.unwrap_or(score);
return Some((docid, score));
}
}
None
}
}
impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
@@ -83,12 +105,13 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
query: &Q,
time_budget: &TimeBudget,
) -> Result<()> {
assert!(self.query.is_none());
self.query = Some(query.clone());
let vector_candidates = &self.vector_candidates & universe;
self.fill_buffer(ctx, &vector_candidates)?;
self.fill_buffer(ctx, &vector_candidates, time_budget)?;
Ok(())
}
@@ -99,6 +122,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();
let vector_candidates = &self.vector_candidates & universe;
@@ -111,24 +135,17 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
}));
}
for (docid, distance) in self.cached_sorted_docids.by_ref() {
if vector_candidates.contains(docid) {
let score = 1.0 - distance;
let score = self
.distribution_shift
.map(|distribution| distribution.shift(score))
.unwrap_or(score);
return Ok(Some(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}));
}
if let Some((docid, score)) = self.next_result(&vector_candidates) {
return Ok(Some(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}));
}
// if we got out of this loop it means we've exhausted our cache.
// we need to refill it and run the function again.
self.fill_buffer(ctx, &vector_candidates)?;
self.fill_buffer(ctx, &vector_candidates, time_budget)?;
// we tried filling the buffer, but it remained empty 😢
// it means we don't actually have any document remaining in the universe with a vector.
@@ -141,11 +158,39 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
}));
}
self.next_bucket(ctx, _logger, universe)
self.next_bucket(ctx, _logger, universe, time_budget)
}
#[tracing::instrument(level = "trace", skip_all, target = "search::vector_sort")]
fn end_iteration(&mut self, _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger<Q>) {
self.query = None;
}
fn non_blocking_next_bucket(
&mut self,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
) -> Result<Poll<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();
let vector_candidates = &self.vector_candidates & universe;
if vector_candidates.is_empty() {
return Ok(Poll::Ready(RankingRuleOutput {
query,
candidates: universe.clone(),
score: ScoreDetails::Vector(score_details::Vector { similarity: None }),
}));
}
if let Some((docid, score)) = self.next_result(&vector_candidates) {
Ok(Poll::Ready(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}))
} else {
Ok(Poll::Pending)
}
}
}

View File

@@ -41,6 +41,7 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&ToTargetNoOp { target: (1, 21, 0) },
&ToTargetNoOp { target: (1, 22, 0) },
&ToTargetNoOp { target: (1, 23, 0) },
&ToTargetNoOp { target: (1, 24, 0) },
// This is the last upgrade function, it will be called when the index is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@@ -75,6 +76,7 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 21, _) => function_index!(11),
(1, 22, _) => function_index!(12),
(1, 23, _) => function_index!(13),
(1, 24, _) => function_index!(14),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually
// considering dumpless upgrade.
(_major, _minor, _patch) => return None,

View File

@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use crate::progress::Progress;
use crate::vector::Embeddings;
use crate::TimeBudget;
const HANNOY_EF_CONSTRUCTION: usize = 125;
const HANNOY_M: usize = 16;
@@ -591,6 +592,7 @@ impl VectorStore {
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
time_budget: &TimeBudget,
) -> crate::Result<Vec<(ItemId, f32)>> {
if self.backend == VectorStoreBackend::Arroy {
if self.quantized {
@@ -601,11 +603,25 @@ impl VectorStore {
.map_err(Into::into)
}
} else if self.quantized {
self._hannoy_nns_by_vector(rtxn, self._hannoy_quantized_db(), vector, limit, filter)
.map_err(Into::into)
self._hannoy_nns_by_vector(
rtxn,
self._hannoy_quantized_db(),
vector,
limit,
filter,
time_budget,
)
.map_err(Into::into)
} else {
self._hannoy_nns_by_vector(rtxn, self._hannoy_angular_db(), vector, limit, filter)
.map_err(Into::into)
self._hannoy_nns_by_vector(
rtxn,
self._hannoy_angular_db(),
vector,
limit,
filter,
time_budget,
)
.map_err(Into::into)
}
}
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result<Vec<Vec<f32>>> {
@@ -1000,6 +1016,7 @@ impl VectorStore {
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
time_budget: &TimeBudget,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();
@@ -1011,7 +1028,10 @@ impl VectorStore {
searcher.candidates(filter);
}
results.append(&mut searcher.by_vector(rtxn, vector)?);
let (res, _degraded) =
&mut searcher
.by_vector_with_cancellation(rtxn, vector, || time_budget.exceeded())?;
results.append(res);
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));