mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-24 21:37:03 +00:00
Compare commits
4 Commits
set-search
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b95007292 | ||
|
|
4a2b8be428 | ||
|
|
bea11a1353 | ||
|
|
5a7948bfab |
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -444,12 +444,12 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "arroy"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08e6111f351d004bd13e95ab540721272136fd3218b39d3ec95a2ea1c4e6a0a6"
|
||||
version = "0.7.0"
|
||||
source = "git+https://github.com/meilisearch/arroy.git?rev=1a21e5c0cbe3e106b05d834076a5b727cf46bca1#1a21e5c0cbe3e106b05d834076a5b727cf46bca1"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
"crossbeam",
|
||||
"enum-iterator",
|
||||
"heed",
|
||||
"memmap2",
|
||||
@@ -461,6 +461,7 @@ dependencies = [
|
||||
"roaring",
|
||||
"tempfile",
|
||||
"thiserror 2.0.12",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@@ -1367,6 +1368,19 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-queue",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.15"
|
||||
|
||||
@@ -238,7 +238,7 @@ impl IndexScheduler {
|
||||
#[cfg(test)]
|
||||
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
|
||||
|
||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
|
||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u64);
|
||||
progress.update_progress(task_progress_obj);
|
||||
process_batch_info = info;
|
||||
let mut success = 0;
|
||||
@@ -317,7 +317,7 @@ impl IndexScheduler {
|
||||
Err(err) => {
|
||||
#[cfg(test)]
|
||||
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchFailed);
|
||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
|
||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u64);
|
||||
progress.update_progress(task_progress_obj);
|
||||
|
||||
if matches!(err, Error::DatabaseUpgrade(_)) {
|
||||
|
||||
@@ -353,8 +353,8 @@ impl IndexScheduler {
|
||||
for (step, swap) in swaps.iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
|
||||
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
|
||||
step as u32,
|
||||
swaps.len() as u32,
|
||||
step as u64,
|
||||
swaps.len() as u64,
|
||||
));
|
||||
self.apply_index_swap(
|
||||
&mut wtxn,
|
||||
@@ -472,7 +472,7 @@ impl IndexScheduler {
|
||||
// 3. before_name -> new_name in the task's KindWithContent
|
||||
progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks);
|
||||
let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids;
|
||||
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32);
|
||||
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u64);
|
||||
progress.update_progress(task_progress);
|
||||
|
||||
for task_id in tasks_to_update {
|
||||
@@ -529,7 +529,7 @@ impl IndexScheduler {
|
||||
// The tasks that have been removed *per batches*.
|
||||
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
|
||||
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
|
||||
progress.update_progress(task_progress);
|
||||
for task_id in to_delete_tasks.iter() {
|
||||
let task =
|
||||
@@ -575,7 +575,7 @@ impl IndexScheduler {
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
||||
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
|
||||
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u64,
|
||||
);
|
||||
progress.update_progress(task_progress);
|
||||
for index in affected_indexes.iter() {
|
||||
@@ -594,7 +594,7 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
|
||||
progress.update_progress(task_progress);
|
||||
for task in to_delete_tasks.iter() {
|
||||
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
|
||||
@@ -611,7 +611,7 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
||||
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
|
||||
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u64);
|
||||
progress.update_progress(batch_progress);
|
||||
for (batch_id, to_delete_tasks) in affected_batches {
|
||||
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
|
||||
@@ -786,7 +786,7 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
// 3. We now have a list of tasks to cancel, cancel them
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
|
||||
progress.update_progress(progress_obj);
|
||||
|
||||
let mut tasks = self.queue.tasks.get_existing_tasks(
|
||||
@@ -797,7 +797,7 @@ impl IndexScheduler {
|
||||
)?;
|
||||
|
||||
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
|
||||
progress.update_progress(progress_obj);
|
||||
for task in tasks.iter_mut() {
|
||||
task.status = Status::Canceled;
|
||||
|
||||
@@ -57,7 +57,7 @@ impl IndexScheduler {
|
||||
let mut dump_tasks = dump.create_tasks_queue()?;
|
||||
|
||||
let (atomic, update_task_progress) =
|
||||
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u32);
|
||||
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u64);
|
||||
progress.update_progress(update_task_progress);
|
||||
|
||||
for ret in self.queue.tasks.all_tasks.iter(&rtxn)? {
|
||||
@@ -119,7 +119,7 @@ impl IndexScheduler {
|
||||
let mut dump_batches = dump.create_batches_queue()?;
|
||||
|
||||
let (atomic_batch_progress, update_batch_progress) =
|
||||
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
|
||||
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u64);
|
||||
progress.update_progress(update_batch_progress);
|
||||
|
||||
for ret in self.queue.batches.all_batches.iter(&rtxn)? {
|
||||
@@ -149,7 +149,7 @@ impl IndexScheduler {
|
||||
|
||||
// 5. Dump the indexes
|
||||
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
|
||||
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
|
||||
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u64;
|
||||
let mut count = 0;
|
||||
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
||||
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
|
||||
@@ -178,7 +178,7 @@ impl IndexScheduler {
|
||||
let nb_documents = index
|
||||
.number_of_documents(&rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
|
||||
as u32;
|
||||
as u64;
|
||||
let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
|
||||
progress.update_progress(update_document_progress);
|
||||
let documents = index
|
||||
|
||||
@@ -55,8 +55,8 @@ impl IndexScheduler {
|
||||
|
||||
progress.update_progress(VariableNameStep::<ExportIndex>::new(
|
||||
format!("Exporting index `{uid}`"),
|
||||
i as u32,
|
||||
indexes.len() as u32,
|
||||
i as u64,
|
||||
indexes.len() as u64,
|
||||
));
|
||||
|
||||
let ExportIndexSettings { filter, override_settings } = export_settings;
|
||||
@@ -155,7 +155,7 @@ impl IndexScheduler {
|
||||
// spawn many threads to process the documents
|
||||
drop(index_rtxn);
|
||||
|
||||
let total_documents = universe.len() as u32;
|
||||
let total_documents = universe.len();
|
||||
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
|
||||
progress.update_progress(progress_step);
|
||||
|
||||
@@ -163,7 +163,7 @@ impl IndexScheduler {
|
||||
IndexUidPattern::new_unchecked(uid.clone()),
|
||||
DetailsExportIndexSettings {
|
||||
settings: (*export_settings).clone(),
|
||||
matched_documents: Some(total_documents as u64),
|
||||
matched_documents: Some(total_documents),
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ impl IndexScheduler {
|
||||
// 2.4 Only copy the update files of the enqueued tasks
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
||||
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u64);
|
||||
progress.update_progress(update_file_progress);
|
||||
for task_id in enqueued {
|
||||
let task =
|
||||
@@ -74,12 +74,12 @@ impl IndexScheduler {
|
||||
// 3. Snapshot every indexes
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
||||
let index_mapping = self.index_mapper.index_mapping;
|
||||
let nb_indexes = index_mapping.len(&rtxn)? as u32;
|
||||
let nb_indexes = index_mapping.len(&rtxn)? as u64;
|
||||
|
||||
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
||||
let (name, uuid) = result?;
|
||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||
name, i as u32, nb_indexes,
|
||||
name, i as u64, nb_indexes,
|
||||
));
|
||||
let index = self.index_mapper.index(&rtxn, name)?;
|
||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
|
||||
@@ -22,8 +22,8 @@ impl IndexScheduler {
|
||||
}
|
||||
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
||||
format!("Upgrading index `{uid}`"),
|
||||
i as u32,
|
||||
indexes.len() as u32,
|
||||
i as u64,
|
||||
indexes.len() as u64,
|
||||
));
|
||||
let index = self.index(uid)?;
|
||||
let mut index_wtxn = index.write_txn()?;
|
||||
@@ -65,8 +65,8 @@ impl IndexScheduler {
|
||||
for (i, uid) in indexes.iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
||||
format!("Rollbacking index `{uid}`"),
|
||||
i as u32,
|
||||
indexes.len() as u32,
|
||||
i as u64,
|
||||
indexes.len() as u64,
|
||||
));
|
||||
let index_schd_rtxn = self.env.read_txn()?;
|
||||
|
||||
|
||||
@@ -1050,8 +1050,7 @@ pub fn prepare_search<'t>(
|
||||
.map(|x| x as usize)
|
||||
.unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS);
|
||||
|
||||
search.is_exhaustive_pagination(is_finite_pagination);
|
||||
search.max_total_hits(Some(max_total_hits));
|
||||
search.exhaustive_number_hits(is_finite_pagination);
|
||||
search.scoring_strategy(
|
||||
if query.show_ranking_score
|
||||
|| query.show_ranking_score_details
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use super::shared_index_with_documents;
|
||||
use crate::common::Server;
|
||||
use crate::json;
|
||||
use meili_snap::{json_string, snapshot};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn default_search_should_return_estimated_total_hit() {
|
||||
@@ -134,61 +133,3 @@ async fn ensure_placeholder_search_hit_count_valid() {
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_issue_5274() {
|
||||
let server = Server::new_shared();
|
||||
let index = server.unique_index();
|
||||
|
||||
let documents = json!([
|
||||
{
|
||||
"id": 1,
|
||||
"title": "Document 1",
|
||||
"content": "This is the first."
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "Document 2",
|
||||
"content": "This is the second doc."
|
||||
}
|
||||
]);
|
||||
let (task, _code) = index.add_documents(documents, None).await;
|
||||
server.wait_task(task.uid()).await.succeeded();
|
||||
|
||||
// Find out the lowest ranking score among the documents
|
||||
let (rep, _status) = index
|
||||
.search_post(json!({"q": "doc", "page": 1, "hitsPerPage": 2, "showRankingScore": true}))
|
||||
.await;
|
||||
let hits = rep["hits"].as_array().expect("Missing hits array");
|
||||
let second_hit = hits.get(1).expect("Missing second hit");
|
||||
let ranking_score = second_hit
|
||||
.get("_rankingScore")
|
||||
.expect("Missing _rankingScore field")
|
||||
.as_f64()
|
||||
.expect("Expected _rankingScore to be a f64");
|
||||
|
||||
// Search with a ranking score threshold just above and expect to be a single hit
|
||||
let (rep, _status) = index
|
||||
.search_post(json!({"q": "doc", "page": 1, "hitsPerPage": 1, "rankingScoreThreshold": ranking_score + 0.0001}))
|
||||
.await;
|
||||
|
||||
snapshot!(json_string!(rep, {
|
||||
".processingTimeMs" => "[ignored]",
|
||||
}), @r#"
|
||||
{
|
||||
"hits": [
|
||||
{
|
||||
"id": 2,
|
||||
"title": "Document 2",
|
||||
"content": "This is the second doc."
|
||||
}
|
||||
],
|
||||
"query": "doc",
|
||||
"processingTimeMs": "[ignored]",
|
||||
"hitsPerPage": 1,
|
||||
"page": 1,
|
||||
"totalPages": 1,
|
||||
"totalHits": 1
|
||||
}
|
||||
"#);
|
||||
}
|
||||
|
||||
@@ -162,8 +162,8 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
||||
let (uid, uuid) = result?;
|
||||
progress.update_progress(VariableNameStep::new(
|
||||
&uid,
|
||||
index_index as u32,
|
||||
index_count as u32,
|
||||
index_index as u64,
|
||||
index_count as u64,
|
||||
));
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
|
||||
@@ -220,12 +220,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
||||
|
||||
pub struct VariableNameStep {
|
||||
name: String,
|
||||
current: u32,
|
||||
total: u32,
|
||||
current: u64,
|
||||
total: u64,
|
||||
}
|
||||
|
||||
impl VariableNameStep {
|
||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
|
||||
Self { name: name.into(), current, total }
|
||||
}
|
||||
}
|
||||
@@ -235,11 +235,11 @@ impl Step for VariableNameStep {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
fn current(&self) -> u64 {
|
||||
self.current
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
fn total(&self) -> u64 {
|
||||
self.total
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,8 @@ rhai = { version = "1.22.2", features = [
|
||||
"no_time",
|
||||
"sync",
|
||||
] }
|
||||
arroy = "0.6.1"
|
||||
# arroy = "0.6.1"
|
||||
arroy = { git = "https://github.com/meilisearch/arroy.git", rev = "1a21e5c0cbe3e106b05d834076a5b727cf46bca1" } # incremental update
|
||||
rand = "0.8.5"
|
||||
tracing = "0.1.41"
|
||||
ureq = { version = "2.12.1", features = ["json"] }
|
||||
|
||||
@@ -420,7 +420,9 @@ impl From<arroy::Error> for Error {
|
||||
| arroy::Error::NeedBuild(_)
|
||||
| arroy::Error::MissingKey { .. }
|
||||
| arroy::Error::MissingMetadata(_)
|
||||
| arroy::Error::CannotDecodeKeyMode { .. } => {
|
||||
| arroy::Error::CannotDecodeKeyMode { .. }
|
||||
| arroy::Error::UnknownVersion { .. }
|
||||
| arroy::Error::Panic(_) => {
|
||||
Error::InternalError(InternalError::ArroyError(value))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::any::TypeId;
|
||||
use std::borrow::Cow;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -13,8 +13,8 @@ use utoipa::ToSchema;
|
||||
|
||||
pub trait Step: 'static + Send + Sync {
|
||||
fn name(&self) -> Cow<'static, str>;
|
||||
fn current(&self) -> u32;
|
||||
fn total(&self) -> u32;
|
||||
fn current(&self) -> u64;
|
||||
fn total(&self) -> u64;
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
@@ -132,13 +132,13 @@ pub trait NamedStep: 'static + Send + Sync + Default {
|
||||
/// - The total number of steps doesn't change
|
||||
pub struct AtomicSubStep<Name: NamedStep> {
|
||||
unit_name: Name,
|
||||
current: Arc<AtomicU32>,
|
||||
total: u32,
|
||||
current: Arc<AtomicU64>,
|
||||
total: u64,
|
||||
}
|
||||
|
||||
impl<Name: NamedStep> AtomicSubStep<Name> {
|
||||
pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
|
||||
let current = Arc::new(AtomicU32::new(0));
|
||||
pub fn new(total: u64) -> (Arc<AtomicU64>, Self) {
|
||||
let current = Arc::new(AtomicU64::new(0));
|
||||
(current.clone(), Self { current, total, unit_name: Name::default() })
|
||||
}
|
||||
}
|
||||
@@ -148,11 +148,11 @@ impl<Name: NamedStep> Step for AtomicSubStep<Name> {
|
||||
self.unit_name.name().into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
fn current(&self) -> u64 {
|
||||
self.current.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
fn total(&self) -> u64 {
|
||||
self.total
|
||||
}
|
||||
}
|
||||
@@ -183,13 +183,13 @@ macro_rules! make_enum_progress {
|
||||
}
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
*self as u32
|
||||
fn current(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
fn total(&self) -> u64 {
|
||||
use $crate::progress::_private_enum_iterator::Sequence;
|
||||
Self::CARDINALITY as u32
|
||||
Self::CARDINALITY as u64
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -235,8 +235,8 @@ pub struct ProgressView {
|
||||
#[schema(rename_all = "camelCase")]
|
||||
pub struct ProgressStepView {
|
||||
pub current_step: Cow<'static, str>,
|
||||
pub finished: u32,
|
||||
pub total: u32,
|
||||
pub finished: u64,
|
||||
pub total: u64,
|
||||
}
|
||||
|
||||
/// Used when the name can change but it's still the same step.
|
||||
@@ -252,13 +252,13 @@ pub struct ProgressStepView {
|
||||
/// ```
|
||||
pub struct VariableNameStep<U: Send + Sync + 'static> {
|
||||
name: String,
|
||||
current: u32,
|
||||
total: u32,
|
||||
current: u64,
|
||||
total: u64,
|
||||
phantom: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl<U: Send + Sync + 'static> VariableNameStep<U> {
|
||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
|
||||
Self { name: name.into(), current, total, phantom: PhantomData }
|
||||
}
|
||||
}
|
||||
@@ -268,11 +268,11 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
fn current(&self) -> u64 {
|
||||
self.current
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
fn total(&self) -> u64 {
|
||||
self.total
|
||||
}
|
||||
}
|
||||
@@ -285,22 +285,26 @@ impl Step for arroy::MainStep {
|
||||
"writing the descendants and metadata"
|
||||
}
|
||||
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
|
||||
arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
|
||||
arroy::MainStep::UpdatingTheTrees => "updating the trees",
|
||||
arroy::MainStep::CreateNewTrees => "create new trees",
|
||||
arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
|
||||
arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
|
||||
arroy::MainStep::WriteTheMetadata => "write the metadata",
|
||||
arroy::MainStep::RetrievingTheItemsIds => "retrieving the items ids",
|
||||
arroy::MainStep::RetrievingTheUsedTreeNodes => "retrieving the used tree nodes",
|
||||
arroy::MainStep::DeletingExtraTrees => "deleting extra trees",
|
||||
arroy::MainStep::RemoveItemsFromExistingTrees => "remove items from existing trees",
|
||||
arroy::MainStep::InsertItemsInCurrentTrees => "insert items in current trees",
|
||||
arroy::MainStep::RetrievingTheItems => "retrieving the items",
|
||||
arroy::MainStep::RetrievingTheTreeNodes => "retrieving the tree nodes",
|
||||
arroy::MainStep::RetrieveTheLargeDescendants => "retrieve the large descendants",
|
||||
arroy::MainStep::CreateTreesForItems => "create trees for items",
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
*self as u32
|
||||
fn current(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
Self::CARDINALITY as u32
|
||||
fn total(&self) -> u64 {
|
||||
Self::CARDINALITY as u64
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,11 +313,11 @@ impl Step for arroy::SubStep {
|
||||
self.unit.into()
|
||||
}
|
||||
|
||||
fn current(&self) -> u32 {
|
||||
fn current(&self) -> u64 {
|
||||
self.current.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn total(&self) -> u32 {
|
||||
fn total(&self) -> u64 {
|
||||
self.max
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,8 +209,7 @@ impl Search<'_> {
|
||||
terms_matching_strategy: self.terms_matching_strategy,
|
||||
scoring_strategy: ScoringStrategy::Detailed,
|
||||
words_limit: self.words_limit,
|
||||
is_exhaustive_pagination: self.is_exhaustive_pagination,
|
||||
max_total_hits: self.max_total_hits,
|
||||
exhaustive_number_hits: self.exhaustive_number_hits,
|
||||
rtxn: self.rtxn,
|
||||
index: self.index,
|
||||
semantic: self.semantic.clone(),
|
||||
|
||||
@@ -51,8 +51,7 @@ pub struct Search<'a> {
|
||||
terms_matching_strategy: TermsMatchingStrategy,
|
||||
scoring_strategy: ScoringStrategy,
|
||||
words_limit: usize,
|
||||
is_exhaustive_pagination: bool,
|
||||
max_total_hits: Option<usize>,
|
||||
exhaustive_number_hits: bool,
|
||||
rtxn: &'a heed::RoTxn<'a>,
|
||||
index: &'a Index,
|
||||
semantic: Option<SemanticSearch>,
|
||||
@@ -74,8 +73,7 @@ impl<'a> Search<'a> {
|
||||
geo_param: new::GeoSortParameter::default(),
|
||||
terms_matching_strategy: TermsMatchingStrategy::default(),
|
||||
scoring_strategy: Default::default(),
|
||||
is_exhaustive_pagination: false,
|
||||
max_total_hits: None,
|
||||
exhaustive_number_hits: false,
|
||||
words_limit: 10,
|
||||
rtxn,
|
||||
index,
|
||||
@@ -162,13 +160,8 @@ impl<'a> Search<'a> {
|
||||
|
||||
/// Forces the search to exhaustively compute the number of candidates,
|
||||
/// this will increase the search time but allows finite pagination.
|
||||
pub fn is_exhaustive_pagination(&mut self, is_exhaustive_pagination: bool) -> &mut Search<'a> {
|
||||
self.is_exhaustive_pagination = is_exhaustive_pagination;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn max_total_hits(&mut self, max_total_hits: Option<usize>) -> &mut Search<'a> {
|
||||
self.max_total_hits = max_total_hits;
|
||||
pub fn exhaustive_number_hits(&mut self, exhaustive_number_hits: bool) -> &mut Search<'a> {
|
||||
self.exhaustive_number_hits = exhaustive_number_hits;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -231,13 +224,6 @@ impl<'a> Search<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut search_k_div_trees = None;
|
||||
if self.is_exhaustive_pagination {
|
||||
if let Some(max_total_hits) = self.max_total_hits {
|
||||
search_k_div_trees = Some(max_total_hits);
|
||||
}
|
||||
}
|
||||
|
||||
let universe = filtered_universe(ctx.index, ctx.txn, &self.filter)?;
|
||||
let PartialSearchResult {
|
||||
located_query_terms,
|
||||
@@ -257,8 +243,6 @@ impl<'a> Search<'a> {
|
||||
&mut ctx,
|
||||
vector,
|
||||
self.scoring_strategy,
|
||||
self.is_exhaustive_pagination,
|
||||
self.max_total_hits,
|
||||
universe,
|
||||
&self.sort_criteria,
|
||||
&self.distinct,
|
||||
@@ -268,7 +252,6 @@ impl<'a> Search<'a> {
|
||||
embedder_name,
|
||||
embedder,
|
||||
*quantized,
|
||||
search_k_div_trees,
|
||||
self.time_budget.clone(),
|
||||
self.ranking_score_threshold,
|
||||
)?,
|
||||
@@ -277,8 +260,7 @@ impl<'a> Search<'a> {
|
||||
self.query.as_deref(),
|
||||
self.terms_matching_strategy,
|
||||
self.scoring_strategy,
|
||||
self.is_exhaustive_pagination,
|
||||
self.max_total_hits,
|
||||
self.exhaustive_number_hits,
|
||||
universe,
|
||||
&self.sort_criteria,
|
||||
&self.distinct,
|
||||
@@ -331,8 +313,7 @@ impl fmt::Debug for Search<'_> {
|
||||
terms_matching_strategy,
|
||||
scoring_strategy,
|
||||
words_limit,
|
||||
is_exhaustive_pagination,
|
||||
max_total_hits,
|
||||
exhaustive_number_hits,
|
||||
rtxn: _,
|
||||
index: _,
|
||||
semantic,
|
||||
@@ -351,8 +332,7 @@ impl fmt::Debug for Search<'_> {
|
||||
.field("searchable_attributes", searchable_attributes)
|
||||
.field("terms_matching_strategy", terms_matching_strategy)
|
||||
.field("scoring_strategy", scoring_strategy)
|
||||
.field("is_exhaustive_pagination", is_exhaustive_pagination)
|
||||
.field("max_total_hits", max_total_hits)
|
||||
.field("exhaustive_number_hits", exhaustive_number_hits)
|
||||
.field("words_limit", words_limit)
|
||||
.field(
|
||||
"semantic.embedder_name",
|
||||
|
||||
@@ -32,8 +32,6 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
|
||||
logger: &mut dyn SearchLogger<Q>,
|
||||
time_budget: TimeBudget,
|
||||
ranking_score_threshold: Option<f64>,
|
||||
exhaustive_number_hits: bool,
|
||||
max_total_hits: Option<usize>,
|
||||
) -> Result<BucketSortOutput> {
|
||||
logger.initial_query(query);
|
||||
logger.ranking_rules(&ranking_rules);
|
||||
@@ -161,12 +159,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
|
||||
};
|
||||
}
|
||||
|
||||
let max_total_hits = max_total_hits.unwrap_or(usize::MAX);
|
||||
while valid_docids.len() < length
|
||||
|| (exhaustive_number_hits
|
||||
&& ranking_score_threshold.is_some()
|
||||
&& valid_docids.len() < max_total_hits)
|
||||
{
|
||||
while valid_docids.len() < length {
|
||||
if time_budget.exceeded() {
|
||||
loop {
|
||||
let bucket = std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
|
||||
|
||||
@@ -510,7 +510,6 @@ mod tests {
|
||||
crate::TermsMatchingStrategy::default(),
|
||||
crate::score_details::ScoringStrategy::Skip,
|
||||
false,
|
||||
None,
|
||||
universe,
|
||||
&None,
|
||||
&None,
|
||||
|
||||
@@ -377,7 +377,6 @@ fn get_ranking_rules_for_vector<'ctx>(
|
||||
embedder_name: &str,
|
||||
embedder: &Embedder,
|
||||
quantized: bool,
|
||||
search_k_div_trees: Option<usize>,
|
||||
) -> Result<Vec<BoxRankingRule<'ctx, PlaceholderQuery>>> {
|
||||
// query graph search
|
||||
|
||||
@@ -406,7 +405,6 @@ fn get_ranking_rules_for_vector<'ctx>(
|
||||
embedder_name,
|
||||
embedder,
|
||||
quantized,
|
||||
search_k_div_trees,
|
||||
)?;
|
||||
ranking_rules.push(Box::new(vector_sort));
|
||||
vector = true;
|
||||
@@ -628,8 +626,6 @@ pub fn execute_vector_search(
|
||||
ctx: &mut SearchContext<'_>,
|
||||
vector: &[f32],
|
||||
scoring_strategy: ScoringStrategy,
|
||||
exhaustive_number_hits: bool,
|
||||
max_total_hits: Option<usize>,
|
||||
universe: RoaringBitmap,
|
||||
sort_criteria: &Option<Vec<AscDesc>>,
|
||||
distinct: &Option<String>,
|
||||
@@ -639,7 +635,6 @@ pub fn execute_vector_search(
|
||||
embedder_name: &str,
|
||||
embedder: &Embedder,
|
||||
quantized: bool,
|
||||
search_k_div_trees: Option<usize>,
|
||||
time_budget: TimeBudget,
|
||||
ranking_score_threshold: Option<f64>,
|
||||
) -> Result<PartialSearchResult> {
|
||||
@@ -656,7 +651,6 @@ pub fn execute_vector_search(
|
||||
embedder_name,
|
||||
embedder,
|
||||
quantized,
|
||||
search_k_div_trees,
|
||||
)?;
|
||||
|
||||
let mut placeholder_search_logger = logger::DefaultSearchLogger;
|
||||
@@ -675,8 +669,6 @@ pub fn execute_vector_search(
|
||||
placeholder_search_logger,
|
||||
time_budget,
|
||||
ranking_score_threshold,
|
||||
exhaustive_number_hits,
|
||||
max_total_hits,
|
||||
)?;
|
||||
|
||||
Ok(PartialSearchResult {
|
||||
@@ -697,7 +689,6 @@ pub fn execute_search(
|
||||
terms_matching_strategy: TermsMatchingStrategy,
|
||||
scoring_strategy: ScoringStrategy,
|
||||
exhaustive_number_hits: bool,
|
||||
max_total_hits: Option<usize>,
|
||||
mut universe: RoaringBitmap,
|
||||
sort_criteria: &Option<Vec<AscDesc>>,
|
||||
distinct: &Option<String>,
|
||||
@@ -834,8 +825,6 @@ pub fn execute_search(
|
||||
query_graph_logger,
|
||||
time_budget,
|
||||
ranking_score_threshold,
|
||||
exhaustive_number_hits,
|
||||
max_total_hits,
|
||||
)?
|
||||
} else {
|
||||
let ranking_rules =
|
||||
@@ -852,8 +841,6 @@ pub fn execute_search(
|
||||
placeholder_search_logger,
|
||||
time_budget,
|
||||
ranking_score_threshold,
|
||||
exhaustive_number_hits,
|
||||
max_total_hits,
|
||||
)?
|
||||
};
|
||||
|
||||
|
||||
@@ -572,7 +572,7 @@ fn test_distinct_all_candidates() {
|
||||
let mut s = Search::new(&txn, &index);
|
||||
s.terms_matching_strategy(TermsMatchingStrategy::Last);
|
||||
s.sort_criteria(vec![AscDesc::Desc(Member::Field(S("rank1")))]);
|
||||
s.is_exhaustive_pagination(true);
|
||||
s.exhaustive_number_hits(true);
|
||||
|
||||
let SearchResult { documents_ids, candidates, .. } = s.execute().unwrap();
|
||||
let candidates = candidates.iter().collect::<Vec<_>>();
|
||||
|
||||
@@ -18,11 +18,9 @@ pub struct VectorSort<Q: RankingRuleQueryTrait> {
|
||||
distribution_shift: Option<DistributionShift>,
|
||||
embedder_index: u8,
|
||||
quantized: bool,
|
||||
search_k_div_trees: Option<usize>,
|
||||
}
|
||||
|
||||
impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
ctx: &SearchContext<'_>,
|
||||
target: Vec<f32>,
|
||||
@@ -31,7 +29,6 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
|
||||
embedder_name: &str,
|
||||
embedder: &Embedder,
|
||||
quantized: bool,
|
||||
search_k_div_trees: Option<usize>,
|
||||
) -> Result<Self> {
|
||||
let embedder_index = ctx
|
||||
.index
|
||||
@@ -45,7 +42,6 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
|
||||
vector_candidates,
|
||||
cached_sorted_docids: Default::default(),
|
||||
limit,
|
||||
search_k_div_trees,
|
||||
distribution_shift: embedder.distribution(),
|
||||
embedder_index,
|
||||
quantized,
|
||||
@@ -61,13 +57,7 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
|
||||
|
||||
let before = Instant::now();
|
||||
let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
|
||||
let results = reader.nns_by_vector(
|
||||
ctx.txn,
|
||||
target,
|
||||
self.limit,
|
||||
self.search_k_div_trees,
|
||||
Some(vector_candidates),
|
||||
)?;
|
||||
let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
|
||||
self.cached_sorted_docids = results.into_iter();
|
||||
*ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {
|
||||
total_time: before.elapsed(),
|
||||
|
||||
@@ -135,7 +135,7 @@ where
|
||||
extractor_alloc.0.reset();
|
||||
}
|
||||
|
||||
let total_documents = document_changes.len() as u32;
|
||||
let total_documents = document_changes.len() as u64;
|
||||
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
|
||||
progress.update_progress(progress_step);
|
||||
|
||||
@@ -167,7 +167,7 @@ where
|
||||
});
|
||||
|
||||
let res = extractor.process(changes, context).map_err(Arc::new);
|
||||
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
|
||||
step.fetch_add(items.as_ref().len() as u64, Ordering::Relaxed);
|
||||
|
||||
// send back the doc_alloc in the pool
|
||||
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
|
||||
|
||||
@@ -85,14 +85,14 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
let mut primary_key = None;
|
||||
|
||||
let payload_count = operations.len();
|
||||
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
|
||||
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u64);
|
||||
progress.update_progress(progress_step);
|
||||
|
||||
for (payload_index, operation) in operations.into_iter().enumerate() {
|
||||
if must_stop_processing() {
|
||||
return Err(InternalError::AbortedIndexation.into());
|
||||
}
|
||||
step.store(payload_index as u32, Ordering::Relaxed);
|
||||
step.store(payload_index as u64, Ordering::Relaxed);
|
||||
|
||||
let mut bytes = 0;
|
||||
let result = match operation {
|
||||
@@ -145,7 +145,7 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
};
|
||||
operations_stats.push(PayloadStats { document_count, bytes, error });
|
||||
}
|
||||
step.store(payload_count as u32, Ordering::Relaxed);
|
||||
step.store(payload_count as u64, Ordering::Relaxed);
|
||||
|
||||
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
|
||||
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
|
||||
|
||||
@@ -101,7 +101,7 @@ pub fn settings_change_extract<
|
||||
extractor_alloc.0.reset();
|
||||
}
|
||||
|
||||
let total_documents = documents.len() as u32;
|
||||
let total_documents = documents.len() as u64;
|
||||
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
|
||||
progress.update_progress(progress_step);
|
||||
|
||||
@@ -132,7 +132,7 @@ pub fn settings_change_extract<
|
||||
.filter_map(|item| documents.item_to_database_document(context, item).transpose());
|
||||
|
||||
let res = extractor.process(documents, context).map_err(Arc::new);
|
||||
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
|
||||
step.fetch_add(items.as_ref().len() as u64, Ordering::Relaxed);
|
||||
|
||||
// send back the doc_alloc in the pool
|
||||
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
|
||||
|
||||
@@ -11,7 +11,7 @@ pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progre
|
||||
let field_id_map = index.fields_ids_map(wtxn)?;
|
||||
|
||||
let (update_document_count, sub_step) =
|
||||
AtomicSubStep::<progress::Document>::new(document_count as u32);
|
||||
AtomicSubStep::<progress::Document>::new(document_count as u64);
|
||||
progress.update_progress(sub_step);
|
||||
|
||||
let docids = index.documents_ids(wtxn)?;
|
||||
|
||||
@@ -81,8 +81,8 @@ where
|
||||
target.1,
|
||||
target.2
|
||||
),
|
||||
i as u32,
|
||||
upgrade_path.len() as u32,
|
||||
i as u64,
|
||||
upgrade_path.len() as u64,
|
||||
));
|
||||
regenerate_stats |= upgrade.upgrade(wtxn, index, from, progress.clone())?;
|
||||
index.put_version(wtxn, target)?;
|
||||
|
||||
@@ -133,7 +133,7 @@ impl ArroyWrapper {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
|
||||
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng + Send + Sync>(
|
||||
&mut self,
|
||||
wtxn: &mut RwTxn,
|
||||
progress: &Progress,
|
||||
@@ -483,20 +483,12 @@ impl ArroyWrapper {
|
||||
rtxn: &RoTxn,
|
||||
vector: &[f32],
|
||||
limit: usize,
|
||||
search_k_div_trees: Option<usize>,
|
||||
filter: Option<&RoaringBitmap>,
|
||||
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||
if self.quantized {
|
||||
self._nns_by_vector(
|
||||
rtxn,
|
||||
self.quantized_db(),
|
||||
vector,
|
||||
limit,
|
||||
search_k_div_trees,
|
||||
filter,
|
||||
)
|
||||
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
|
||||
} else {
|
||||
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, search_k_div_trees, filter)
|
||||
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -506,7 +498,6 @@ impl ArroyWrapper {
|
||||
db: arroy::Database<D>,
|
||||
vector: &[f32],
|
||||
limit: usize,
|
||||
search_k_div_trees: Option<usize>,
|
||||
filter: Option<&RoaringBitmap>,
|
||||
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||
let mut results = Vec::new();
|
||||
@@ -518,12 +509,6 @@ impl ArroyWrapper {
|
||||
if reader.item_ids().is_disjoint(filter) {
|
||||
continue;
|
||||
}
|
||||
if let Some(mut search_k) = search_k_div_trees {
|
||||
search_k *= reader.n_trees();
|
||||
if let Ok(search_k) = search_k.try_into() {
|
||||
searcher.search_k(search_k);
|
||||
}
|
||||
}
|
||||
searcher.candidates(filter);
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ macro_rules! test_distinct {
|
||||
search.query(search::TEST_QUERY);
|
||||
search.limit($limit);
|
||||
search.offset($offset);
|
||||
search.is_exhaustive_pagination($exhaustive);
|
||||
search.exhaustive_number_hits($exhaustive);
|
||||
|
||||
search.terms_matching_strategy(TermsMatchingStrategy::default());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user