Compare commits


2 Commits

Author        SHA1        Message                                                      Date
curquiza      75f5375e32  Change Dockerfile because of Alpine change with /usr-merge  2025-10-11 11:04:29 +02:00
PedroTroller  2137606e9d  Bump alpine version to 3.22                                 2025-10-06 16:49:28 +02:00
78 changed files with 368 additions and 2030 deletions

.github/release-draft-template.yml (new file, 29 lines)

@@ -0,0 +1,29 @@
name-template: 'v$RESOLVED_VERSION'
tag-template: 'v$RESOLVED_VERSION'
exclude-labels:
  - 'skip changelog'
version-resolver:
  minor:
    labels:
      - 'enhancement'
  default: patch
categories:
  - title: '⚠️ Breaking changes'
    label: 'breaking-change'
  - title: '🚀 Enhancements'
    label: 'enhancement'
  - title: '🐛 Bug Fixes'
    label: 'bug'
  - title: '🔒 Security'
    label: 'security'
  - title: '⚙️ Maintenance/misc'
    label:
      - 'dependencies'
      - 'maintenance'
      - 'documentation'
template: |
  $CHANGES

  ❤️ Huge thanks to our contributors: $CONTRIBUTORS.
no-changes-template: 'Changes are coming soon 😎'
sort-direction: 'ascending'

.github/workflows/release-drafter.yml (new file, 20 lines)

@@ -0,0 +1,20 @@
name: Release Drafter

permissions:
  contents: read
  pull-requests: write

on:
  push:
    branches:
      - main

jobs:
  update_release_draft:
    runs-on: ubuntu-latest
    steps:
      - uses: release-drafter/release-drafter@v6
        with:
          config-name: release-draft-template.yml
        env:
          GITHUB_TOKEN: ${{ secrets.RELEASE_DRAFTER_TOKEN }}

Cargo.lock (generated; 42 changed lines)

@@ -589,7 +589,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"bumpalo",
@@ -799,7 +799,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"time",
@@ -1075,9 +1075,9 @@ dependencies = [
[[package]]
name = "cellulite"
version = "0.3.1-nested-rtxns-2"
version = "0.3.1-nested-rtxns"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f22d721963ead1a144f10cb8b53dc9469e760723b069123c7c7bc675c7354270"
checksum = "db298d57a80b9284327800b394ee3921307c2fdda87c6d37202f5cf400478981"
dependencies = [
"crossbeam",
"geo",
@@ -1829,7 +1829,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"big_s",
@@ -2072,7 +2072,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-store"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"tempfile",
"thiserror 2.0.16",
@@ -2094,7 +2094,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"insta",
"levenshtein_automata",
@@ -2122,7 +2122,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"criterion",
"serde_json",
@@ -2279,7 +2279,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2758,9 +2758,9 @@ dependencies = [
[[package]]
name = "hannoy"
version = "0.0.9-nested-rtxns-2"
version = "0.0.9-nested-rtxns"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06eda090938d9dcd568c8c2a5de383047ed9191578ebf4a342d2975d16e621f2"
checksum = "cc5a945b92b063e677d658cfcc7cb6dec2502fe44631f017084938f14d6ce30e"
dependencies = [
"bytemuck",
"byteorder",
@@ -3233,7 +3233,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"backoff",
@@ -3487,7 +3487,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"criterion",
"serde_json",
@@ -3996,7 +3996,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"insta",
"md5",
@@ -4007,7 +4007,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"actix-cors",
"actix-http",
@@ -4104,7 +4104,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -4123,7 +4123,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"actix-web",
"anyhow",
@@ -4158,7 +4158,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"clap",
@@ -4192,7 +4192,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"allocator-api2 0.3.1",
"arroy",
@@ -4773,7 +4773,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "permissive-json-pointer"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"big_s",
"serde_json",
@@ -7820,7 +7820,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.24.0"
version = "1.22.1"
dependencies = [
"anyhow",
"build-info",

Cargo.toml

@@ -23,7 +23,7 @@ members = [
]
[workspace.package]
version = "1.24.0"
version = "1.22.1"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

Dockerfile

@@ -30,11 +30,11 @@ RUN apk add -q --no-cache libgcc tini curl
# add meilisearch and meilitool to the `/bin` so you can run it from anywhere
# and it's easy to find.
COPY --from=compiler /target/release/meilisearch /bin/meilisearch
COPY --from=compiler /target/release/meilitool /bin/meilitool
COPY --from=compiler /target/release/meilisearch /usr/bin/meilisearch
COPY --from=compiler /target/release/meilitool /usr/bin/meilitool
# To stay compatible with the older version of the container (pre v0.27.0) we're
# going to symlink the meilisearch binary in the path to `/meilisearch`
RUN ln -s /bin/meilisearch /meilisearch
RUN ln -s /usr/bin/meilisearch /meilisearch
# This directory should hold all the data related to meilisearch so we're going
# to move our PWD in there.
@@ -45,4 +45,4 @@ WORKDIR /meili_data
EXPOSE 7700/tcp
ENTRYPOINT ["tini", "--"]
CMD /bin/meilisearch
CMD /usr/bin/meilisearch
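
The Dockerfile hunk above swaps /bin for /usr/bin as the destination of the COPY steps, the compatibility symlink, and the CMD, matching the first commit message that attributes the change to Alpine's /usr-merge: with /bin reduced to a symlink into /usr/bin, targeting /usr/bin directly avoids installing binaries through the symlink. A minimal sketch for checking the layout of a base image follows; the image tag and the expected output are assumptions, not part of this diff.

# Sketch: under /usr-merge, /bin is expected to resolve into usr/bin.
docker run --rm alpine:3.22 sh -c 'ls -ld /bin /usr/bin && readlink /bin'
# If /bin is reported as a symlink (e.g. "/bin -> usr/bin"), install binaries
# into /usr/bin and point compatibility symlinks at /usr/bin paths, as the
# /usr/bin side of the hunk does.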

LICENSE (28 changed lines)

@@ -1,9 +1,29 @@
# License
MIT License
Copyright (c) 2019-2025 Meili SAS
Part of this work fall under the Meilisearch Enterprise Edition (EE) and are licensed under the Business Source License 1.1, please refer to [LICENSE-EE](./LICENSE-EE) for details.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The other parts of this work are licensed under the [MIT license](./LICENSE-MIT).
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
`SPDX-License-Identifier: MIT AND BUSL-1.1`
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
---
🔒 Meilisearch Enterprise Edition (EE)
Certain parts of this codebase are not licensed under the MIT license and governed by the Business Source License 1.1.
See the LICENSE-EE file for details.

LICENSE-MIT (deleted)

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2019-2025 Meili SAS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


@@ -158,9 +158,6 @@ pub enum KindDump {
UpgradeDatabase {
from: (u32, u32, u32),
},
IndexCompaction {
index_uid: String,
},
}
impl From<Task> for TaskDump {
@@ -243,9 +240,6 @@ impl From<KindWithContent> for KindDump {
KindWithContent::UpgradeDatabase { from: version } => {
KindDump::UpgradeDatabase { from: version }
}
KindWithContent::IndexCompaction { index_uid } => {
KindDump::IndexCompaction { index_uid }
}
}
}
}


@@ -234,9 +234,6 @@ impl<'a> Dump<'a> {
}
}
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
KindDump::IndexCompaction { index_uid } => {
KindWithContent::IndexCompaction { index_uid }
}
},
};


@@ -199,7 +199,7 @@ impl IndexMapper {
let uuid = Uuid::new_v4();
self.index_mapping.put(&mut wtxn, name, &uuid)?;
let index_path = self.index_path(uuid);
let index_path = self.base_path.join(uuid.to_string());
fs::create_dir_all(&index_path)?;
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
@@ -286,7 +286,7 @@ impl IndexMapper {
};
let index_map = self.index_map.clone();
let index_path = self.index_path(uuid);
let index_path = self.base_path.join(uuid.to_string());
let index_name = name.to_string();
thread::Builder::new()
.name(String::from("index_deleter"))
@@ -341,26 +341,6 @@ impl IndexMapper {
Ok(())
}
/// Closes the specified index.
///
/// This operation involves closing the underlying environment and so can take a long time to complete.
///
/// # Panics
///
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
/// in memory hash map.
pub fn close_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
let uuid = self
.index_mapping
.get(rtxn, name)?
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// We remove the index from the in-memory index map.
self.index_map.write().unwrap().close_for_resize(&uuid, self.enable_mdb_writemap, 0);
Ok(())
}
/// Return an index, may open it if it wasn't already opened.
pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> {
if let Some((current_name, current_index)) =
@@ -408,7 +388,7 @@ impl IndexMapper {
} else {
continue;
};
let index_path = self.index_path(uuid);
let index_path = self.base_path.join(uuid.to_string());
// take the lock to reopen the environment.
reopen
.reopen(&mut self.index_map.write().unwrap(), &index_path)
@@ -425,7 +405,7 @@ impl IndexMapper {
// if it's not already there.
match index_map.get(&uuid) {
Missing => {
let index_path = self.index_path(uuid);
let index_path = self.base_path.join(uuid.to_string());
break index_map
.create(
@@ -452,14 +432,6 @@ impl IndexMapper {
Ok(index)
}
/// Returns the path of the index.
///
/// The folder located at this path is containing the data.mdb,
/// the lock.mdb and an optional data.mdb.cpy file.
pub fn index_path(&self, uuid: Uuid) -> PathBuf {
self.base_path.join(uuid.to_string())
}
pub fn rollback_index(
&self,
rtxn: &RoTxn,
@@ -500,7 +472,7 @@ impl IndexMapper {
};
}
let index_path = self.index_path(uuid);
let index_path = self.base_path.join(uuid.to_string());
Index::rollback(milli::heed::EnvOpenOptions::new().read_txn_without_tls(), index_path, to)
.map_err(|err| crate::Error::from_milli(err, Some(name.to_string())))
}


@@ -317,9 +317,6 @@ fn snapshot_details(d: &Details) -> String {
Details::UpgradeDatabase { from, to } => {
format!("{{ from: {from:?}, to: {to:?} }}")
}
Details::IndexCompaction { index_uid, pre_compaction_size, post_compaction_size } => {
format!("{{ index_uid: {index_uid:?}, pre_compaction_size: {pre_compaction_size:?}, post_compaction_size: {post_compaction_size:?} }}")
}
}
}


@@ -75,7 +75,6 @@ make_enum_progress! {
pub enum TaskCancelationProgress {
RetrievingTasks,
CancelingUpgrade,
CleaningCompactionLeftover,
UpdatingTasks,
}
}
@@ -139,17 +138,6 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum IndexCompaction {
RetrieveTheIndex,
CreateTemporaryFile,
CopyAndCompactTheIndex,
PersistTheCompactedIndex,
CloseTheIndex,
ReopenTheIndex,
}
}
make_enum_progress! {
pub enum InnerSwappingTwoIndexes {
RetrieveTheTasks,


@@ -68,14 +68,13 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,
KindWithContent::IndexCompaction { .. }
| KindWithContent::TaskCancelation { .. }
KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::Export { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => {
panic!("The autobatcher should never be called with tasks with special priority or that don't apply to an index.")
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
}
}
}
@@ -288,10 +287,8 @@ impl BatchKind {
};
match (self, autobatch_kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => {
Break((this, BatchStopReason::TaskCannotBeBatched { kind, id }))
},
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break((this, BatchStopReason::IndexCreationMismatch { id }))


@@ -55,10 +55,6 @@ pub(crate) enum Batch {
UpgradeDatabase {
tasks: Vec<Task>,
},
IndexCompaction {
index_uid: String,
task: Task,
},
}
#[derive(Debug)]
@@ -114,8 +110,7 @@ impl Batch {
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::Export { task }
| Batch::IndexUpdate { task, .. }
| Batch::IndexCompaction { task, .. } => {
| Batch::IndexUpdate { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
Batch::SnapshotCreation(tasks)
@@ -160,8 +155,7 @@ impl Batch {
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. }
| IndexCompaction { index_uid, .. } => Some(index_uid),
| IndexDeletion { index_uid, .. } => Some(index_uid),
}
}
}
@@ -181,7 +175,6 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::IndexCompaction { .. } => f.write_str("IndexCompaction")?,
Batch::Export { .. } => f.write_str("Export")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
};
@@ -519,33 +512,17 @@ impl IndexScheduler {
return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
}
// 3. we get the next task to compact
let to_compact = self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)? & enqueued;
if let Some(task_id) = to_compact.min() {
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::IndexCompaction,
id: task_id,
});
let index_uid =
task.index_uid().expect("Compaction task must have an index uid").to_owned();
return Ok(Some((Batch::IndexCompaction { index_uid, task }, current_batch)));
}
// 4. we batch the export.
// 3. we batch the export.
let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued;
if !to_export.is_empty() {
let task_id = to_export.iter().next().expect("There must be at least one export task");
let mut task = self.queue.tasks.get_task(rtxn, task_id)?.unwrap();
current_batch.processing([&mut task]);
current_batch
.reason(BatchStopReason::TaskCannotBeBatched { kind: Kind::Export, id: task_id });
current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export });
return Ok(Some((Batch::Export { task }, current_batch)));
}
// 5. we batch the snapshot.
// 4. we batch the snapshot.
let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
if !to_snapshot.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
@@ -555,7 +532,7 @@ impl IndexScheduler {
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
}
// 6. we batch the dumps.
// 5. we batch the dumps.
let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
if let Some(to_dump) = to_dump.min() {
let mut task =
@@ -568,7 +545,7 @@ impl IndexScheduler {
return Ok(Some((Batch::Dump(task), current_batch)));
}
// 7. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
// 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;


@@ -1,27 +1,22 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fs::{remove_file, File};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use byte_unit::Byte;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
use tempfile::{PersistError, TempPath};
use time::OffsetDateTime;
use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
IndexCompaction, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
TaskDeletionProgress, UpdateIndexProgress,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
};
use crate::utils::{
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
@@ -29,9 +24,6 @@ use crate::utils::{
};
use crate::{Error, IndexScheduler, Result, TaskId};
/// The name of the copy of the data.mdb file used during compaction.
const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy";
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. None when unavailable: settings update.
@@ -426,47 +418,6 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::IndexCompaction { index_uid: _, mut task } => {
let KindWithContent::IndexCompaction { index_uid } = &task.kind else {
unreachable!()
};
let rtxn = self.env.read_txn()?;
let ret = catch_unwind(AssertUnwindSafe(|| {
self.apply_compaction(&rtxn, &progress, index_uid)
}));
let (pre_size, post_size) = match ret {
Ok(Ok(stats)) => stats,
Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
Ok(Err(e)) => return Err(e),
Err(e) => {
let msg = match e.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match e.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
return Err(Error::Export(Box::new(Error::ProcessBatchPanicked(
msg.to_string(),
))));
}
};
task.status = Status::Succeeded;
if let Some(Details::IndexCompaction {
index_uid: _,
pre_compaction_size,
post_compaction_size,
}) = task.details.as_mut()
{
*pre_compaction_size = Some(Byte::from_u64(pre_size));
*post_compaction_size = Some(Byte::from_u64(post_size));
}
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::Export { mut task } => {
let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind
else {
@@ -542,92 +493,6 @@ impl IndexScheduler {
}
}
fn apply_compaction(
&self,
rtxn: &RoTxn,
progress: &Progress,
index_uid: &str,
) -> Result<(u64, u64)> {
// 1. Verify that the index exists
if !self.index_mapper.index_exists(rtxn, index_uid)? {
return Err(Error::IndexNotFound(index_uid.to_owned()));
}
// 2. We retrieve the index and create a temporary file in the index directory
progress.update_progress(IndexCompaction::RetrieveTheIndex);
let index = self.index_mapper.index(rtxn, index_uid)?;
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
self.index_mapper
.set_currently_updating_index(Some((index_uid.to_string(), index.clone())));
progress.update_progress(IndexCompaction::CreateTemporaryFile);
let src_path = index.path().join("data.mdb");
let pre_size = std::fs::metadata(&src_path)?.len();
let dst_path = TempPath::from_path(index.path().join(DATA_MDB_COPY_NAME));
let file = File::create(&dst_path)?;
let mut file = tempfile::NamedTempFile::from_parts(file, dst_path);
// 3. We copy the index data to the temporary file
progress.update_progress(IndexCompaction::CopyAndCompactTheIndex);
index
.copy_to_file(file.as_file_mut(), CompactionOption::Enabled)
.map_err(|error| Error::Milli { error, index_uid: Some(index_uid.to_string()) })?;
// ...and reset the file position as specified in the documentation
file.seek(SeekFrom::Start(0))?;
// 4. We replace the index data file with the temporary file
progress.update_progress(IndexCompaction::PersistTheCompactedIndex);
match file.persist(src_path) {
Ok(file) => file.sync_all()?,
// TODO see if we have a _resource busy_ error and probably handle this by:
// 1. closing the index, 2. replacing and 3. reopening it
Err(PersistError { error, file: _ }) => return Err(Error::IoError(error)),
};
// 5. Prepare to close the index
progress.update_progress(IndexCompaction::CloseTheIndex);
// unmark that the index is the processing one so we don't keep a handle to it, preventing its closing
self.index_mapper.set_currently_updating_index(None);
self.index_mapper.close_index(rtxn, index_uid)?;
drop(index);
progress.update_progress(IndexCompaction::ReopenTheIndex);
// 6. Reopen the index
// The index will use the compacted data file when being reopened
let index = self.index_mapper.index(rtxn, index_uid)?;
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<_> {
let mut wtxn = self.env.write_txn()?;
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
self.index_mapper.store_stats_of(&mut wtxn, index_uid, &stats)?;
wtxn.commit()?;
Ok(stats.database_size)
}();
let post_size = match res {
Ok(post_size) => post_size,
Err(e) => {
tracing::error!(
error = &e as &dyn std::error::Error,
"Could not write the stats of the index"
);
0
}
};
Ok((pre_size, post_size))
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(
&self,
@@ -915,10 +780,9 @@ impl IndexScheduler {
let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
// 0. Check if any upgrade or compaction tasks were matched.
// 0. Check if any upgrade task was matched.
// If so, we cancel all the failed or enqueued upgrade tasks.
let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?;
let compaction_tasks = &self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)?;
let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks);
if is_canceling_upgrade {
let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?;
@@ -983,33 +847,7 @@ impl IndexScheduler {
}
}
// 3. If we are cancelling a compaction task, remove the tempfiles after incomplete compactions
for compaction_task in &tasks_to_cancel & compaction_tasks {
progress.update_progress(TaskCancelationProgress::CleaningCompactionLeftover);
let task = self.queue.tasks.get_task(rtxn, compaction_task)?.unwrap();
let Some(Details::IndexCompaction {
index_uid,
pre_compaction_size: _,
post_compaction_size: _,
}) = task.details
else {
unreachable!("wrong details for compaction task {compaction_task}")
};
let index_path = match self.index_mapper.index_mapping.get(rtxn, &index_uid)? {
Some(index_uuid) => self.index_mapper.index_path(index_uuid),
None => continue,
};
if let Err(e) = remove_file(index_path.join(DATA_MDB_COPY_NAME)) {
match e.kind() {
ErrorKind::NotFound => (),
_ => return Err(Error::IoError(e)),
}
}
}
// 4. We now have a list of tasks to cancel, cancel them
// 3. We now have a list of tasks to cancel, cancel them
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
progress.update_progress(progress_obj);


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.22.1"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
----------------------------------------------------------------------
### Status:
enqueued [0,]


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
@@ -37,7 +37,7 @@ catto [1,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.22.1"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ doggo [2,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.22.1"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]


@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 22, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -43,7 +43,7 @@ doggo [2,3,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.22.1"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]


@@ -722,7 +722,7 @@ fn basic_get_stats() {
let kind = index_creation_task("whalo", "fish");
let _task = index_scheduler.register(kind, None, false).unwrap();
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
@@ -742,7 +742,6 @@ fn basic_get_stats() {
"documentEdition": 0,
"dumpCreation": 0,
"export": 0,
"indexCompaction": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
@@ -754,10 +753,10 @@ fn basic_get_stats() {
"upgradeDatabase": 0
}
}
"###);
"#);
handle.advance_till([Start, BatchCreated]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
@@ -777,7 +776,6 @@ fn basic_get_stats() {
"documentEdition": 0,
"dumpCreation": 0,
"export": 0,
"indexCompaction": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
@@ -789,7 +787,7 @@ fn basic_get_stats() {
"upgradeDatabase": 0
}
}
"###);
"#);
handle.advance_till([
InsideProcessBatch,
@@ -799,7 +797,7 @@ fn basic_get_stats() {
Start,
BatchCreated,
]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
@@ -819,7 +817,6 @@ fn basic_get_stats() {
"documentEdition": 0,
"dumpCreation": 0,
"export": 0,
"indexCompaction": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
@@ -831,7 +828,7 @@ fn basic_get_stats() {
"upgradeDatabase": 0
}
}
"###);
"#);
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
handle.advance_till([
@@ -842,7 +839,7 @@ fn basic_get_stats() {
Start,
BatchCreated,
]);
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r###"
snapshot!(json_string!(index_scheduler.get_stats().unwrap()), @r#"
{
"indexes": {
"catto": 1,
@@ -862,7 +859,6 @@ fn basic_get_stats() {
"documentEdition": 0,
"dumpCreation": 0,
"export": 0,
"indexCompaction": 0,
"indexCreation": 3,
"indexDeletion": 0,
"indexSwap": 0,
@@ -874,7 +870,7 @@ fn basic_get_stats() {
"upgradeDatabase": 0
}
}
"###);
"#);
}
#[test]


@@ -46,8 +46,6 @@ pub fn upgrade_index_scheduler(
(1, 20, _) => 0,
(1, 21, _) => 0,
(1, 22, _) => 0,
(1, 23, _) => 0,
(1, 24, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)


@@ -256,15 +256,14 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
use KindWithContent as K;
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentAdditionOrUpdate { index_uid, .. }
| K::DocumentEdition { index_uid, .. }
| K::DocumentDeletion { index_uid, .. }
| K::DocumentDeletionByFilter { index_uid, .. }
| K::DocumentClear { index_uid }
| K::SettingsUpdate { index_uid, .. }
| K::IndexDeletion { index_uid }
| K::IndexCreation { index_uid, .. }
| K::IndexCompaction { index_uid, .. } => index_uids.push(index_uid),
K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid),
K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid),
K::DocumentClear { index_uid } => index_uids.push(index_uid),
K::SettingsUpdate { index_uid, .. } => index_uids.push(index_uid),
K::IndexDeletion { index_uid } => index_uids.push(index_uid),
K::IndexCreation { index_uid, .. } => index_uids.push(index_uid),
K::IndexUpdate { index_uid, new_index_uid, .. } => {
index_uids.push(index_uid);
if let Some(new_uid) = new_index_uid {
@@ -619,13 +618,6 @@ impl crate::IndexScheduler {
Details::UpgradeDatabase { from: _, to: _ } => {
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
}
Details::IndexCompaction {
index_uid: _,
pre_compaction_size: _,
post_compaction_size: _,
} => {
assert_eq!(kind.as_kind(), Kind::IndexCompaction);
}
}
}


@@ -109,7 +109,6 @@ impl HeedAuthStore {
Action::IndexesGet,
Action::IndexesUpdate,
Action::IndexesSwap,
Action::IndexesCompact,
]
.iter(),
);


@@ -380,9 +380,6 @@ pub enum Action {
#[serde(rename = "webhooks.*")]
#[deserr(rename = "webhooks.*")]
WebhooksAll,
#[serde(rename = "indexes.compact")]
#[deserr(rename = "indexes.compact")]
IndexesCompact,
}
impl Action {
@@ -401,7 +398,6 @@ impl Action {
INDEXES_UPDATE => Some(Self::IndexesUpdate),
INDEXES_DELETE => Some(Self::IndexesDelete),
INDEXES_SWAP => Some(Self::IndexesSwap),
INDEXES_COMPACT => Some(Self::IndexesCompact),
TASKS_ALL => Some(Self::TasksAll),
TASKS_CANCEL => Some(Self::TasksCancel),
TASKS_DELETE => Some(Self::TasksDelete),
@@ -466,7 +462,6 @@ impl Action {
IndexesUpdate => false,
IndexesDelete => false,
IndexesSwap => false,
IndexesCompact => false,
TasksCancel => false,
TasksDelete => false,
TasksGet => true,
@@ -518,7 +513,6 @@ pub mod actions {
pub const INDEXES_UPDATE: u8 = IndexesUpdate.repr();
pub const INDEXES_DELETE: u8 = IndexesDelete.repr();
pub const INDEXES_SWAP: u8 = IndexesSwap.repr();
pub const INDEXES_COMPACT: u8 = IndexesCompact.repr();
pub const TASKS_ALL: u8 = TasksAll.repr();
pub const TASKS_CANCEL: u8 = TasksCancel.repr();
pub const TASKS_DELETE: u8 = TasksDelete.repr();
@@ -620,7 +614,6 @@ pub(crate) mod test {
assert!(WebhooksDelete.repr() == 47 && WEBHOOKS_DELETE == 47);
assert!(WebhooksCreate.repr() == 48 && WEBHOOKS_CREATE == 48);
assert!(WebhooksAll.repr() == 49 && WEBHOOKS_ALL == 49);
assert!(IndexesCompact.repr() == 50 && INDEXES_COMPACT == 50);
}
#[test]


@@ -142,11 +142,6 @@ pub struct DetailsView {
pub old_index_uid: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_index_uid: Option<String>,
// index compaction
#[serde(skip_serializing_if = "Option::is_none")]
pub pre_compaction_size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub post_compaction_size: Option<String>,
}
impl DetailsView {
@@ -319,24 +314,6 @@ impl DetailsView {
// We should never be able to batch multiple renames at the same time.
(Some(left), Some(_right)) => Some(left),
},
pre_compaction_size: match (
self.pre_compaction_size.clone(),
other.pre_compaction_size.clone(),
) {
(None, None) => None,
(None, Some(size)) | (Some(size), None) => Some(size),
// We should never be able to batch multiple compactions at the same time.
(Some(left), Some(_right)) => Some(left),
},
post_compaction_size: match (
self.post_compaction_size.clone(),
other.post_compaction_size.clone(),
) {
(None, None) => None,
(None, Some(size)) | (Some(size), None) => Some(size),
// We should never be able to batch multiple compactions at the same time.
(Some(left), Some(_right)) => Some(left),
},
}
}
}
@@ -438,15 +415,6 @@ impl From<Details> for DetailsView {
upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)),
..Default::default()
},
Details::IndexCompaction { pre_compaction_size, post_compaction_size, .. } => {
DetailsView {
pre_compaction_size: pre_compaction_size
.map(|size| size.get_appropriate_unit(UnitType::Both).to_string()),
post_compaction_size: post_compaction_size
.map(|size| size.get_appropriate_unit(UnitType::Both).to_string()),
..Default::default()
}
}
}
}
}


@@ -67,8 +67,7 @@ impl Task {
| SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid }
| IndexCompaction { index_uid } => Some(index_uid),
| IndexDeletion { index_uid } => Some(index_uid),
}
}
@@ -95,8 +94,7 @@ impl Task {
| KindWithContent::DumpCreation { .. }
| KindWithContent::SnapshotCreation
| KindWithContent::Export { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::IndexCompaction { .. } => None,
| KindWithContent::UpgradeDatabase { .. } => None,
}
}
}
@@ -172,9 +170,6 @@ pub enum KindWithContent {
UpgradeDatabase {
from: (u32, u32, u32),
},
IndexCompaction {
index_uid: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
@@ -211,7 +206,6 @@ impl KindWithContent {
KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
KindWithContent::Export { .. } => Kind::Export,
KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase,
KindWithContent::IndexCompaction { .. } => Kind::IndexCompaction,
}
}
@@ -232,8 +226,7 @@ impl KindWithContent {
| DocumentClear { index_uid }
| SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexDeletion { index_uid }
| IndexCompaction { index_uid } => vec![index_uid],
| IndexDeletion { index_uid } => vec![index_uid],
IndexUpdate { index_uid, new_index_uid, .. } => {
let mut indexes = vec![index_uid.as_str()];
if let Some(new_uid) = new_index_uid {
@@ -332,11 +325,6 @@ impl KindWithContent {
versioning::VERSION_PATCH,
),
}),
KindWithContent::IndexCompaction { index_uid } => Some(Details::IndexCompaction {
index_uid: index_uid.clone(),
pre_compaction_size: None,
post_compaction_size: None,
}),
}
}
@@ -419,11 +407,6 @@ impl KindWithContent {
versioning::VERSION_PATCH,
),
}),
KindWithContent::IndexCompaction { index_uid } => Some(Details::IndexCompaction {
index_uid: index_uid.clone(),
pre_compaction_size: None,
post_compaction_size: None,
}),
}
}
}
@@ -486,11 +469,6 @@ impl From<&KindWithContent> for Option<Details> {
versioning::VERSION_PATCH,
),
}),
KindWithContent::IndexCompaction { index_uid } => Some(Details::IndexCompaction {
index_uid: index_uid.clone(),
pre_compaction_size: None,
post_compaction_size: None,
}),
}
}
}
@@ -601,7 +579,6 @@ pub enum Kind {
SnapshotCreation,
Export,
UpgradeDatabase,
IndexCompaction,
}
impl Kind {
@@ -613,8 +590,7 @@ impl Kind {
| Kind::SettingsUpdate
| Kind::IndexCreation
| Kind::IndexDeletion
| Kind::IndexUpdate
| Kind::IndexCompaction => true,
| Kind::IndexUpdate => true,
Kind::IndexSwap
| Kind::TaskCancelation
| Kind::TaskDeletion
@@ -642,7 +618,6 @@ impl Display for Kind {
Kind::SnapshotCreation => write!(f, "snapshotCreation"),
Kind::Export => write!(f, "export"),
Kind::UpgradeDatabase => write!(f, "upgradeDatabase"),
Kind::IndexCompaction => write!(f, "indexCompaction"),
}
}
}
@@ -678,8 +653,6 @@ impl FromStr for Kind {
Ok(Kind::Export)
} else if kind.eq_ignore_ascii_case("upgradeDatabase") {
Ok(Kind::UpgradeDatabase)
} else if kind.eq_ignore_ascii_case("indexCompaction") {
Ok(Kind::IndexCompaction)
} else {
Err(ParseTaskKindError(kind.to_owned()))
}
@@ -765,11 +738,6 @@ pub enum Details {
from: (u32, u32, u32),
to: (u32, u32, u32),
},
IndexCompaction {
index_uid: String,
pre_compaction_size: Option<Byte>,
post_compaction_size: Option<Byte>,
},
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
@@ -832,10 +800,6 @@ impl Details {
Self::ClearAll { deleted_documents } => *deleted_documents = Some(0),
Self::TaskCancelation { canceled_tasks, .. } => *canceled_tasks = Some(0),
Self::TaskDeletion { deleted_tasks, .. } => *deleted_tasks = Some(0),
Self::IndexCompaction { pre_compaction_size, post_compaction_size, .. } => {
*pre_compaction_size = None;
*post_compaction_size = None;
}
Self::SettingsUpdate { .. }
| Self::IndexInfo { .. }
| Self::Dump { .. }


@@ -1,84 +0,0 @@
use actix_web::web::{self, Data};
use actix_web::{HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::keys::actions;
use meilisearch_types::tasks::KindWithContent;
use tracing::debug;
use utoipa::OpenApi;
use super::ActionPolicy;
use crate::analytics::Analytics;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::SummarizedTaskView;
#[derive(OpenApi)]
#[openapi(
paths(compact),
tags(
(
name = "Compact an index",
description = "The /compact route uses compacts the database to reorganize and make it smaller and more efficient.",
external_docs(url = "https://www.meilisearch.com/docs/reference/api/compact"),
),
),
)]
pub struct CompactApi;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(compact))));
}
/// Compact an index
#[utoipa::path(
post,
path = "{indexUid}/compact",
tag = "Compact an index",
security(("Bearer" = ["search", "*"])),
params(("indexUid" = String, Path, example = "movies", description = "Index Unique Identifier", nullable = false)),
responses(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
"status": "enqueued",
"type": "documentDeletion",
"enqueuedAt": "2024-08-08T17:05:55.791772Z"
}
)),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "The Authorization header is missing. It must use the bearer authorization method.",
"code": "missing_authorization_header",
"type": "auth",
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
}
)),
)
)]
pub async fn compact(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_COMPACT }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.publish(IndexCompacted::default(), &req);
let task = KindWithContent::IndexCompaction { index_uid: index_uid.to_string() };
let task =
match tokio::task::spawn_blocking(move || index_scheduler.register(task, None, false))
.await?
{
Ok(task) => task,
Err(e) => return Err(e.into()),
};
debug!(returns = ?task, "Compact the {index_uid} index");
Ok(HttpResponse::Accepted().json(SummarizedTaskView::from(task)))
}
crate::empty_analytics!(IndexCompacted, "Index Compacted");


@@ -28,7 +28,6 @@ use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::is_dry_run;
use crate::Opt;
pub mod compact;
pub mod documents;
mod enterprise_edition;
pub mod facet_search;
@@ -50,9 +49,8 @@ pub use enterprise_edition::proxy::{PROXY_ORIGIN_REMOTE_HEADER, PROXY_ORIGIN_TAS
(path = "/", api = facet_search::FacetSearchApi),
(path = "/", api = similar::SimilarApi),
(path = "/", api = settings::SettingsApi),
(path = "/", api = compact::CompactApi),
),
paths(list_indexes, create_index, get_index, update_index, delete_index, get_index_stats, compact::compact),
paths(list_indexes, create_index, get_index, update_index, delete_index, get_index_stats),
tags(
(
name = "Indexes",
@@ -82,8 +80,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/search").configure(search::configure))
.service(web::scope("/facet-search").configure(facet_search::configure))
.service(web::scope("/similar").configure(similar::configure))
.service(web::scope("/settings").configure(settings::configure))
.service(web::scope("/compact").configure(compact::configure)),
.service(web::scope("/settings").configure(settings::configure)),
);
}


@@ -22,12 +22,11 @@ use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::metrics::MEILISEARCH_DEGRADED_SEARCH_REQUESTS;
use crate::routes::indexes::search_analytics::{SearchAggregator, SearchGET, SearchPOST};
use crate::routes::parse_include_metadata_header;
use crate::search::{
add_search_rules, perform_search, HybridQuery, MatchingStrategy, RankingScoreThreshold,
RetrieveVectors, SearchKind, SearchParams, SearchQuery, SearchResult, SemanticRatio,
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO,
RetrieveVectors, SearchKind, SearchQuery, SearchResult, SemanticRatio, DEFAULT_CROP_LENGTH,
DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG,
DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET, DEFAULT_SEMANTIC_RATIO,
};
use crate::search_queue::SearchQueue;
@@ -346,20 +345,15 @@ pub async fn search_with_url_query(
search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors);
let permit = search_queue.try_get_search_permit().await?;
let include_metadata = parse_include_metadata_header(&req);
let search_result = tokio::task::spawn_blocking(move || {
perform_search(
SearchParams {
index_uid: index_uid.to_string(),
query,
search_kind,
retrieve_vectors: retrieve_vector,
features: index_scheduler.features(),
request_uid,
include_metadata,
},
index_uid.to_string(),
&index,
query,
search_kind,
retrieve_vector,
index_scheduler.features(),
request_uid,
)
})
.await;
@@ -459,21 +453,16 @@ pub async fn search_with_post(
search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors);
let include_metadata = parse_include_metadata_header(&req);
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_search(
SearchParams {
index_uid: index_uid.to_string(),
query,
search_kind,
retrieve_vectors,
features: index_scheduler.features(),
request_uid,
include_metadata,
},
index_uid.to_string(),
&index,
query,
search_kind,
retrieve_vectors,
index_scheduler.features(),
request_uid,
)
})
.await;


@@ -235,7 +235,6 @@ impl<Method: AggregateMethod> SearchAggregator<Method> {
degraded,
used_negative_operator,
request_uid: _,
metadata: _,
} = result;
self.total_succeeded = self.total_succeeded.saturating_add(1);


@@ -45,7 +45,6 @@ use crate::routes::webhooks::{WebhookResults, WebhookSettings, WebhookWithMetada
use crate::search::{
FederatedSearch, FederatedSearchResult, Federation, FederationOptions, MergeFacets,
SearchQueryWithIndex, SearchResultWithIndex, SimilarQuery, SimilarResult,
INCLUDE_METADATA_HEADER,
};
use crate::search_queue::SearchQueue;
use crate::Opt;
@@ -185,18 +184,6 @@ pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
.is_some_and(|s| s.to_lowercase() == "true"))
}
/// Parse the `Meili-Include-Metadata` header from an HTTP request.
///
/// Returns `true` if the header is present and set to "true" or "1" (case-insensitive).
/// Returns `false` if the header is not present or has any other value.
pub fn parse_include_metadata_header(req: &HttpRequest) -> bool {
req.headers()
.get(INCLUDE_METADATA_HEADER)
.and_then(|h| h.to_str().ok())
.map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
.unwrap_or(false)
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView {


@@ -18,11 +18,10 @@ use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::indexes::search::search_kind;
use crate::routes::parse_include_metadata_header;
use crate::search::{
add_search_rules, perform_federated_search, perform_search, FederatedSearch,
FederatedSearchResult, RetrieveVectors, SearchParams, SearchQueryWithIndex,
SearchResultWithIndex, PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE,
FederatedSearchResult, RetrieveVectors, SearchQueryWithIndex, SearchResultWithIndex,
PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE,
};
use crate::search_queue::SearchQueue;
@@ -189,7 +188,6 @@ pub async fn multi_search_with_post(
err
})?;
let include_metadata = parse_include_metadata_header(&req);
let response = match federation {
Some(federation) => {
debug!(
@@ -211,7 +209,6 @@ pub async fn multi_search_with_post(
features,
is_proxy,
request_uid,
include_metadata,
)
.await;
permit.drop().await;
@@ -282,16 +279,13 @@ pub async fn multi_search_with_post(
let search_result = tokio::task::spawn_blocking(move || {
perform_search(
SearchParams {
index_uid: index_uid_str.clone(),
query,
search_kind,
retrieve_vectors: retrieve_vector,
features,
request_uid,
include_metadata,
},
index_uid_str.clone(),
&index,
query,
search_kind,
retrieve_vector,
features,
request_uid,
)
})
.await


@@ -226,14 +226,14 @@ mod tests {
{
let params = "types=createIndex";
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
snapshot!(meili_snap::json_string!(err), @r###"
snapshot!(meili_snap::json_string!(err), @r#"
{
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`, `indexCompaction`.",
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
}
"###);
"#);
}
}
#[test]

View File

@@ -22,8 +22,7 @@ use uuid::Uuid;
use super::super::ranking_rules::{self, RankingRules};
use super::super::{
compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, HitMaker,
HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchMetadata, SearchQuery,
SearchQueryWithIndex,
HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex,
};
use super::proxy::{proxy_search, ProxySearchError, ProxySearchParams};
use super::types::{
@@ -42,57 +41,32 @@ pub async fn perform_federated_search(
features: RoFeatures,
is_proxy: bool,
request_uid: Uuid,
include_metadata: bool,
) -> Result<FederatedSearchResult, ResponseError> {
if is_proxy {
features.check_network("Performing a remote federated search")?;
}
let before_search = std::time::Instant::now();
let timeout = std::env::var("MEILI_EXPERIMENTAL_REMOTE_SEARCH_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(25);
let deadline = before_search + std::time::Duration::from_secs(timeout);
let deadline = before_search + std::time::Duration::from_secs(9);
let required_hit_count = federation.limit + federation.offset;
let retrieve_vectors = queries.iter().any(|q| q.retrieve_vectors);
let network = index_scheduler.network();
// Preconstruct metadata, keeping the original query order for later metadata building
let precomputed_query_metadata: Option<Vec<_>> = include_metadata.then(|| {
queries
.iter()
.map(|q| {
(
q.index_uid.to_string(),
q.federation_options.as_ref().and_then(|o| o.remote.clone()),
)
})
.collect()
});
// this implementation partitions the queries by index to guarantee an important property:
// - all the queries to a particular index use the same read transaction.
// Without this property, we cannot guarantee the self-consistency of the results.
// 1. partition queries by host and index
let mut partitioned_queries = PartitionedQueries::new();
for (query_index, federated_query) in queries.into_iter().enumerate() {
partitioned_queries.partition(federated_query, query_index, &network, features)?
}
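The comment above states the invariant that motivates the partitioning: every query targeting the same index must run under one read transaction. A simplified sketch of that grouping step, using a plain `(index_uid, query)` pair instead of the real `PartitionedQueries`/federated query types:

use std::collections::BTreeMap;

/// Group queries by index so that each group can later be executed under a single
/// read transaction (one rtxn per index), preserving each query's original position.
fn partition_by_index(queries: Vec<(String, String)>) -> BTreeMap<String, Vec<(usize, String)>> {
    let mut by_index: BTreeMap<String, Vec<(usize, String)>> = BTreeMap::new();
    for (query_index, (index_uid, q)) in queries.into_iter().enumerate() {
        // keep the original query position so results can be mapped back later
        by_index.entry(index_uid).or_default().push((query_index, q));
    }
    by_index
}

fn main() {
    let grouped = partition_by_index(vec![
        ("movies".into(), "glass".into()),
        ("books".into(), "dune".into()),
        ("movies".into(), "dragon".into()),
    ]);
    // Both "movies" queries (positions 0 and 2) end up in the same group and will
    // therefore share the same read transaction.
    assert_eq!(grouped["movies"].len(), 2);
}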
// 2. perform queries, merge and make hits index by index
// 2.1. start remote queries
let remote_search = RemoteSearch::start(
partitioned_queries.remote_queries_by_host,
&federation,
deadline,
include_metadata,
);
let remote_search =
RemoteSearch::start(partitioned_queries.remote_queries_by_host, &federation, deadline);
// 2.2. concurrently execute local queries
let params = SearchByIndexParams {
@@ -134,25 +108,11 @@ pub async fn perform_federated_search(
let after_waiting_remote_results = std::time::Instant::now();
// 3. merge hits and metadata across indexes and hosts
// 3.1. Build metadata in the same order as the original queries
let query_metadata = precomputed_query_metadata.map(|precomputed_query_metadata| {
// If a remote is present, set the local remote name
let local_remote_name = network.local.clone().filter(|_| partitioned_queries.has_remote);
build_query_metadata(
precomputed_query_metadata,
local_remote_name,
&remote_results,
&results_by_index,
)
});
// 3.2. merge federation metadata
// 3.1. merge metadata
let (estimated_total_hits, degraded, used_negative_operator, facets, max_remote_duration) =
merge_metadata(&mut results_by_index, &remote_results);
// 3.3. merge hits
// 3.2. merge hits
let merged_hits: Vec<_> = merge_index_global_results(results_by_index, &mut remote_results)
.skip(federation.offset)
.take(federation.limit)
@@ -167,7 +127,7 @@ pub async fn perform_federated_search(
.map(|hit| hit.hit())
.collect();
// 3.4. merge query vectors
// 3.3. merge query vectors
let query_vectors = if retrieve_vectors {
for remote_results in remote_results.iter_mut() {
if let Some(remote_vectors) = remote_results.query_vectors.take() {
@@ -186,7 +146,7 @@ pub async fn perform_federated_search(
None
};
// 3.5. merge facets
// 3.4. merge facets
let (facet_distribution, facet_stats, facets_by_index) =
facet_order.merge(federation.merge_facets, remote_results, facets);
@@ -213,7 +173,6 @@ pub async fn perform_federated_search(
facets_by_index,
remote_errors: partitioned_queries.has_remote.then_some(remote_errors),
request_uid: Some(request_uid),
metadata: query_metadata,
})
}
@@ -437,7 +396,6 @@ struct SearchHitByIndex {
struct SearchResultByIndex {
index: String,
primary_key: Option<String>,
hits: Vec<SearchHitByIndex>,
estimated_total_hits: usize,
degraded: bool,
@@ -445,61 +403,6 @@ struct SearchResultByIndex {
facets: Option<ComputedFacets>,
}
/// Builds query metadata for federated search results.
///
/// This function creates metadata for each query in the same order as the original queries,
/// combining information from both local and remote search results. It handles the mapping
/// of primary keys to their respective indexes and remotes to prevent collisions when
/// multiple remotes have the same index_uid but different primary keys.
fn build_query_metadata(
precomputed_query_metadata: Vec<(String, Option<String>)>,
local_remote_name: Option<String>,
remote_results: &[FederatedSearchResult],
results_by_index: &[SearchResultByIndex],
) -> Vec<SearchMetadata> {
// Create a map of (remote, index_uid) -> primary_key for quick lookup
// This prevents collisions when multiple remotes have the same index_uid but different primary keys
let mut primary_key_per_index = std::collections::HashMap::new();
// Build metadata for remote results
for remote_result in remote_results {
if let Some(remote_metadata) = &remote_result.metadata {
for remote_meta in remote_metadata {
if let SearchMetadata {
remote: Some(remote_name),
index_uid,
primary_key: Some(primary_key),
..
} = remote_meta
{
let key = (Some(remote_name), index_uid);
primary_key_per_index.insert(key, primary_key);
}
}
}
}
// Build metadata for local results
for local_meta in results_by_index {
if let SearchResultByIndex { index, primary_key: Some(primary_key), .. } = local_meta {
let key = (None, index);
primary_key_per_index.insert(key, primary_key);
}
}
// Build metadata in the same order as the original queries
let mut query_metadata = Vec::new();
for (index_uid, remote) in precomputed_query_metadata {
let primary_key =
primary_key_per_index.get(&(remote.as_ref(), &index_uid)).map(|pk| pk.to_string());
let query_uid = Uuid::now_v7();
// if the remote is not set, use the local remote name
let remote = remote.or_else(|| local_remote_name.clone());
query_metadata.push(SearchMetadata { query_uid, primary_key, index_uid, remote });
}
query_metadata
}
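The function above keys its lookup map by `(Option<remote>, index_uid)`, which is what keeps two remotes that expose the same `index_uid` from overwriting each other's primary key. A tiny self-contained illustration of that keying, with made-up remote and index names:

use std::collections::HashMap;

fn main() {
    let mut primary_key_per_index: HashMap<(Option<&str>, &str), &str> = HashMap::new();
    // Two remotes expose an index with the same uid but different primary keys,
    // plus a local index with the same uid again.
    primary_key_per_index.insert((Some("ms-eu"), "products"), "sku");
    primary_key_per_index.insert((Some("ms-us"), "products"), "id");
    primary_key_per_index.insert((None, "products"), "uuid");

    // Lookups stay unambiguous because the remote is part of the key.
    assert_eq!(primary_key_per_index.get(&(Some("ms-us"), "products")), Some(&"id"));
    assert_eq!(primary_key_per_index.get(&(None, "products")), Some(&"uuid"));
}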
fn merge_metadata(
results_by_index: &mut Vec<SearchResultByIndex>,
remote_results: &Vec<FederatedSearchResult>,
@@ -511,7 +414,6 @@ fn merge_metadata(
let mut max_remote_duration = Duration::ZERO;
for SearchResultByIndex {
index,
primary_key: _,
hits: _,
estimated_total_hits: estimated_total_hits_by_index,
facets: facets_by_index,
@@ -540,7 +442,6 @@ fn merge_metadata(
degraded: degraded_for_host,
used_negative_operator: host_used_negative_operator,
remote_errors: _,
metadata: _,
request_uid: _,
} in remote_results
{
@@ -669,12 +570,7 @@ struct RemoteSearch {
}
impl RemoteSearch {
fn start(
queries: RemoteQueriesByHost,
federation: &Federation,
deadline: Instant,
include_metadata: bool,
) -> Self {
fn start(queries: RemoteQueriesByHost, federation: &Federation, deadline: Instant) -> Self {
let mut in_flight_remote_queries = BTreeMap::new();
let client = reqwest::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_millis(200))
@@ -694,10 +590,7 @@ impl RemoteSearch {
// never merge distant facets
proxy_federation.merge_facets = None;
let params = params.clone();
async move {
proxy_search(&node, queries, proxy_federation, &params, include_metadata)
.await
}
async move { proxy_search(&node, queries, proxy_federation, &params).await }
}),
);
}
@@ -741,13 +634,6 @@ impl RemoteSearch {
continue 'remote_queries;
}
// Add remote name to metadata
if let Some(metadata) = res.metadata.as_mut() {
for meta in metadata {
meta.remote = Some(node_name.clone());
}
}
federation.insert(
FEDERATION_REMOTE.to_string(),
serde_json::Value::String(node_name.clone()),
@@ -843,7 +729,6 @@ impl SearchByIndex {
}
};
let rtxn = index.read_txn()?;
let primary_key = index.primary_key(&rtxn)?.map(|pk| pk.to_string());
let criteria = index.criteria(&rtxn)?;
let dictionary = index.dictionary(&rtxn)?;
let dictionary: Option<Vec<_>> =
@@ -870,12 +755,6 @@ impl SearchByIndex {
return Err(error);
}
let mut results_by_query = Vec::with_capacity(queries.len());
// all queries for an index share the same budget
let time_budget = match cutoff {
Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)),
None => TimeBudget::default(),
};
for QueryByIndex { query, weight, query_index } in queries {
// use an immediately invoked lambda to capture the result without returning from the function
@@ -945,13 +824,17 @@ impl SearchByIndex {
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors);
let time_budget = match cutoff {
Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)),
None => TimeBudget::default(),
};
let (mut search, _is_finite_pagination, _max_total_hits, _offset) = prepare_search(
&index,
&rtxn,
&query,
&search_kind,
// clones of `TimeBudget` share the budget rather than restart it
time_budget.clone(),
time_budget,
params.features,
)?;
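The comments above ("all queries for an index share the same budget" and "clones of `TimeBudget` share the budget rather than restart it") describe clone-as-shared-deadline semantics. A hypothetical mini budget type, not the actual `milli::TimeBudget`, showing why the clone must carry the original start instant instead of resetting it:

use std::time::{Duration, Instant};

/// Hypothetical sketch: cloning copies the original start instant, so a clone
/// measures against the same deadline rather than restarting the countdown.
#[derive(Clone)]
struct BudgetSketch {
    started_at: Instant,
    budget: Duration,
}

impl BudgetSketch {
    fn new(budget: Duration) -> Self {
        Self { started_at: Instant::now(), budget }
    }

    fn exceeded(&self) -> bool {
        self.started_at.elapsed() > self.budget
    }
}

fn main() {
    let budget = BudgetSketch::new(Duration::from_secs(10));
    std::thread::sleep(Duration::from_millis(5));
    // The clone does not get a fresh 10s: it observes the same start instant.
    let clone = budget.clone();
    assert_eq!(budget.started_at, clone.started_at);
    assert_eq!(budget.exceeded(), clone.exceeded());
}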
@@ -1098,7 +981,6 @@ impl SearchByIndex {
})?;
self.results_by_index.push(SearchResultByIndex {
index: index_uid,
primary_key,
hits: merged_result,
estimated_total_hits,
degraded,

View File

@@ -7,7 +7,7 @@ use serde::de::DeserializeOwned;
use serde_json::Value;
use super::types::{FederatedSearch, FederatedSearchResult, Federation};
use crate::search::{SearchQueryWithIndex, INCLUDE_METADATA_HEADER};
use crate::search::SearchQueryWithIndex;
pub const PROXY_SEARCH_HEADER: &str = "Meili-Proxy-Search";
pub const PROXY_SEARCH_HEADER_VALUE: &str = "true";
@@ -98,7 +98,6 @@ pub async fn proxy_search(
queries: Vec<SearchQueryWithIndex>,
federation: Federation,
params: &ProxySearchParams,
include_metadata: bool,
) -> Result<FederatedSearchResult, ProxySearchError> {
let url = format!("{}/multi-search", node.url);
@@ -106,12 +105,7 @@ pub async fn proxy_search(
let search_api_key = node.search_api_key.as_deref();
let timeout = std::env::var("MEILI_EXPERIMENTAL_REMOTE_SEARCH_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(25);
let max_deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout);
let max_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
let deadline = if let Some(deadline) = params.deadline {
std::time::Instant::min(deadline, max_deadline)
@@ -120,16 +114,7 @@ pub async fn proxy_search(
};
for i in 0..params.try_count {
match try_proxy_search(
&url,
search_api_key,
&federated,
&params.client,
deadline,
include_metadata,
)
.await
{
match try_proxy_search(&url, search_api_key, &federated, &params.client, deadline).await {
Ok(response) => return Ok(response),
Err(retry) => {
let duration = retry.into_duration(i)?;
@@ -137,7 +122,7 @@ pub async fn proxy_search(
}
}
}
try_proxy_search(&url, search_api_key, &federated, &params.client, deadline, include_metadata)
try_proxy_search(&url, search_api_key, &federated, &params.client, deadline)
.await
.map_err(Retry::into_error)
}
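The body of `proxy_search` above combines two patterns: clamp the caller's deadline to a local maximum, then attempt the request up to `try_count` times with a per-attempt backoff before making one last attempt whose error is surfaced as-is. A generic, synchronous sketch of that shape (the names, the fake failing call, and the backoff formula are illustrative, not the actual proxy code):

use std::time::{Duration, Instant};

fn proxy_with_retries(try_count: u32, max_budget: Duration) -> Result<&'static str, &'static str> {
    // Clamp to a local maximum, mirroring `Instant::min(deadline, max_deadline)` above.
    let deadline = Instant::now() + max_budget;
    let mut failures = 0u32;

    // Stand-in for one remote call: fails twice, then succeeds.
    let mut try_once = || -> Result<&'static str, &'static str> {
        if failures < 2 {
            failures += 1;
            Err("retryable error")
        } else {
            Ok("response")
        }
    };

    for attempt in 0..try_count {
        match try_once() {
            Ok(response) => return Ok(response),
            Err(_) if Instant::now() < deadline => {
                // Back off before the next attempt; the real code derives the
                // duration from the retry kind and the attempt index.
                std::thread::sleep(Duration::from_millis(10 * (attempt as u64 + 1)));
            }
            Err(err) => return Err(err),
        }
    }
    // One final attempt whose error is returned as-is, like the trailing call above.
    try_once()
}

fn main() {
    assert_eq!(proxy_with_retries(3, Duration::from_secs(1)), Ok("response"));
}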
@@ -148,7 +133,6 @@ async fn try_proxy_search(
federated: &FederatedSearch,
client: &Client,
deadline: std::time::Instant,
include_metadata: bool,
) -> Result<FederatedSearchResult, Retry> {
let timeout = deadline.saturating_duration_since(std::time::Instant::now());
@@ -159,8 +143,6 @@ async fn try_proxy_search(
request
};
let request = request.header(PROXY_SEARCH_HEADER, PROXY_SEARCH_HEADER_VALUE);
let request =
if include_metadata { request.header(INCLUDE_METADATA_HEADER, "true") } else { request };
let response = request.send().await;
let response = match response {

View File

@@ -18,8 +18,6 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::search::SearchMetadata;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
use crate::milli::vector::Embedding;
@@ -136,8 +134,6 @@ pub struct FederatedSearchResult {
pub facets_by_index: FederatedFacets,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_uid: Option<Uuid>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<Vec<SearchMetadata>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub remote_errors: Option<BTreeMap<String, ResponseError>>,
@@ -164,7 +160,6 @@ impl fmt::Debug for FederatedSearchResult {
facets_by_index,
remote_errors,
request_uid,
metadata,
} = self;
let mut debug = f.debug_struct("SearchResult");
@@ -200,9 +195,6 @@ impl fmt::Debug for FederatedSearchResult {
if let Some(request_uid) = request_uid {
debug.field("request_uid", &request_uid);
}
if let Some(metadata) = metadata {
debug.field("metadata", &metadata);
}
debug.finish()
}

View File

@@ -57,7 +57,6 @@ pub const DEFAULT_CROP_MARKER: fn() -> String = || "…".to_string();
pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "<em>".to_string();
pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "</em>".to_string();
pub const DEFAULT_SEMANTIC_RATIO: fn() -> SemanticRatio = || SemanticRatio(0.5);
pub const INCLUDE_METADATA_HEADER: &str = "Meili-Include-Metadata";
#[derive(Clone, Default, PartialEq, Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
@@ -837,18 +836,6 @@ pub struct SearchHit {
pub ranking_score_details: Option<serde_json::Map<String, serde_json::Value>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct SearchMetadata {
pub query_uid: Uuid,
pub index_uid: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub primary_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub remote: Option<String>,
}
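For reference, `SearchMetadata` serializes with camelCase keys and omits the optional fields when they are `None`, which is exactly the shape asserted by the metadata tests further down. A small sketch using only serde/serde_json (`String` stands in for `Uuid` to keep it self-contained):

use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct MetadataSketch {
    // `Uuid` in the real struct; a plain String keeps this sketch dependency-light.
    query_uid: String,
    index_uid: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    primary_key: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    remote: Option<String>,
}

fn main() {
    let meta = MetadataSketch {
        query_uid: "0198c9a0-0000-7000-8000-000000000000".into(), // made-up UUIDv7-style value
        index_uid: "movies".into(),
        primary_key: Some("id".into()),
        remote: None, // local search: the field is omitted from the JSON entirely
    };
    // Prints: {"queryUid":"0198c9a0-0000-7000-8000-000000000000","indexUid":"movies","primaryKey":"id"}
    println!("{}", serde_json::to_string(&meta).unwrap());
}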
#[derive(Serialize, Clone, PartialEq, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
@@ -867,8 +854,6 @@ pub struct SearchResult {
pub facet_stats: Option<BTreeMap<String, FacetStats>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_uid: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<SearchMetadata>,
#[serde(skip_serializing_if = "Option::is_none")]
pub semantic_hit_count: Option<u32>,
@@ -891,7 +876,6 @@ impl fmt::Debug for SearchResult {
facet_distribution,
facet_stats,
request_uid,
metadata,
semantic_hit_count,
degraded,
used_negative_operator,
@@ -924,9 +908,6 @@ impl fmt::Debug for SearchResult {
if let Some(request_uid) = request_uid {
debug.field("request_uid", &request_uid);
}
if let Some(metadata) = metadata {
debug.field("metadata", &metadata);
}
debug.finish()
}
@@ -1139,28 +1120,16 @@ pub fn prepare_search<'t>(
Ok((search, is_finite_pagination, max_total_hits, offset))
}
pub struct SearchParams {
pub index_uid: String,
pub query: SearchQuery,
pub search_kind: SearchKind,
pub retrieve_vectors: RetrieveVectors,
pub features: RoFeatures,
pub request_uid: Uuid,
pub include_metadata: bool,
}
pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResult, ResponseError> {
let SearchParams {
index_uid,
query,
search_kind,
retrieve_vectors,
features,
request_uid,
include_metadata,
} = params;
pub fn perform_search(
index_uid: String,
index: &Index,
query: SearchQuery,
search_kind: SearchKind,
retrieve_vectors: RetrieveVectors,
features: RoFeatures,
request_uid: Uuid,
) -> Result<SearchResult, ResponseError> {
let before_search = Instant::now();
let index_uid_for_metadata = index_uid.clone();
let rtxn = index.read_txn()?;
let time_budget = match index.search_cutoff(&rtxn)? {
Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)),
@@ -1181,20 +1150,7 @@ pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResul
query_vector,
},
semantic_hit_count,
) = search_from_kind(index_uid.clone(), search_kind, search)?;
let metadata = if include_metadata {
let query_uid = Uuid::now_v7();
let primary_key = index.primary_key(&rtxn)?.map(|pk| pk.to_string());
Some(SearchMetadata {
query_uid,
index_uid: index_uid_for_metadata,
primary_key,
remote: None, // Local searches don't have a remote
})
} else {
None
};
) = search_from_kind(index_uid, search_kind, search)?;
let SearchQuery {
q,
@@ -1277,6 +1233,7 @@ pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResul
.transpose()?
.map(|ComputedFacets { distribution, stats }| (distribution, stats))
.unzip();
let result = SearchResult {
hits: documents,
hits_info,
@@ -1289,7 +1246,6 @@ pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResul
used_negative_operator,
semantic_hit_count,
request_uid: Some(request_uid),
metadata,
};
Ok(result)
}

View File

@@ -419,14 +419,14 @@ async fn error_add_api_key_invalid_parameters_actions() {
let (response, code) = server.add_api_key(content).await;
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r###"
meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r#"
{
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`, `indexes.compact`",
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"
}
"###);
"#);
}
#[actix_rt::test]

View File

@@ -91,14 +91,14 @@ async fn create_api_key_bad_actions() {
// can't parse
let (response, code) = server.add_api_key(json!({ "actions": ["doggo"] })).await;
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
snapshot!(json_string!(response), @r#"
{
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`, `indexes.compact`",
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"
}
"###);
"#);
}
#[actix_rt::test]

View File

@@ -40,14 +40,14 @@ async fn batch_bad_types() {
let (response, code) = server.batches_filter("types=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`, `indexCompaction`.",
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
}
"###);
"#);
}
#[actix_rt::test]

View File

@@ -516,18 +516,6 @@ impl<State> Index<'_, State> {
self.service.post_encoded(url, query, self.encoder).await
}
pub async fn search_with_headers(
&self,
query: Value,
headers: Vec<(&str, &str)>,
) -> (Value, StatusCode) {
let url = format!("/indexes/{}/search", urlencode(self.uid.as_ref()));
let body = serde_json::to_string(&query).unwrap();
let mut all_headers = vec![("content-type", "application/json")];
all_headers.extend(headers);
self.service.post_str(url, body, all_headers).await
}
pub async fn search_get(&self, query: &str) -> (Value, StatusCode) {
let url = format!("/indexes/{}/search{}", urlencode(self.uid.as_ref()), query);
self.service.get(url).await

View File

@@ -390,17 +390,6 @@ impl<State> Server<State> {
self.service.post("/multi-search", queries).await
}
pub async fn multi_search_with_headers(
&self,
queries: Value,
headers: Vec<(&str, &str)>,
) -> (Value, StatusCode) {
let body = serde_json::to_string(&queries).unwrap();
let mut all_headers = vec![("content-type", "application/json")];
all_headers.extend(headers);
self.service.post_str("/multi-search", body, all_headers).await
}
pub async fn list_indexes_raw(&self, parameters: &str) -> (Value, StatusCode) {
self.service.get(format!("/indexes{parameters}")).await
}

View File

@@ -1,387 +0,0 @@
use meili_snap::{json_string, snapshot};
use crate::common::{shared_index_with_documents, Server, DOCUMENTS};
use crate::json;
#[actix_rt::test]
async fn search_without_metadata_header() {
let index = shared_index_with_documents().await;
// Test that metadata is not included by default
index
.search(json!({"q": "glass"}), |response, code| {
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Gläss",
"id": "450465",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]"
}
"###);
})
.await;
}
#[actix_rt::test]
async fn search_with_metadata_header() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
// Test with Meili-Include-Metadata header
let (response, code) = index
.search_with_headers(json!({"q": "glass"}), vec![("Meili-Include-Metadata", "true")])
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]", ".metadata.queryUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Gläss",
"id": "450465",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
}
"###);
}
#[actix_rt::test]
async fn search_with_metadata_header_and_primary_key() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, Some("id")).await;
server.wait_task(task.uid()).await.succeeded();
// Test with Meili-Include-Metadata header
let (response, code) = index
.search_with_headers(json!({"q": "glass"}), vec![("Meili-Include-Metadata", "true")])
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]", ".metadata.queryUid" => "[uuid]" }), @r###"
{
"hits": [
{
"id": "450465",
"title": "Gläss",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
}
"###);
}
#[actix_rt::test]
async fn multi_search_without_metadata_header() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
// Test multi-search without metadata header
let (response, code) = server
.multi_search(json!({
"queries": [
{"indexUid": index.uid, "q": "glass"},
{"indexUid": index.uid, "q": "dragon"}
]
}))
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".results[0].processingTimeMs" => "[duration]", ".results[0].requestUid" => "[uuid]", ".results[1].processingTimeMs" => "[duration]", ".results[1].requestUid" => "[uuid]" }), @r###"
{
"results": [
{
"indexUid": "[uuid]",
"hits": [
{
"title": "Gläss",
"id": "450465",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]"
},
{
"indexUid": "[uuid]",
"hits": [
{
"title": "How to Train Your Dragon: The Hidden World",
"id": "166428",
"color": [
"green",
"red"
]
}
],
"query": "dragon",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]"
}
]
}
"###);
}
#[actix_rt::test]
async fn multi_search_with_metadata_header() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, Some("id")).await;
server.wait_task(task.uid()).await.succeeded();
// Test multi-search with metadata header
let (response, code) = server
.multi_search_with_headers(
json!({
"queries": [
{"indexUid": index.uid, "q": "glass"},
{"indexUid": index.uid, "q": "dragon"}
]
}),
vec![("Meili-Include-Metadata", "true")],
)
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".results[0].processingTimeMs" => "[duration]", ".results[0].requestUid" => "[uuid]", ".results[0].metadata.queryUid" => "[uuid]", ".results[1].processingTimeMs" => "[duration]", ".results[1].requestUid" => "[uuid]", ".results[1].metadata.queryUid" => "[uuid]" }), @r###"
{
"results": [
{
"indexUid": "[uuid]",
"hits": [
{
"id": "450465",
"title": "Gläss",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
},
{
"indexUid": "[uuid]",
"hits": [
{
"id": "166428",
"title": "How to Train Your Dragon: The Hidden World",
"color": [
"green",
"red"
]
}
],
"query": "dragon",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
}
]
}
"###);
}
#[actix_rt::test]
async fn search_metadata_header_false_value() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
// Test with header set to false
let (response, code) = index
.search_with_headers(json!({"q": "glass"}), vec![("Meili-Include-Metadata", "false")])
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Gläss",
"id": "450465",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]"
}
"###);
}
#[actix_rt::test]
async fn search_metadata_uuid_format() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, code) = index
.search_with_headers(json!({"q": "glass"}), vec![("Meili-Include-Metadata", "true")])
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]", ".metadata.queryUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Gläss",
"id": "450465",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
}
"###);
}
#[actix_rt::test]
async fn search_metadata_consistency_across_requests() {
let server = Server::new_shared();
let index = server.unique_index();
let documents = DOCUMENTS.clone();
let (task, _code) = index.add_documents(documents, Some("id")).await;
server.wait_task(task.uid()).await.succeeded();
// Make multiple requests and check that metadata is consistent
for _i in 0..3 {
let (response, code) = index
.search_with_headers(json!({"q": "glass"}), vec![("Meili-Include-Metadata", "true")])
.await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[duration]", ".requestUid" => "[uuid]", ".metadata.queryUid" => "[uuid]" }), @r###"
{
"hits": [
{
"id": "450465",
"title": "Gläss",
"color": [
"blue",
"red"
]
}
],
"query": "glass",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1,
"requestUid": "[uuid]",
"metadata": {
"queryUid": "[uuid]",
"indexUid": "[uuid]",
"primaryKey": "id"
}
}
"###);
}
}

View File

@@ -11,7 +11,6 @@ mod hybrid;
#[cfg(not(feature = "chinese-pinyin"))]
mod locales;
mod matching_strategy;
mod metadata;
mod multi;
mod pagination;
mod restrict_searchable;
@@ -2129,102 +2128,3 @@ async fn simple_search_changing_unrelated_settings() {
})
.await;
}
#[actix_rt::test]
async fn ranking_score_bug_with_sort() {
let server = Server::new_shared();
let index = server.unique_index();
// Create documents with a "created" field for sorting
let documents = json!([
{
"id": "1",
"title": "Coffee Mug",
"created": "2023-01-01T00:00:00Z"
},
{
"id": "2",
"title": "Water Bottle",
"created": "2023-01-02T00:00:00Z"
},
{
"id": "3",
"title": "Tumbler Cup",
"created": "2023-01-03T00:00:00Z"
},
{
"id": "4",
"title": "Stainless Steel Tumbler",
"created": "2023-01-04T00:00:00Z"
}
]);
// Add documents
let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202, "{task}");
server.wait_task(task.uid()).await.succeeded();
// Configure sortable attributes
let (task, code) = index
.update_settings(json!({
"sortableAttributes": ["created"]
}))
.await;
assert_eq!(code, 202, "{task}");
server.wait_task(task.uid()).await.succeeded();
// Test 1: Search without sort - should have proper ranking scores
index
.search(
json!({
"q": "tumbler",
"showRankingScore": true,
"rankingScoreThreshold": 0.0,
"attributesToRetrieve": ["title"]
}),
|response, code| {
assert_eq!(code, 200, "{response}");
snapshot!(json_string!(response["hits"]), @r###"
[
{
"title": "Tumbler Cup",
"_rankingScore": 0.9848484848484848
},
{
"title": "Stainless Steel Tumbler",
"_rankingScore": 0.8787878787878788
}
]
"###);
},
)
.await;
// Test 2: Search with sort - this is where the bug occurs
index
.search(
json!({
"q": "tumbler",
"showRankingScore": true,
"rankingScoreThreshold": 0.0,
"sort": ["created:desc"],
"attributesToRetrieve": ["title"]
}),
|response, code| {
assert_eq!(code, 200, "{response}");
snapshot!(json_string!(response["hits"]), @r###"
[
{
"title": "Tumbler Cup",
"_rankingScore": 0.9848484848484848
},
{
"title": "Stainless Steel Tumbler",
"_rankingScore": 0.8787878787878788
}
]
"###);
},
)
.await;
}

View File

@@ -2866,8 +2866,7 @@ async fn error_remote_timeout() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {},
"sharding": false
"remotes": {}
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -2875,8 +2874,7 @@ async fn error_remote_timeout() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {},
"sharding": false
"remotes": {}
}
"###);
@@ -2897,7 +2895,7 @@ async fn error_remote_timeout() {
let rms0 = LocalMeili::new(ms0.clone()).await;
let rms1 = LocalMeili::with_params(
ms1.clone(),
LocalMeiliParams { delay: Some(std::time::Duration::from_secs(31)), ..Default::default() },
LocalMeiliParams { delay: Some(std::time::Duration::from_secs(6)), ..Default::default() },
)
.await;

View File

@@ -97,7 +97,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`, `indexCompaction`.",
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -108,7 +108,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`, `indexCompaction`.",
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -119,7 +119,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`, `indexCompaction`.",
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"

View File

@@ -43,7 +43,7 @@ async fn version_too_old() {
std::fs::write(db_path.join("VERSION"), "1.11.9999").unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.24.0");
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.22.1");
}
#[actix_rt::test]
@@ -58,7 +58,7 @@ async fn version_requires_downgrade() {
std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.24.1 is higher than the Meilisearch version 1.24.0. Downgrade is not supported");
snapshot!(err, @"Database version 1.22.2 is higher than the Meilisearch version 1.22.1. Downgrade is not supported");
}
#[actix_rt::test]

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"error": null,
"duration": "[duration]",

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.22.1"
},
"error": null,
"duration": "[duration]",

View File

@@ -126,7 +126,7 @@ enum Command {
/// before running the copy and compaction. This way the current indexation must finish before
/// the compaction operation can start. Once the compaction is done, the big index is replaced
/// by the compacted one and the mutable transaction is released.
IndexCompaction { index_name: String },
CompactIndex { index_name: String },
/// Uses the hair dryer to make the dedicated pages hot in cache
///
@@ -165,7 +165,7 @@ fn main() -> anyhow::Result<()> {
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
}
Command::IndexCompaction { index_name } => compact_index(db_path, &index_name),
Command::CompactIndex { index_name } => compact_index(db_path, &index_name),
Command::HairDryer { index_name, index_part } => {
hair_dryer(db_path, &index_name, &index_part)
}

View File

@@ -19,7 +19,7 @@ bstr = "1.12.0"
bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
charabia = { version = "0.9.7", default-features = false }
cellulite = "0.3.1-nested-rtxns-2"
cellulite = "0.3.1-nested-rtxns"
concat-arrays = "0.1.2"
convert_case = "0.8.0"
crossbeam-channel = "0.5.15"
@@ -90,7 +90,7 @@ rhai = { version = "1.22.2", features = [
"sync",
] }
arroy = "0.6.4-nested-rtxns"
hannoy = { version = "0.0.9-nested-rtxns-2", features = ["arroy"] }
hannoy = { version = "0.0.9-nested-rtxns", features = ["arroy"] }
rand = "0.8.5"
tracing = "0.1.41"
ureq = { version = "2.12.1", features = ["json"] }

View File

@@ -66,29 +66,15 @@ impl ScoreDetails {
}
}
/// Calculate the global score of the details.
///
/// It is computed from the ranks of the ranking rules, excluding the sort/geo sort rules.
/// If the details contain a semantic score (ScoreDetails::Vector), it is used instead of the ranking score.
///
/// Note: this function expects at most one semantic score; otherwise only the last one is used.
pub fn global_score<'a>(details: impl Iterator<Item = &'a Self> + 'a) -> f64 {
// Filter out only the ranking scores (Rank values) and exclude sort/geo sort
let mut semantic_score = None;
let ranking_ranks = details.filter_map(|detail| match detail.rank_or_value() {
RankOrValue::Rank(rank) => Some(rank),
RankOrValue::Score(score) => {
semantic_score = Some(score);
None
}
RankOrValue::Sort(_) => None,
RankOrValue::GeoSort(_) => None,
});
let ranking_score = Rank::global_score(ranking_ranks);
// If we have semantic score, use it, otherwise use ranking score
semantic_score.unwrap_or(ranking_score)
Self::score_values(details)
.find_map(|x| {
let ScoreValue::Score(score) = x else {
return None;
};
Some(score)
})
.unwrap_or(1.0f64)
}
pub fn score_values<'a>(

View File

@@ -97,7 +97,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger.start_iteration_ranking_rule(0, ranking_rules[0].as_ref(), query, universe);
ranking_rules[0].start_iteration(ctx, logger, universe, query, &time_budget)?;
ranking_rules[0].start_iteration(ctx, logger, universe, query)?;
let mut ranking_rule_scores: Vec<ScoreDetails> = vec![];
@@ -168,6 +168,42 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
};
while valid_docids.len() < max_len_to_evaluate {
if time_budget.exceeded() {
loop {
let bucket = std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to the results if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
// The universe for this bucket is empty, so we don't need to sort
// anything; just go back to the parent ranking rule.
if ranking_rule_universes[cur_ranking_rule_index].is_empty()
@@ -180,63 +216,14 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
continue;
}
let next_bucket = if time_budget.exceeded() {
match ranking_rules[cur_ranking_rule_index].non_blocking_next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)? {
std::task::Poll::Ready(bucket) => bucket,
std::task::Poll::Pending => {
loop {
let bucket =
std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to the results if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score =
ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
}
} else {
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
&time_budget,
)?
else {
back!();
continue;
};
next_bucket
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)?
else {
back!();
continue;
};
ranking_rule_scores.push(next_bucket.score);
@@ -288,7 +275,6 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger,
&next_bucket.candidates,
&next_bucket.query,
&time_budget,
)?;
}
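The time-budget-exceeded branch earlier in this file drains the remaining buckets: each one is either returned with a `Skipped` score or, when the partial score is already below the requested threshold, subtracted from the candidates without being returned. A simplified stand-alone sketch of that decision using `roaring` directly (the `drain_bucket` helper is illustrative, not a function from the codebase):

use roaring::RoaringBitmap;

/// Either keep a bucket in the results or prune it from the candidates,
/// depending on whether the partial score clears the threshold.
fn drain_bucket(
    bucket: RoaringBitmap,
    current_score: f64,
    threshold: Option<f64>,
    all_candidates: &mut RoaringBitmap,
    results: &mut Vec<u32>,
) {
    let below_threshold = threshold.is_some_and(|t| current_score < t);
    if below_threshold {
        // drop the bucket from the candidates without returning it
        *all_candidates -= &bucket;
    } else {
        results.extend(bucket.iter());
    }
}

fn main() {
    let mut all_candidates: RoaringBitmap = (0u32..10).collect();
    let mut results = Vec::new();
    drain_bucket((0u32..5).collect(), 0.9, Some(0.5), &mut all_candidates, &mut results);
    drain_bucket((5u32..10).collect(), 0.3, Some(0.5), &mut all_candidates, &mut results);
    assert_eq!(results, (0..5).collect::<Vec<u32>>());
    assert_eq!(all_candidates.len(), 5); // the pruned bucket is gone from the candidates
}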

View File

@@ -6,7 +6,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput};
use crate::score_details::{self, ScoreDetails};
use crate::search::new::query_graph::QueryNodeData;
use crate::search::new::query_term::ExactTerm;
use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger, TimeBudget};
use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger};
/// A ranking rule that produces 3 disjoint buckets:
///
@@ -35,7 +35,6 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute {
_logger: &mut dyn SearchLogger<QueryGraph>,
universe: &roaring::RoaringBitmap,
query: &QueryGraph,
_time_budget: &TimeBudget,
) -> Result<()> {
self.state = State::start_iteration(ctx, universe, query)?;
Ok(())
@@ -47,7 +46,6 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute {
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<QueryGraph>,
universe: &roaring::RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<QueryGraph>>> {
let state = std::mem::take(&mut self.state);
let (state, output) = State::next(state, universe);

View File

@@ -7,7 +7,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait
use crate::documents::geo_sort::{fill_cache, next_bucket};
use crate::documents::{GeoSortParameter, GeoSortStrategy};
use crate::score_details::{self, ScoreDetails};
use crate::{GeoPoint, Result, SearchContext, SearchLogger, TimeBudget};
use crate::{GeoPoint, Result, SearchContext, SearchLogger};
pub struct GeoSort<Q: RankingRuleQueryTrait> {
query: Option<Q>,
@@ -84,7 +84,6 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
query: &Q,
_time_budget: &TimeBudget,
) -> Result<()> {
assert!(self.query.is_none());
@@ -111,7 +110,6 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();

View File

@@ -53,7 +53,7 @@ use super::{QueryGraph, RankingRule, RankingRuleOutput, SearchContext};
use crate::score_details::Rank;
use crate::search::new::query_term::LocatedQueryTermSubset;
use crate::search::new::ranking_rule_graph::PathVisitor;
use crate::{Result, TermsMatchingStrategy, TimeBudget};
use crate::{Result, TermsMatchingStrategy};
pub type Words = GraphBasedRankingRule<WordsGraph>;
impl GraphBasedRankingRule<WordsGraph> {
@@ -135,7 +135,6 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase
_logger: &mut dyn SearchLogger<QueryGraph>,
_universe: &RoaringBitmap,
query_graph: &QueryGraph,
_time_budget: &TimeBudget,
) -> Result<()> {
// the `next_max_cost` is the successor integer to the maximum cost of the paths in the graph.
//
@@ -218,7 +217,6 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<QueryGraph>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<QueryGraph>>> {
// Will crash if `next_bucket` is called before `start_iteration` or after `end_iteration`,
// should never happen

View File

@@ -21,7 +21,7 @@ mod vector_sort;
#[cfg(test)]
mod tests;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::ops::AddAssign;
use std::time::Duration;
@@ -64,12 +64,6 @@ use crate::{
UserError, Weight,
};
/// Cache for synonyms to avoid repeated database access
#[derive(Default)]
pub struct SynonymCache {
pub cache: Option<HashMap<Vec<String>, Vec<Vec<String>>>>,
}
/// A structure used throughout the execution of a search query.
pub struct SearchContext<'ctx> {
pub index: &'ctx Index,
@@ -79,7 +73,6 @@ pub struct SearchContext<'ctx> {
pub phrase_interner: DedupInterner<Phrase>,
pub term_interner: Interner<QueryTerm>,
pub phrase_docids: PhraseDocIdsCache,
pub synonym_cache: SynonymCache,
pub restricted_fids: Option<RestrictedFids>,
pub prefix_search: PrefixSearch,
pub vector_store_stats: Option<VectorStoreStats>,
@@ -110,7 +103,6 @@ impl<'ctx> SearchContext<'ctx> {
phrase_interner: <_>::default(),
term_interner: <_>::default(),
phrase_docids: <_>::default(),
synonym_cache: <_>::default(),
restricted_fids: None,
prefix_search,
vector_store_stats: None,
@@ -121,17 +113,6 @@ impl<'ctx> SearchContext<'ctx> {
self.prefix_search != PrefixSearch::Disabled
}
/// Get synonyms with caching to avoid repeated database access
pub fn get_synonyms(&mut self) -> Result<&HashMap<Vec<String>, Vec<Vec<String>>>> {
match self.synonym_cache.cache {
Some(ref synonyms) => Ok(synonyms),
None => {
let synonyms = self.index.synonyms(self.txn)?;
Ok(self.synonym_cache.cache.insert(synonyms))
}
}
}
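The cache above is a per-search memoization of the synonyms map: the first call pays the database read, and every later call during the same `SearchContext` reuses the value. A stripped-down sketch of that lazy-load pattern with a fallible loader (generic names, not the actual milli types):

/// A slot that is filled at most once; `load` stands in for `index.synonyms(txn)`.
struct LazySlot<T> {
    cache: Option<T>,
}

impl<T> LazySlot<T> {
    fn new() -> Self {
        Self { cache: None }
    }

    fn get_or_try_init<E>(&mut self, load: impl FnOnce() -> Result<T, E>) -> Result<&T, E> {
        if self.cache.is_none() {
            // Only the first call reaches the underlying storage.
            self.cache = Some(load()?);
        }
        Ok(self.cache.as_ref().expect("just filled above"))
    }
}

fn main() {
    let mut slot = LazySlot::new();
    let mut loads = 0;
    for _ in 0..3 {
        let value = slot
            .get_or_try_init(|| -> Result<Vec<String>, ()> {
                loads += 1;
                Ok(vec!["synonym".to_string()])
            })
            .unwrap();
        assert_eq!(value.len(), 1);
    }
    assert_eq!(loads, 1); // the database-style load ran exactly once
}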
pub fn attributes_to_search_on(
&mut self,
attributes_to_search_on: &'ctx [String],

View File

@@ -214,7 +214,7 @@ pub fn partially_initialized_term_from_word(
if is_prefix && use_prefix_db.is_none() {
find_zero_typo_prefix_derivations(ctx, word_interned, &mut prefix_of)?;
}
let synonyms = ctx.get_synonyms()?;
let synonyms = ctx.index.synonyms(ctx.txn)?;
let mut synonym_word_count = 0;
let synonyms = synonyms
.get(&vec![word.to_owned()])

View File

@@ -258,7 +258,7 @@ pub fn make_ngram(
partially_initialized_term_from_word(ctx, &ngram_str, max_nbr_typos, is_prefix, true)?;
// Now add the synonyms
let index_synonyms = ctx.get_synonyms()?;
let index_synonyms = ctx.index.synonyms(ctx.txn)?;
term.zero_typo.synonyms.extend(
index_synonyms.get(&words).cloned().unwrap_or_default().into_iter().map(|words| {

View File

@@ -1,11 +1,9 @@
use std::task::Poll;
use roaring::RoaringBitmap;
use super::logger::SearchLogger;
use super::{QueryGraph, SearchContext};
use crate::score_details::ScoreDetails;
use crate::{Result, TimeBudget};
use crate::Result;
/// An internal trait implemented by only [`PlaceholderQuery`] and [`QueryGraph`]
pub trait RankingRuleQueryTrait: Sized + Clone + 'static {}
@@ -30,15 +28,12 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> {
/// buckets using [`next_bucket`](RankingRule::next_bucket).
///
/// The given universe is the universe that will be given to [`next_bucket`](RankingRule::next_bucket).
///
/// If this function may take a long time, it should check the `time_budget` and return early if exceeded.
fn start_iteration(
&mut self,
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
query: &Query,
time_budget: &TimeBudget,
) -> Result<()>;
/// Return the next bucket of this ranking rule.
@@ -48,31 +43,13 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> {
/// The universe given as argument is either:
/// - a subset of the universe given to the previous call to [`next_bucket`](RankingRule::next_bucket); OR
/// - the universe given to [`start_iteration`](RankingRule::start_iteration)
///
/// If this function may take a long time, it should check the `time_budget` and return early if exceeded.
fn next_bucket(
&mut self,
ctx: &mut SearchContext<'ctx>,
logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Query>>>;
/// Return the next bucket of this ranking rule, if doing so can be done without blocking
///
/// Even if the time budget is exceeded, when getting the next bucket is a fast operation, this should return `Poll::Ready`
/// so that Meilisearch can still collect the results.
///
/// Default implementation conservatively returns that it would block.
fn non_blocking_next_bucket(
&mut self,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Query>,
_universe: &RoaringBitmap,
) -> Result<Poll<RankingRuleOutput<Query>>> {
Ok(Poll::Pending)
}
/// Finish iterating over the buckets, which yields control to the parent ranking rule
/// The next call to this ranking rule, if any, will be [`start_iteration`](RankingRule::start_iteration).
fn end_iteration(

View File

@@ -7,7 +7,7 @@ use crate::heed_codec::facet::{FacetGroupKeyCodec, OrderedF64Codec};
use crate::heed_codec::{BytesRefCodec, StrRefCodec};
use crate::score_details::{self, ScoreDetails};
use crate::search::facet::{ascending_facet_sort, descending_facet_sort};
use crate::{FieldId, Index, Result, TimeBudget};
use crate::{FieldId, Index, Result};
pub trait RankingRuleOutputIter<'ctx, Query> {
fn next_bucket(&mut self) -> Result<Option<RankingRuleOutput<Query>>>;
@@ -96,7 +96,6 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx,
_logger: &mut dyn SearchLogger<Query>,
parent_candidates: &RoaringBitmap,
parent_query: &Query,
_time_budget: &TimeBudget,
) -> Result<()> {
let iter: RankingRuleOutputIterWrapper<'ctx, Query> = match self.field_id {
Some(field_id) => {
@@ -195,7 +194,6 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Query>,
universe: &RoaringBitmap,
_time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Query>>> {
let iter = self.iter.as_mut().unwrap();
if let Some(mut bucket) = iter.next_bucket()? {

View File

@@ -3,17 +3,12 @@
//! 2. A test that ensures the filters are effectively applied even with a cutoff of 0
//! 3. A test that ensures the cutoff works well with the ranking scores
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use meili_snap::snapshot;
use crate::index::tests::TempIndex;
use crate::score_details::{ScoreDetails, ScoringStrategy};
use crate::update::Setting;
use crate::vector::settings::EmbeddingSettings;
use crate::vector::{Embedder, EmbedderOptions};
use crate::{Criterion, Filter, FilterableAttributesRule, Search, TimeBudget};
fn create_index() -> TempIndex {
@@ -366,8 +361,9 @@ fn degraded_search_and_score_details() {
]
"###);
// After FIVE loop iterations. The words ranking rule gave us a new bucket.
search.time_budget(TimeBudget::max().with_stop_after(5));
// After SIX loop iterations. The words ranking rule gave us a new bucket.
// Since we reached the limit, we were able to exit early without checking the typo ranking rule.
search.time_budget(TimeBudget::max().with_stop_after(6));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
@@ -428,399 +424,4 @@ fn degraded_search_and_score_details() {
],
]
"###);
// After SIX loop iterations.
// we finished
search.time_budget(TimeBudget::max().with_stop_after(6));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [4, 1, 0, 3]
Scores: 1.0000 0.9167 0.8333 0.6667
Score Details:
[
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 0,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 1,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 3,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 2,
max_typo_count: 3,
},
),
],
[
Words(
Words {
matching_words: 2,
max_matching_words: 3,
},
),
Typo(
Typo {
typo_count: 0,
max_typo_count: 2,
},
),
],
]
"###);
}
#[test]
fn degraded_search_and_score_details_vector() {
let index = create_index();
index
.add_documents(documents!([
{
"id": 4,
"text": "hella puppo kefir",
"_vectors": {
"default": [0.1, 0.1]
}
},
{
"id": 3,
"text": "hella puppy kefir",
"_vectors": {
"default": [-0.1, 0.1]
}
},
{
"id": 2,
"text": "hello",
"_vectors": {
"default": [0.1, -0.1]
}
},
{
"id": 1,
"text": "hello puppy",
"_vectors": {
"default": [-0.1, -0.1]
}
},
{
"id": 0,
"text": "hello puppy kefir",
"_vectors": {
"default": null
}
},
]))
.unwrap();
index
.update_settings(|settings| {
let mut embedders = BTreeMap::new();
embedders.insert(
"default".into(),
Setting::Set(EmbeddingSettings {
source: Setting::Set(crate::vector::settings::EmbedderSource::UserProvided),
dimensions: Setting::Set(2),
..Default::default()
}),
);
settings.set_embedder_settings(embedders);
settings.set_vector_store(crate::vector::VectorStoreBackend::Hannoy);
})
.unwrap();
let rtxn = index.read_txn().unwrap();
let mut search = Search::new(&rtxn, &index);
let embedder = Arc::new(
Embedder::new(
EmbedderOptions::UserProvided(crate::vector::embedder::manual::EmbedderOptions {
dimensions: 2,
distribution: None,
}),
0,
)
.unwrap(),
);
search.semantic("default".into(), embedder, false, Some(vec![1., -1.]), None);
search.limit(4);
search.scoring_strategy(ScoringStrategy::Detailed);
search.time_budget(TimeBudget::max());
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
// Do ONE loop iteration. Not much can be deduced, almost everyone matched the words first bucket.
search.time_budget(TimeBudget::max().with_stop_after(1));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [0, 1, 2, 3]
Scores: 0.5000 0.0000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Skipped,
],
[
Skipped,
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(2));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [0, 1, 2, 3]
Scores: 0.5000 0.0000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
[
Skipped,
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(3));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 1, 3]
Scores: 1.0000 0.5000 0.0000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
[
Skipped,
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(4));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
search.time_budget(TimeBudget::max().with_stop_after(5));
let result = search.execute().unwrap();
snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::<String>(), result.document_scores), @r###"
IDs: [2, 0, 3, 1]
Scores: 1.0000 0.5000 0.5000 0.0000
Score Details:
[
[
Vector(
Vector {
similarity: Some(
1.0,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.5,
),
},
),
],
[
Vector(
Vector {
similarity: Some(
0.0,
),
},
),
],
]
"###);
}

View File

@@ -1,5 +1,4 @@
use std::iter::FromIterator;
use std::task::Poll;
use std::time::Instant;
use roaring::RoaringBitmap;
@@ -8,7 +7,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait
use super::VectorStoreStats;
use crate::score_details::{self, ScoreDetails};
use crate::vector::{DistributionShift, Embedder, VectorStore};
use crate::{DocumentId, Result, SearchContext, SearchLogger, TimeBudget};
use crate::{DocumentId, Result, SearchContext, SearchLogger};
pub struct VectorSort<Q: RankingRuleQueryTrait> {
query: Option<Q>,
@@ -53,7 +52,6 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
&mut self,
ctx: &mut SearchContext<'_>,
vector_candidates: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<()> {
let target = &self.target;
let backend = ctx.index.get_vector_store(ctx.txn)?.unwrap_or_default();
@@ -61,13 +59,7 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
let before = Instant::now();
let reader =
VectorStore::new(backend, ctx.index.vector_store, self.embedder_index, self.quantized);
let results = reader.nns_by_vector(
ctx.txn,
target,
self.limit,
Some(vector_candidates),
time_budget,
)?;
let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
self.cached_sorted_docids = results.into_iter();
*ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {
total_time: before.elapsed(),
@@ -77,20 +69,6 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
Ok(())
}
fn next_result(&mut self, vector_candidates: &RoaringBitmap) -> Option<(DocumentId, f32)> {
for (docid, distance) in self.cached_sorted_docids.by_ref() {
if vector_candidates.contains(docid) {
let score = 1.0 - distance;
let score = self
.distribution_shift
.map(|distribution| distribution.shift(score))
.unwrap_or(score);
return Some((docid, score));
}
}
None
}
}
impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
@@ -105,13 +83,12 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
query: &Q,
time_budget: &TimeBudget,
) -> Result<()> {
assert!(self.query.is_none());
self.query = Some(query.clone());
let vector_candidates = &self.vector_candidates & universe;
self.fill_buffer(ctx, &vector_candidates, time_budget)?;
self.fill_buffer(ctx, &vector_candidates)?;
Ok(())
}
@@ -122,7 +99,6 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
time_budget: &TimeBudget,
) -> Result<Option<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();
let vector_candidates = &self.vector_candidates & universe;
@@ -135,17 +111,24 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
}));
}
if let Some((docid, score)) = self.next_result(&vector_candidates) {
return Ok(Some(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}));
for (docid, distance) in self.cached_sorted_docids.by_ref() {
if vector_candidates.contains(docid) {
let score = 1.0 - distance;
let score = self
.distribution_shift
.map(|distribution| distribution.shift(score))
.unwrap_or(score);
return Ok(Some(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}));
}
}
// if we got out of this loop it means we've exhausted our cache.
// we need to refill it and run the function again.
self.fill_buffer(ctx, &vector_candidates, time_budget)?;
self.fill_buffer(ctx, &vector_candidates)?;
// we tried filling the buffer, but it remained empty 😢
// it means we don't actually have any document remaining in the universe with a vector.
@@ -158,39 +141,11 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
}));
}
self.next_bucket(ctx, _logger, universe, time_budget)
self.next_bucket(ctx, _logger, universe)
}
#[tracing::instrument(level = "trace", skip_all, target = "search::vector_sort")]
fn end_iteration(&mut self, _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger<Q>) {
self.query = None;
}
fn non_blocking_next_bucket(
&mut self,
_ctx: &mut SearchContext<'ctx>,
_logger: &mut dyn SearchLogger<Q>,
universe: &RoaringBitmap,
) -> Result<Poll<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();
let vector_candidates = &self.vector_candidates & universe;
if vector_candidates.is_empty() {
return Ok(Poll::Ready(RankingRuleOutput {
query,
candidates: universe.clone(),
score: ScoreDetails::Vector(score_details::Vector { similarity: None }),
}));
}
if let Some((docid, score)) = self.next_result(&vector_candidates) {
Ok(Poll::Ready(RankingRuleOutput {
query,
candidates: RoaringBitmap::from_iter([docid]),
score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
}))
} else {
Ok(Poll::Pending)
}
}
}
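
For readers skimming the hunk above, here is a rough, self-contained sketch of the same drain-then-refill idea with toy types of my own (`ToyVectorSort`, a `HashSet` of candidate ids); the real code drains `cached_sorted_docids`, turns each distance into a score of `1.0 - distance` with an optional distribution shift, and refills through `nns_by_vector`.

```rust
// Rough illustration of the "drain the cached nearest-neighbour results, then
// refill" pattern; not the crate's VectorSort, just the shape of the loop.
use std::collections::HashSet;

struct ToyVectorSort {
    cached: std::vec::IntoIter<(u32, f32)>, // (docid, distance), nearest first
    refilled_once: bool,
}

impl ToyVectorSort {
    fn refill(&mut self) {
        // Stand-in for `nns_by_vector`: pretend the store returned two more hits.
        self.cached = vec![(7, 0.25), (9, 0.75)].into_iter();
        self.refilled_once = true;
    }

    /// Returns the next (docid, score) among `candidates`, refilling at most once.
    fn next_result(&mut self, candidates: &HashSet<u32>) -> Option<(u32, f32)> {
        loop {
            for (docid, distance) in self.cached.by_ref() {
                if candidates.contains(&docid) {
                    return Some((docid, 1.0 - distance)); // score = 1 - distance
                }
            }
            if self.refilled_once {
                return None; // nothing left in the universe with a vector
            }
            self.refill();
        }
    }
}

fn main() {
    let candidates: HashSet<u32> = [7, 9].into_iter().collect();
    let mut sort = ToyVectorSort { cached: Vec::new().into_iter(), refilled_once: false };
    assert_eq!(sort.next_result(&candidates), Some((7, 0.75)));
    assert_eq!(sort.next_result(&candidates), Some((9, 0.25)));
    assert_eq!(sort.next_result(&candidates), None);
}
```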

View File

@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
Metadata { docid, external_docid: "", extractor_id },
value,
unused_vectors_distribution,
);
)?;
}
}
// send last chunk
let on_embed = session.drain(unused_vectors_distribution);
let on_embed = session.drain(unused_vectors_distribution)?;
on_embed.finish()
}

View File

@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
metadata,
input,
unused_vectors_distribution,
);
)?;
}
ExtractorDiff::Unchanged => { /* nothing to do */ }
}
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid);
}
session.request_embedding(metadata, input, unused_vectors_distribution);
session.request_embedding(metadata, input, unused_vectors_distribution)?;
}
ExtractorDiff::Unchanged => { /* do nothing */ }
}
@@ -764,8 +764,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
)
.ignore_errors();
);
update_autogenerated(
docid,
@@ -851,8 +850,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
)
.ignore_errors();
);
insert_autogenerated(
docid,
@@ -887,10 +885,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.drain(unused_vectors_distribution);
session.drain(unused_vectors_distribution)?;
}
ChunkType::Fragments { fragments: _, session } => {
session.drain(unused_vectors_distribution);
session.drain(unused_vectors_distribution)?;
}
}
Ok(())
@@ -1038,7 +1036,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
if let Some(new_rendered) = new_rendered {
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
} else {
// remove any existing embedding
OnEmbed::process_embedding_response(
@@ -1074,7 +1072,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
new_rendered,
unused_vectors_distribution,
);
)?;
}
}

View File

@@ -40,8 +40,6 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&ToTargetNoOp { target: (1, 20, 0) },
&ToTargetNoOp { target: (1, 21, 0) },
&ToTargetNoOp { target: (1, 22, 0) },
&ToTargetNoOp { target: (1, 23, 0) },
&ToTargetNoOp { target: (1, 24, 0) },
// This is the last upgrade function; it will be called when the index is up to date.
// Any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@@ -75,8 +73,6 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 20, _) => function_index!(10),
(1, 21, _) => function_index!(11),
(1, 22, _) => function_index!(12),
(1, 23, _) => function_index!(13),
(1, 24, _) => function_index!(14),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manual
// consideration of the dumpless upgrade.
(_major, _minor, _patch) => return None,

View File

@@ -5,7 +5,7 @@ use serde_json::Value;
use super::error::EmbedError;
use super::{Embedder, Embedding};
use crate::progress::EmbedderStats;
use crate::{DocumentId, ThreadPoolNoAbort};
use crate::{DocumentId, Result, ThreadPoolNoAbort};
type ExtractorId = u8;
#[derive(Clone, Copy)]
@@ -108,28 +108,32 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
metadata: Metadata<'doc>,
rendered: I,
unused_vectors_distribution: &C::ErrorMetadata,
) {
) -> Result<()> {
if self.inputs.len() < self.inputs.capacity() {
self.inputs.push(rendered);
self.metadata.push(metadata);
return;
return Ok(());
}
self.embed_chunks(unused_vectors_distribution)
}
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
self.embed_chunks(unused_vectors_distribution);
self.on_embed
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
self.embed_chunks(unused_vectors_distribution)?;
Ok(self.on_embed)
}
#[allow(clippy::too_many_arguments)]
fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
if self.inputs.is_empty() {
return;
return Ok(());
}
match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
{
let res = match I::embed_ref(
self.inputs.as_slice(),
self.embedder,
self.threads,
self.embedder_stats,
) {
Ok(embeddings) => {
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
self.on_embed.process_embedding_response(EmbeddingResponse {
@@ -137,37 +141,27 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
embedding: Some(embedding),
});
}
Ok(())
}
Err(error) => {
tracing::warn!(
%error,
"error embedding batch of documents, retrying one by one"
// reset metadata and inputs, and send metadata to the error processing.
let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace(
&mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
);
// retry with one call per input
for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
Ok(mut embeddings) => {
let Some(embedding) = embeddings.pop() else {
continue;
};
self.on_embed.process_embedding_response(EmbeddingResponse {
metadata,
embedding: Some(embedding),
})
}
Err(err) => {
tracing::warn!(
docid = metadata.external_docid,
%err,
"error embedding document"
);
}
}
}
self.inputs.clear();
return Err(self.on_embed.process_embedding_error(
error,
self.embedder_name,
unused_vectors_distribution,
metadata,
));
}
};
self.inputs.clear();
self.metadata.clear();
res
}
pub(crate) fn embedder_name(&self) -> &'doc str {
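
A small sketch of the batching behaviour visible in this hunk, with invented toy types (`ToySession`, `String` inputs, fake embeddings) rather than the real `EmbedSession`: inputs are buffered until a chunk is full, `drain` flushes whatever is left, and in the `Result`-returning variant any embedding error bubbles up to the caller.

```rust
// Toy batching session; the flush-on-full policy is simplified compared to the
// real code, which also tracks per-input metadata.
type Result<T> = std::result::Result<T, String>;

struct ToySession {
    batch_size: usize,
    inputs: Vec<String>,
    embedded: Vec<(String, Vec<f32>)>,
}

impl ToySession {
    fn request_embedding(&mut self, input: String) -> Result<()> {
        if self.inputs.len() < self.batch_size {
            self.inputs.push(input);
            return Ok(());
        }
        self.embed_chunk()?; // batch is full: flush before accepting more
        self.inputs.push(input);
        Ok(())
    }

    fn drain(mut self) -> Result<Vec<(String, Vec<f32>)>> {
        self.embed_chunk()?; // send the last, possibly partial, chunk
        Ok(self.embedded)
    }

    fn embed_chunk(&mut self) -> Result<()> {
        if self.inputs.is_empty() {
            return Ok(());
        }
        for input in self.inputs.drain(..) {
            // Stand-in for the real embedder call; a failure here would bubble up.
            let fake_embedding = vec![input.len() as f32];
            self.embedded.push((input, fake_embedding));
        }
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut session = ToySession { batch_size: 2, inputs: Vec::new(), embedded: Vec::new() };
    for text in ["hello", "puppy", "kefir"] {
        session.request_embedding(text.to_string())?;
    }
    let embeddings = session.drain()?;
    assert_eq!(embeddings.len(), 3);
    Ok(())
}
```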

View File

@@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize};
use crate::progress::Progress;
use crate::vector::Embeddings;
use crate::TimeBudget;
const HANNOY_EF_CONSTRUCTION: usize = 125;
const HANNOY_M: usize = 16;
@@ -592,7 +591,6 @@ impl VectorStore {
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
time_budget: &TimeBudget,
) -> crate::Result<Vec<(ItemId, f32)>> {
if self.backend == VectorStoreBackend::Arroy {
if self.quantized {
@@ -603,25 +601,11 @@ impl VectorStore {
.map_err(Into::into)
}
} else if self.quantized {
self._hannoy_nns_by_vector(
rtxn,
self._hannoy_quantized_db(),
vector,
limit,
filter,
time_budget,
)
.map_err(Into::into)
self._hannoy_nns_by_vector(rtxn, self._hannoy_quantized_db(), vector, limit, filter)
.map_err(Into::into)
} else {
self._hannoy_nns_by_vector(
rtxn,
self._hannoy_angular_db(),
vector,
limit,
filter,
time_budget,
)
.map_err(Into::into)
self._hannoy_nns_by_vector(rtxn, self._hannoy_angular_db(), vector, limit, filter)
.map_err(Into::into)
}
}
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result<Vec<Vec<f32>>> {
@@ -1016,7 +1000,6 @@ impl VectorStore {
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
time_budget: &TimeBudget,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();
@@ -1028,10 +1011,7 @@ impl VectorStore {
searcher.candidates(filter);
}
let (res, _degraded) =
&mut searcher
.by_vector_with_cancellation(rtxn, vector, || time_budget.exceeded())?;
results.append(res);
results.append(&mut searcher.by_vector(rtxn, vector)?);
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
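
The hunk above shows both the plain `by_vector` call and the `by_vector_with_cancellation(rtxn, vector, || time_budget.exceeded())` variant. As an illustration of that cancellation-callback pattern (a toy function of mine, not the hannoy API), a search loop can poll a closure between steps and return partial, degraded results when it fires:

```rust
// Illustrative cancellation-callback search: the caller passes a closure such
// as `|| time_budget.exceeded()`, and gets back partial results plus a flag.
fn nns_with_cancellation(
    candidates: &[(u32, f32)], // pretend index: (docid, distance)
    limit: usize,
    mut should_cancel: impl FnMut() -> bool,
) -> (Vec<(u32, f32)>, bool) {
    let mut results = Vec::with_capacity(limit);
    for &(docid, distance) in candidates {
        if should_cancel() {
            return (results, true); // degraded: budget exhausted mid-search
        }
        results.push((docid, distance));
        if results.len() == limit {
            break;
        }
    }
    (results, false)
}

fn main() {
    let index = [(1, 0.1), (2, 0.2), (3, 0.3)];
    let mut calls = 0;
    // Cancel after two checks, as a stand-in for `time_budget.exceeded()`.
    let (res, degraded) = nns_with_cancellation(&index, 3, || {
        calls += 1;
        calls > 2
    });
    assert_eq!(res.len(), 2);
    assert!(degraded);
}
```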

View File

@@ -20,16 +20,14 @@ These make us iterate fast before stabilizing it for the current release.
### Release steps
The prototype name must follow this convention: `prototype-v<version>.<name>-<number>` where
- `version` is the version of Meilisearch on which the prototype is based.
- `name` is the feature name formatted in `kebab-case`. It should not end with a single number.
The prototype name must follow this convention: `prototype-X-Y` where
- `X` is the feature name formatted in `kebab-case`. It should not end with a single number.
- `Y` is the version of the prototype, starting from `0`.
✅ Example: `prototype-v1.23.0.search-personalization-0`. </br>
❌ Bad example: `prototype-search-personalization-0`: version is missing.</br>
❌ Bad example: `v1.23.0.auto-resize-0`: lacks the `prototype` prefix. </br>
❌ Bad example: `prototype-v1.23.0.auto-resize`: lacks the version suffix. </br>
❌ Bad example: `prototype-v1.23.0.auto-resize-0-0`: feature name ends with a single number.
✅ Example: `prototype-auto-resize-0`. </br>
❌ Bad example: `auto-resize-0`: lacks the `prototype` prefix. </br>
❌ Bad example: `prototype-auto-resize`: lacks the version suffix. </br>
❌ Bad example: `prototype-auto-resize-0-0`: feature name ends with a single number.
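
To make the `prototype-v<version>.<name>-<number>` form from the examples above concrete, here is a rough validity check of my own (not a script that exists in the repository); it deliberately ignores the "must not end with a single number" rule for the feature name.

```rust
// Approximate check of the `prototype-v<version>.<name>-<number>` convention.
fn looks_like_prototype_name(name: &str) -> bool {
    // Must start with the `prototype-v` prefix.
    let Some(rest) = name.strip_prefix("prototype-v") else { return false };
    // Must end with `-<number>`: a dash followed by digits only.
    let Some((stem, number)) = rest.rsplit_once('-') else { return false };
    if number.is_empty() || !number.chars().all(|c| c.is_ascii_digit()) {
        return false;
    }
    // The stem must still contain a `<version>.<name>` part, e.g. `1.23.0.search-personalization`.
    stem.contains('.')
}

fn main() {
    assert!(looks_like_prototype_name("prototype-v1.23.0.search-personalization-0"));
    assert!(!looks_like_prototype_name("prototype-search-personalization-0")); // version missing
    assert!(!looks_like_prototype_name("v1.23.0.auto-resize-0")); // missing `prototype` prefix
    assert!(!looks_like_prototype_name("prototype-v1.23.0.auto-resize")); // missing trailing number
}
```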
Steps to create a prototype:

View File

@@ -58,12 +58,6 @@ Why? GitHub Merge Queue does not work with branch patterns yet, so we have to ad
5. Go to the GitHub interface, in the [`Release` section](https://github.com/meilisearch/meilisearch/releases) and click on `Draft a new release`
⚠️⚠️⚠️ Publish on `release-vX.Y.Z+1` branch, not on `main`!
📝 <ins>About the changelogs</ins>
- Use the "Generate release notes" button in the GitHub interface to get the exhaustive list of PRs.
- Separate the PRs into different categories: Enhancement/Features, Bug fixes, Maintenance.
- Ensure each line makes sense for external people reading the changelogs. Add more details of usage if needed.
- Thank the external contributors at the end of the changelogs.
⚠️ <ins>If doing a patch release that should NOT be the `latest` release</ins>:
- Do NOT check `Set as the latest release` when creating the GitHub release. If you did, quickly interrupt all CIs and delete the GitHub release!