Compare commits

..

1 Commits

Author SHA1 Message Date
9762d02900 Replace the HashMap caches by BTreeMaps 2024-12-05 15:22:30 +01:00
29 changed files with 128 additions and 321 deletions

View File

@ -497,6 +497,7 @@ impl IndexScheduler {
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
// If the task is not associated with any index, verify that it is an index swap and
// create the batch directly. Otherwise, get the index name associated with the task
@ -506,7 +507,6 @@ impl IndexScheduler {
index_name
} else {
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
current_batch.processing(Some(&mut task));
return Ok(Some((Batch::IndexSwap { task }, current_batch)));
};

View File

@ -4319,35 +4319,10 @@ mod tests {
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() };
let (mut batches, _) = index_scheduler
.get_batches_from_authorized_indexes(query.clone(), &AuthFilter::default())
let (batches, _) = index_scheduler
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
.unwrap();
assert_eq!(batches.len(), 1);
batches[0].started_at = OffsetDateTime::UNIX_EPOCH;
// Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689
let batch = serde_json::to_string_pretty(&batches[0]).unwrap();
snapshot!(batch, @r#"
{
"uid": 0,
"details": {
"primaryKey": "mouse"
},
"stats": {
"totalNbTasks": 1,
"status": {
"processing": 1
},
"types": {
"indexCreation": 1
},
"indexUids": {
"catto": 1
}
},
"startedAt": "1970-01-01T00:00:00Z",
"finishedAt": null
}
"#);
snapshot!(snapshot_bitmap(&batches), @"[0,]"); // only the processing batch in the first tick
let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() };
let (batches, _) = index_scheduler

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(1):
[1,]
{uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, }
{uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(1):
[1,]
{uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, }
{uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"dumpCreation":1},"indexUids":{}}, }
{uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"dumpCreation":1},"indexUids":{}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[0,]
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(1):
[1,]
{uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
{uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"doggo":2}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}

View File

@ -5,7 +5,7 @@ snapshot_kind: text
### Autobatching Enabled = true
### Processing batch Some(0):
[3,]
{uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"taskDeletion":1},"indexUids":{}}, }
{uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"taskDeletion":1},"indexUids":{}}, }
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}

View File

@ -67,7 +67,7 @@ impl ProcessingBatch {
task.batch_uid = Some(self.uid);
// We don't store the statuses in the map since they're all enqueued but we must
// still store them in the stats since that can be displayed.
*self.stats.status.entry(Status::Processing).or_default() += 1;
*self.stats.status.entry(task.status).or_default() += 1;
self.kinds.insert(task.kind.as_kind());
*self.stats.types.entry(task.kind.as_kind()).or_default() += 1;

View File

@ -279,7 +279,6 @@ InvalidSearchPage , InvalidRequest , BAD_REQUEST ;
InvalidSearchQ , InvalidRequest , BAD_REQUEST ;
InvalidFacetSearchQuery , InvalidRequest , BAD_REQUEST ;
InvalidFacetSearchName , InvalidRequest , BAD_REQUEST ;
FacetSearchDisabled , InvalidRequest , BAD_REQUEST ;
InvalidSearchVector , InvalidRequest , BAD_REQUEST ;
InvalidSearchShowMatchesPosition , InvalidRequest , BAD_REQUEST ;
InvalidSearchShowRankingScore , InvalidRequest , BAD_REQUEST ;

View File

@ -1407,13 +1407,6 @@ pub fn perform_facet_search(
None => TimeBudget::default(),
};
if !index.facet_search(&rtxn)? {
return Err(ResponseError::from_msg(
"The facet search is disabled for this index".to_string(),
Code::FacetSearchDisabled,
));
}
// In the faceted search context, we want to use the intersection between the locales provided by the user
// and the locales of the facet string.
// If the facet string is not localized, we **ignore** the locales provided by the user because the facet data has no locale.

View File

@ -52,25 +52,6 @@ impl Value {
}
self
}
/// Return `true` if the `status` field is set to `failed`.
/// Panic if the `status` field doesn't exists.
#[track_caller]
pub fn is_fail(&self) -> bool {
if !self["status"].is_string() {
panic!("Called `is_fail` on {}", serde_json::to_string_pretty(&self.0).unwrap());
}
self["status"] == serde_json::Value::String(String::from("failed"))
}
// Panic if the json doesn't contain the `status` field set to "succeeded"
#[track_caller]
pub fn failed(&self) -> &Self {
if !self.is_fail() {
panic!("Called failed on {}", serde_json::to_string_pretty(&self.0).unwrap());
}
self
}
}
impl From<serde_json::Value> for Value {

View File

@ -221,15 +221,8 @@ async fn add_documents_and_deactivate_facet_search() {
let (response, code) =
index.facet_search(json!({"facetName": "genres", "facetQuery": "a"})).await;
assert_eq!(code, 400, "{}", response);
snapshot!(response, @r###"
{
"message": "The facet search is disabled for this index",
"code": "facet_search_disabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#facet_search_disabled"
}
"###);
assert_eq!(code, 200, "{}", response);
assert_eq!(dbg!(response)["facetHits"].as_array().unwrap().len(), 0);
}
#[actix_rt::test]
@ -252,15 +245,8 @@ async fn deactivate_facet_search_and_add_documents() {
let (response, code) =
index.facet_search(json!({"facetName": "genres", "facetQuery": "a"})).await;
assert_eq!(code, 400, "{}", response);
snapshot!(response, @r###"
{
"message": "The facet search is disabled for this index",
"code": "facet_search_disabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#facet_search_disabled"
}
"###);
assert_eq!(code, 200, "{}", response);
assert_eq!(dbg!(response)["facetHits"].as_array().unwrap().len(), 0);
}
#[actix_rt::test]

View File

@ -129,11 +129,11 @@ async fn perform_on_demand_snapshot() {
index.load_test_set().await;
let (task, _) = server.index("doggo").create(Some("bone")).await;
index.wait_task(task.uid()).await.succeeded();
server.index("doggo").create(Some("bone")).await;
index.wait_task(2).await;
let (task, _) = server.index("doggo").create(Some("bone")).await;
index.wait_task(task.uid()).await.failed();
server.index("doggo").create(Some("bone")).await;
index.wait_task(2).await;
let (task, code) = server.create_snapshot().await;
snapshot!(code, @"202 Accepted");

View File

@ -1,6 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
use either::Either;
use heed::RoTxn;
use raw_collections::RawMap;
use serde_json::value::RawValue;
@ -210,34 +209,29 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
for MergedDocument<'d, 'doc, 't, Mapper>
{
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'d str, &'d RawValue)>> {
match &self.db {
Some(db) => {
let mut new_doc_it = self.new_doc.iter_top_level_fields();
let mut db_it = db.iter_top_level_fields();
let mut seen_fields = BTreeSet::new();
let mut new_doc_it = self.new_doc.iter_top_level_fields();
let mut db_it = self.db.iter().flat_map(|db| db.iter_top_level_fields());
let mut seen_fields = BTreeSet::new();
Either::Left(std::iter::from_fn(move || {
if let Some(next) = new_doc_it.next() {
if let Ok((name, _)) = next {
seen_fields.insert(name);
}
return Some(next);
}
loop {
match db_it.next()? {
Ok((name, value)) => {
if seen_fields.contains(name) {
continue;
}
return Some(Ok((name, value)));
}
Err(err) => return Some(Err(err)),
}
}
}))
std::iter::from_fn(move || {
if let Some(next) = new_doc_it.next() {
if let Ok((name, _)) = next {
seen_fields.insert(name);
}
return Some(next);
}
None => Either::Right(self.new_doc.iter_top_level_fields()),
}
loop {
match db_it.next()? {
Ok((name, value)) => {
if seen_fields.contains(name) {
continue;
}
return Some(Ok((name, value)));
}
Err(err) => return Some(Err(err)),
}
}
})
}
fn vectors_field(&self) -> Result<Option<&'d RawValue>> {

View File

@ -1,10 +1,7 @@
use bumpalo::Bump;
use heed::RoTxn;
use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
};
use super::extract::perm_json_p;
use super::document::{DocumentFromDb, DocumentFromVersions, MergedDocument, Versions};
use super::vector_document::{
MergedVectorDocument, VectorDocumentFromDb, VectorDocumentFromVersions,
};
@ -167,80 +164,6 @@ impl<'doc> Update<'doc> {
}
}
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
///
/// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.
/// Otherwise `false`.
pub fn has_changed_for_fields<'t, Mapper: FieldIdMapper>(
&self,
fields: Option<&[&str]>,
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
) -> Result<bool> {
let mut changed = false;
let mut cached_current = None;
let mut updated_selected_field_count = 0;
for entry in self.updated().iter_top_level_fields() {
let (key, updated_value) = entry?;
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
continue;
}
updated_selected_field_count += 1;
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
};
let current_value = current.top_level_field(key)?;
let Some(current_value) = current_value else {
changed = true;
break;
};
if current_value.get() != updated_value.get() {
changed = true;
break;
}
cached_current = Some(current);
}
if !self.has_deletion {
// no field deletion, so fields that don't appear in `updated` cannot have changed
return Ok(changed);
}
if changed {
return Ok(true);
}
// we saw all updated fields, and set `changed` if any field wasn't in `current`.
// so if there are as many fields in `current` as in `updated`, then nothing changed.
// If there is any more fields in `current`, then they are missing in `updated`.
let has_deleted_fields = {
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
};
let mut current_selected_field_count = 0;
for entry in current.iter_top_level_fields() {
let (key, _) = entry?;
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
continue;
}
current_selected_field_count += 1;
}
current_selected_field_count != updated_selected_field_count
};
Ok(has_deleted_fields)
}
pub fn updated_vectors(
&self,
doc_alloc: &'doc Bump,

View File

@ -60,9 +60,10 @@
//! For now we can use a grenad sorter for spilling even thought I think
//! it's not the most efficient way (too many files open, sorting entries).
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::collections::{BTreeMap, BinaryHeap};
use std::fs::File;
use std::hash::BuildHasher;
use std::io::BufReader;
@ -70,10 +71,7 @@ use std::{io, iter, mem};
use bumpalo::Bump;
use grenad::ReaderCursor;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use raw_collections::bbbul::{BitPacker, BitPacker4x};
use raw_collections::map::FrozenMap;
use raw_collections::{Bbbul, FrozenBbbul};
use roaring::RoaringBitmap;
use rustc_hash::FxBuildHasher;
@ -105,9 +103,7 @@ impl<'extractor> BalancedCaches<'extractor> {
hasher: FxBuildHasher,
max_memory,
caches: InnerCaches::Normal(NormalCaches {
caches: iter::repeat_with(|| HashMap::with_hasher_in(FxBuildHasher, alloc))
.take(buckets)
.collect(),
caches: iter::repeat_with(BTreeMap::new).take(buckets).collect(),
}),
alloc,
}
@ -166,8 +162,8 @@ impl<'extractor> BalancedCaches<'extractor> {
rayon::current_thread_index().unwrap_or(0)
);
let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
tracing::trace!("The last allocated HashMap took {allocated} bytes");
// let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
// tracing::trace!("The last allocated BTreeMap took {allocated} bytes");
let dummy = NormalCaches { caches: Vec::new() };
let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
@ -187,21 +183,17 @@ impl<'extractor> BalancedCaches<'extractor> {
// that are the same size.
let map = unsafe {
std::mem::transmute::<
&mut HashMap<
&mut BTreeMap<
&[u8],
DelAddBbbul<BitPacker4x>, // from this
FxBuildHasher,
&Bump,
>,
&mut HashMap<
&mut BTreeMap<
&[u8],
FrozenDelAddBbbul<BitPacker4x>, // to that
FxBuildHasher,
&Bump,
>,
>(map)
};
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled: Vec::new() })
})
.collect(),
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
@ -220,21 +212,17 @@ impl<'extractor> BalancedCaches<'extractor> {
// that are the same size.
let map = unsafe {
std::mem::transmute::<
&mut HashMap<
&mut BTreeMap<
&[u8],
DelAddBbbul<BitPacker4x>, // from this
FxBuildHasher,
&Bump,
>,
&mut HashMap<
&mut BTreeMap<
&[u8],
FrozenDelAddBbbul<BitPacker4x>, // to that
FxBuildHasher,
&Bump,
>,
>(map)
};
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled })
})
.collect(),
}
@ -245,14 +233,7 @@ impl<'extractor> BalancedCaches<'extractor> {
unsafe impl MostlySend for BalancedCaches<'_> {}
struct NormalCaches<'extractor> {
caches: Vec<
HashMap<
&'extractor [u8],
DelAddBbbul<'extractor, BitPacker4x>,
FxBuildHasher,
&'extractor Bump,
>,
>,
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
}
impl<'extractor> NormalCaches<'extractor> {
@ -266,17 +247,13 @@ impl<'extractor> NormalCaches<'extractor> {
) {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
let cache = &mut self.caches[bucket];
match cache.get_mut(key) {
Some(deladd) => {
deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
}
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
hash,
alloc.alloc_slice_copy(key),
DelAddBbbul::new_del_u32_in(n, alloc),
);
None => {
cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_del_u32_in(n, alloc));
}
}
}
@ -291,30 +268,20 @@ impl<'extractor> NormalCaches<'extractor> {
) {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
let cache = &mut self.caches[bucket];
match cache.get_mut(key) {
Some(deladd) => {
deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
}
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
hash,
alloc.alloc_slice_copy(key),
DelAddBbbul::new_add_u32_in(n, alloc),
);
None => {
cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_add_u32_in(n, alloc));
}
}
}
}
struct SpillingCaches<'extractor> {
caches: Vec<
HashMap<
&'extractor [u8],
DelAddBbbul<'extractor, BitPacker4x>,
FxBuildHasher,
&'extractor Bump,
>,
>,
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
@ -322,14 +289,7 @@ struct SpillingCaches<'extractor> {
impl<'extractor> SpillingCaches<'extractor> {
fn from_cache_maps(
caches: Vec<
HashMap<
&'extractor [u8],
DelAddBbbul<'extractor, BitPacker4x>,
FxBuildHasher,
&'extractor Bump,
>,
>,
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
) -> SpillingCaches<'extractor> {
SpillingCaches {
spilled_entries: iter::repeat_with(|| {
@ -356,12 +316,12 @@ impl<'extractor> SpillingCaches<'extractor> {
) -> Result<()> {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
match self.caches[bucket].get_mut(key) {
Some(deladd) => {
deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
Ok(())
}
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
None => spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
@ -381,12 +341,12 @@ impl<'extractor> SpillingCaches<'extractor> {
) -> Result<()> {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
match self.caches[bucket].get_mut(key) {
Some(deladd) => {
deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
Ok(())
}
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
None => spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
@ -441,13 +401,7 @@ fn spill_entry_to_sorter(
pub struct FrozenCache<'a, 'extractor> {
bucket: usize,
cache: FrozenMap<
'a,
'extractor,
&'extractor [u8],
FrozenDelAddBbbul<'extractor, BitPacker4x>,
FxBuildHasher,
>,
cache: FrozenBTreeMap<'a, &'extractor [u8], FrozenDelAddBbbul<'extractor, BitPacker4x>>,
spilled: Vec<grenad::Reader<BufReader<File>>>,
}
@ -466,6 +420,36 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
Ok(bucket_caches)
}
pub struct FrozenBTreeMap<'a, K, V>(&'a mut BTreeMap<K, V>);
unsafe impl<'a, K, V> Send for FrozenBTreeMap<'a, K, V>
where
K: Send,
V: Send,
{
}
impl<'a, K, V> FrozenBTreeMap<'a, K, V> {
#[inline]
pub fn new(map: &'a mut BTreeMap<K, V>) -> Self {
Self(map)
}
#[inline]
pub fn iter_mut(&mut self) -> std::collections::btree_map::IterMut<'_, K, V> {
self.0.iter_mut()
}
#[inline]
pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
where
K: Borrow<Q> + Ord,
Q: Ord + ?Sized,
{
self.0.get_mut(key)
}
}
/// Merges the caches that must be all associated to the same bucket
/// but make sure to sort the different buckets before performing the merges.
///
@ -491,7 +475,7 @@ where
for (source_index, source) in readers.into_iter().enumerate() {
let mut cursor = source.into_cursor()?;
if cursor.move_on_next()?.is_some() {
heap.push(Entry { cursor, source_index });
heap.push(CursorEntry { cursor, source_index });
}
}
@ -544,12 +528,11 @@ where
// Then manage the content on the HashMap entries that weren't taken (mem::take).
while let Some(mut map) = maps.pop() {
// Make sure we don't try to work with entries already managed by the spilled
let mut ordered_entries: Vec<_> =
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
ordered_entries.sort_unstable_by_key(|(key, _)| *key);
for (key, bbbul) in map.iter_mut() {
if bbbul.is_empty() {
continue;
}
for (key, bbbul) in ordered_entries {
let mut output = DelAddRoaringBitmap::empty();
output.union_and_clear_bbbul(bbbul);
@ -567,29 +550,29 @@ where
Ok(())
}
struct Entry<R> {
struct CursorEntry<R> {
cursor: ReaderCursor<R>,
source_index: usize,
}
impl<R> Ord for Entry<R> {
fn cmp(&self, other: &Entry<R>) -> Ordering {
impl<R> Ord for CursorEntry<R> {
fn cmp(&self, other: &CursorEntry<R>) -> Ordering {
let skey = self.cursor.current().map(|(k, _)| k);
let okey = other.cursor.current().map(|(k, _)| k);
skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse()
}
}
impl<R> Eq for Entry<R> {}
impl<R> Eq for CursorEntry<R> {}
impl<R> PartialEq for Entry<R> {
fn eq(&self, other: &Entry<R>) -> bool {
impl<R> PartialEq for CursorEntry<R> {
fn eq(&self, other: &CursorEntry<R>) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<R> PartialOrd for Entry<R> {
fn partial_cmp(&self, other: &Entry<R>) -> Option<Ordering> {
impl<R> PartialOrd for CursorEntry<R> {
fn partial_cmp(&self, other: &CursorEntry<R>) -> Option<Ordering> {
Some(self.cmp(other))
}
}

View File

@ -97,15 +97,6 @@ impl FacetedDocidsExtractor {
},
),
DocumentChange::Update(inner) => {
if !inner.has_changed_for_fields(
Some(attributes_to_extract),
rtxn,
index,
context.db_fields_ids_map,
)? {
return Ok(());
}
extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,

View File

@ -351,15 +351,6 @@ impl WordDocidsExtractors {
)?;
}
DocumentChange::Update(inner) => {
if !inner.has_changed_for_fields(
document_tokenizer.attribute_to_extract,
&context.rtxn,
context.index,
context.db_fields_ids_map,
)? {
return Ok(());
}
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter.insert_del_u32(
fid,

View File

@ -70,15 +70,6 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
)?;
}
DocumentChange::Update(inner) => {
if !inner.has_changed_for_fields(
document_tokenizer.attribute_to_extract,
rtxn,
index,
context.db_fields_ids_map,
)? {
return Ok(());
}
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
process_document_tokens(
document,