mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-20 21:30:58 +00:00
Compare commits
1 Commits
more-effic
...
replace-ha
Author | SHA1 | Date | |
---|---|---|---|
9762d02900 |
@ -497,6 +497,7 @@ impl IndexScheduler {
|
||||
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
|
||||
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
|
||||
let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
current_batch.processing(Some(&mut task));
|
||||
|
||||
// If the task is not associated with any index, verify that it is an index swap and
|
||||
// create the batch directly. Otherwise, get the index name associated with the task
|
||||
@ -506,7 +507,6 @@ impl IndexScheduler {
|
||||
index_name
|
||||
} else {
|
||||
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
|
||||
current_batch.processing(Some(&mut task));
|
||||
return Ok(Some((Batch::IndexSwap { task }, current_batch)));
|
||||
};
|
||||
|
||||
|
@ -4319,35 +4319,10 @@ mod tests {
|
||||
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
|
||||
|
||||
let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() };
|
||||
let (mut batches, _) = index_scheduler
|
||||
.get_batches_from_authorized_indexes(query.clone(), &AuthFilter::default())
|
||||
let (batches, _) = index_scheduler
|
||||
.get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default())
|
||||
.unwrap();
|
||||
assert_eq!(batches.len(), 1);
|
||||
batches[0].started_at = OffsetDateTime::UNIX_EPOCH;
|
||||
// Insta cannot snapshot our batches because the batch stats contain an enum as a key: https://github.com/mitsuhiko/insta/issues/689
|
||||
let batch = serde_json::to_string_pretty(&batches[0]).unwrap();
|
||||
snapshot!(batch, @r#"
|
||||
{
|
||||
"uid": 0,
|
||||
"details": {
|
||||
"primaryKey": "mouse"
|
||||
},
|
||||
"stats": {
|
||||
"totalNbTasks": 1,
|
||||
"status": {
|
||||
"processing": 1
|
||||
},
|
||||
"types": {
|
||||
"indexCreation": 1
|
||||
},
|
||||
"indexUids": {
|
||||
"catto": 1
|
||||
}
|
||||
},
|
||||
"startedAt": "1970-01-01T00:00:00Z",
|
||||
"finishedAt": null
|
||||
}
|
||||
"#);
|
||||
snapshot!(snapshot_bitmap(&batches), @"[0,]"); // only the processing batch in the first tick
|
||||
|
||||
let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() };
|
||||
let (batches, _) = index_scheduler
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(1):
|
||||
[1,]
|
||||
{uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, }
|
||||
{uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(1):
|
||||
[1,]
|
||||
{uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, }
|
||||
{uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"dumpCreation":1},"indexUids":{}}, }
|
||||
{uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"dumpCreation":1},"indexUids":{}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, }
|
||||
{uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[0,]
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, }
|
||||
{uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(1):
|
||||
[1,]
|
||||
{uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, }
|
||||
{uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"doggo":2}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
|
@ -5,7 +5,7 @@ snapshot_kind: text
|
||||
### Autobatching Enabled = true
|
||||
### Processing batch Some(0):
|
||||
[3,]
|
||||
{uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"taskDeletion":1},"indexUids":{}}, }
|
||||
{uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"taskDeletion":1},"indexUids":{}}, }
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
|
@ -67,7 +67,7 @@ impl ProcessingBatch {
|
||||
task.batch_uid = Some(self.uid);
|
||||
// We don't store the statuses in the map since they're all enqueued but we must
|
||||
// still store them in the stats since that can be displayed.
|
||||
*self.stats.status.entry(Status::Processing).or_default() += 1;
|
||||
*self.stats.status.entry(task.status).or_default() += 1;
|
||||
|
||||
self.kinds.insert(task.kind.as_kind());
|
||||
*self.stats.types.entry(task.kind.as_kind()).or_default() += 1;
|
||||
|
@ -279,7 +279,6 @@ InvalidSearchPage , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchQ , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidFacetSearchQuery , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidFacetSearchName , InvalidRequest , BAD_REQUEST ;
|
||||
FacetSearchDisabled , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchVector , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchShowMatchesPosition , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchShowRankingScore , InvalidRequest , BAD_REQUEST ;
|
||||
|
@ -1407,13 +1407,6 @@ pub fn perform_facet_search(
|
||||
None => TimeBudget::default(),
|
||||
};
|
||||
|
||||
if !index.facet_search(&rtxn)? {
|
||||
return Err(ResponseError::from_msg(
|
||||
"The facet search is disabled for this index".to_string(),
|
||||
Code::FacetSearchDisabled,
|
||||
));
|
||||
}
|
||||
|
||||
// In the faceted search context, we want to use the intersection between the locales provided by the user
|
||||
// and the locales of the facet string.
|
||||
// If the facet string is not localized, we **ignore** the locales provided by the user because the facet data has no locale.
|
||||
|
@ -52,25 +52,6 @@ impl Value {
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Return `true` if the `status` field is set to `failed`.
|
||||
/// Panics if the `status` field doesn't exist.
|
||||
#[track_caller]
|
||||
pub fn is_fail(&self) -> bool {
|
||||
if !self["status"].is_string() {
|
||||
panic!("Called `is_fail` on {}", serde_json::to_string_pretty(&self.0).unwrap());
|
||||
}
|
||||
self["status"] == serde_json::Value::String(String::from("failed"))
|
||||
}
|
||||
|
||||
// Panic if the json doesn't contain the `status` field set to "failed"
|
||||
#[track_caller]
|
||||
pub fn failed(&self) -> &Self {
|
||||
if !self.is_fail() {
|
||||
panic!("Called failed on {}", serde_json::to_string_pretty(&self.0).unwrap());
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Value> for Value {
|
||||
|
@ -221,15 +221,8 @@ async fn add_documents_and_deactivate_facet_search() {
|
||||
let (response, code) =
|
||||
index.facet_search(json!({"facetName": "genres", "facetQuery": "a"})).await;
|
||||
|
||||
assert_eq!(code, 400, "{}", response);
|
||||
snapshot!(response, @r###"
|
||||
{
|
||||
"message": "The facet search is disabled for this index",
|
||||
"code": "facet_search_disabled",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#facet_search_disabled"
|
||||
}
|
||||
"###);
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(dbg!(response)["facetHits"].as_array().unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
@ -252,15 +245,8 @@ async fn deactivate_facet_search_and_add_documents() {
|
||||
let (response, code) =
|
||||
index.facet_search(json!({"facetName": "genres", "facetQuery": "a"})).await;
|
||||
|
||||
assert_eq!(code, 400, "{}", response);
|
||||
snapshot!(response, @r###"
|
||||
{
|
||||
"message": "The facet search is disabled for this index",
|
||||
"code": "facet_search_disabled",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#facet_search_disabled"
|
||||
}
|
||||
"###);
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
assert_eq!(dbg!(response)["facetHits"].as_array().unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
@ -129,11 +129,11 @@ async fn perform_on_demand_snapshot() {
|
||||
|
||||
index.load_test_set().await;
|
||||
|
||||
let (task, _) = server.index("doggo").create(Some("bone")).await;
|
||||
index.wait_task(task.uid()).await.succeeded();
|
||||
server.index("doggo").create(Some("bone")).await;
|
||||
index.wait_task(2).await;
|
||||
|
||||
let (task, _) = server.index("doggo").create(Some("bone")).await;
|
||||
index.wait_task(task.uid()).await.failed();
|
||||
server.index("doggo").create(Some("bone")).await;
|
||||
index.wait_task(2).await;
|
||||
|
||||
let (task, code) = server.create_snapshot().await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
|
@ -1,6 +1,5 @@
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use either::Either;
|
||||
use heed::RoTxn;
|
||||
use raw_collections::RawMap;
|
||||
use serde_json::value::RawValue;
|
||||
@ -210,34 +209,29 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
|
||||
for MergedDocument<'d, 'doc, 't, Mapper>
|
||||
{
|
||||
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'d str, &'d RawValue)>> {
|
||||
match &self.db {
|
||||
Some(db) => {
|
||||
let mut new_doc_it = self.new_doc.iter_top_level_fields();
|
||||
let mut db_it = db.iter_top_level_fields();
|
||||
let mut seen_fields = BTreeSet::new();
|
||||
let mut new_doc_it = self.new_doc.iter_top_level_fields();
|
||||
let mut db_it = self.db.iter().flat_map(|db| db.iter_top_level_fields());
|
||||
let mut seen_fields = BTreeSet::new();
|
||||
|
||||
Either::Left(std::iter::from_fn(move || {
|
||||
if let Some(next) = new_doc_it.next() {
|
||||
if let Ok((name, _)) = next {
|
||||
seen_fields.insert(name);
|
||||
}
|
||||
return Some(next);
|
||||
}
|
||||
loop {
|
||||
match db_it.next()? {
|
||||
Ok((name, value)) => {
|
||||
if seen_fields.contains(name) {
|
||||
continue;
|
||||
}
|
||||
return Some(Ok((name, value)));
|
||||
}
|
||||
Err(err) => return Some(Err(err)),
|
||||
}
|
||||
}
|
||||
}))
|
||||
std::iter::from_fn(move || {
|
||||
if let Some(next) = new_doc_it.next() {
|
||||
if let Ok((name, _)) = next {
|
||||
seen_fields.insert(name);
|
||||
}
|
||||
return Some(next);
|
||||
}
|
||||
None => Either::Right(self.new_doc.iter_top_level_fields()),
|
||||
}
|
||||
loop {
|
||||
match db_it.next()? {
|
||||
Ok((name, value)) => {
|
||||
if seen_fields.contains(name) {
|
||||
continue;
|
||||
}
|
||||
return Some(Ok((name, value)));
|
||||
}
|
||||
Err(err) => return Some(Err(err)),
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn vectors_field(&self) -> Result<Option<&'d RawValue>> {
|
||||
|
@ -1,10 +1,7 @@
|
||||
use bumpalo::Bump;
|
||||
use heed::RoTxn;
|
||||
|
||||
use super::document::{
|
||||
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
|
||||
};
|
||||
use super::extract::perm_json_p;
|
||||
use super::document::{DocumentFromDb, DocumentFromVersions, MergedDocument, Versions};
|
||||
use super::vector_document::{
|
||||
MergedVectorDocument, VectorDocumentFromDb, VectorDocumentFromVersions,
|
||||
};
|
||||
@ -167,80 +164,6 @@ impl<'doc> Update<'doc> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
|
||||
///
|
||||
/// `true` if at least one top-level field that is exactly a member of `fields`, or a parent of a member of `fields`, changed.
|
||||
/// Otherwise `false`.
|
||||
pub fn has_changed_for_fields<'t, Mapper: FieldIdMapper>(
|
||||
&self,
|
||||
fields: Option<&[&str]>,
|
||||
rtxn: &'t RoTxn,
|
||||
index: &'t Index,
|
||||
mapper: &'t Mapper,
|
||||
) -> Result<bool> {
|
||||
let mut changed = false;
|
||||
let mut cached_current = None;
|
||||
let mut updated_selected_field_count = 0;
|
||||
|
||||
for entry in self.updated().iter_top_level_fields() {
|
||||
let (key, updated_value) = entry?;
|
||||
|
||||
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
|
||||
continue;
|
||||
}
|
||||
|
||||
updated_selected_field_count += 1;
|
||||
let current = match cached_current {
|
||||
Some(current) => current,
|
||||
None => self.current(rtxn, index, mapper)?,
|
||||
};
|
||||
let current_value = current.top_level_field(key)?;
|
||||
let Some(current_value) = current_value else {
|
||||
changed = true;
|
||||
break;
|
||||
};
|
||||
|
||||
if current_value.get() != updated_value.get() {
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
cached_current = Some(current);
|
||||
}
|
||||
|
||||
if !self.has_deletion {
|
||||
// no field deletion, so fields that don't appear in `updated` cannot have changed
|
||||
return Ok(changed);
|
||||
}
|
||||
|
||||
if changed {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// we saw all updated fields, and set `changed` if any field wasn't in `current`.
|
||||
// so if there are as many fields in `current` as in `updated`, then nothing changed.
|
||||
// If there are any more fields in `current`, then they are missing in `updated`.
|
||||
let has_deleted_fields = {
|
||||
let current = match cached_current {
|
||||
Some(current) => current,
|
||||
None => self.current(rtxn, index, mapper)?,
|
||||
};
|
||||
|
||||
let mut current_selected_field_count = 0;
|
||||
for entry in current.iter_top_level_fields() {
|
||||
let (key, _) = entry?;
|
||||
|
||||
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
|
||||
continue;
|
||||
}
|
||||
current_selected_field_count += 1;
|
||||
}
|
||||
|
||||
current_selected_field_count != updated_selected_field_count
|
||||
};
|
||||
|
||||
Ok(has_deleted_fields)
|
||||
}
|
||||
|
||||
pub fn updated_vectors(
|
||||
&self,
|
||||
doc_alloc: &'doc Bump,
|
||||
|
@ -60,9 +60,10 @@
|
||||
//! For now we can use a grenad sorter for spilling even though I think
|
||||
//! it's not the most efficient way (too many files open, sorting entries).
|
||||
|
||||
use std::borrow::Borrow;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::binary_heap::PeekMut;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::{BTreeMap, BinaryHeap};
|
||||
use std::fs::File;
|
||||
use std::hash::BuildHasher;
|
||||
use std::io::BufReader;
|
||||
@ -70,10 +71,7 @@ use std::{io, iter, mem};
|
||||
|
||||
use bumpalo::Bump;
|
||||
use grenad::ReaderCursor;
|
||||
use hashbrown::hash_map::RawEntryMut;
|
||||
use hashbrown::HashMap;
|
||||
use raw_collections::bbbul::{BitPacker, BitPacker4x};
|
||||
use raw_collections::map::FrozenMap;
|
||||
use raw_collections::{Bbbul, FrozenBbbul};
|
||||
use roaring::RoaringBitmap;
|
||||
use rustc_hash::FxBuildHasher;
|
||||
@ -105,9 +103,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
hasher: FxBuildHasher,
|
||||
max_memory,
|
||||
caches: InnerCaches::Normal(NormalCaches {
|
||||
caches: iter::repeat_with(|| HashMap::with_hasher_in(FxBuildHasher, alloc))
|
||||
.take(buckets)
|
||||
.collect(),
|
||||
caches: iter::repeat_with(BTreeMap::new).take(buckets).collect(),
|
||||
}),
|
||||
alloc,
|
||||
}
|
||||
@ -166,8 +162,8 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
rayon::current_thread_index().unwrap_or(0)
|
||||
);
|
||||
|
||||
let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
|
||||
tracing::trace!("The last allocated HashMap took {allocated} bytes");
|
||||
// let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
|
||||
// tracing::trace!("The last allocated BTreeMap took {allocated} bytes");
|
||||
|
||||
let dummy = NormalCaches { caches: Vec::new() };
|
||||
let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
|
||||
@ -187,21 +183,17 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
// that are the same size.
|
||||
let map = unsafe {
|
||||
std::mem::transmute::<
|
||||
&mut HashMap<
|
||||
&mut BTreeMap<
|
||||
&[u8],
|
||||
DelAddBbbul<BitPacker4x>, // from this
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
&mut HashMap<
|
||||
&mut BTreeMap<
|
||||
&[u8],
|
||||
FrozenDelAddBbbul<BitPacker4x>, // to that
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
>(map)
|
||||
};
|
||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
|
||||
Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled: Vec::new() })
|
||||
})
|
||||
.collect(),
|
||||
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
|
||||
@ -220,21 +212,17 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
// that are the same size.
|
||||
let map = unsafe {
|
||||
std::mem::transmute::<
|
||||
&mut HashMap<
|
||||
&mut BTreeMap<
|
||||
&[u8],
|
||||
DelAddBbbul<BitPacker4x>, // from this
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
&mut HashMap<
|
||||
&mut BTreeMap<
|
||||
&[u8],
|
||||
FrozenDelAddBbbul<BitPacker4x>, // to that
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
>(map)
|
||||
};
|
||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
|
||||
Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled })
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
@ -245,14 +233,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
unsafe impl MostlySend for BalancedCaches<'_> {}
|
||||
|
||||
struct NormalCaches<'extractor> {
|
||||
caches: Vec<
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
|
||||
}
|
||||
|
||||
impl<'extractor> NormalCaches<'extractor> {
|
||||
@ -266,17 +247,13 @@ impl<'extractor> NormalCaches<'extractor> {
|
||||
) {
|
||||
let hash = hasher.hash_one(key);
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
let cache = &mut self.caches[bucket];
|
||||
match cache.get_mut(key) {
|
||||
Some(deladd) => {
|
||||
deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
}
|
||||
RawEntryMut::Vacant(entry) => {
|
||||
entry.insert_hashed_nocheck(
|
||||
hash,
|
||||
alloc.alloc_slice_copy(key),
|
||||
DelAddBbbul::new_del_u32_in(n, alloc),
|
||||
);
|
||||
None => {
|
||||
cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_del_u32_in(n, alloc));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -291,30 +268,20 @@ impl<'extractor> NormalCaches<'extractor> {
|
||||
) {
|
||||
let hash = hasher.hash_one(key);
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
let cache = &mut self.caches[bucket];
|
||||
match cache.get_mut(key) {
|
||||
Some(deladd) => {
|
||||
deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
}
|
||||
RawEntryMut::Vacant(entry) => {
|
||||
entry.insert_hashed_nocheck(
|
||||
hash,
|
||||
alloc.alloc_slice_copy(key),
|
||||
DelAddBbbul::new_add_u32_in(n, alloc),
|
||||
);
|
||||
None => {
|
||||
cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_add_u32_in(n, alloc));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SpillingCaches<'extractor> {
|
||||
caches: Vec<
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
|
||||
spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
|
||||
deladd_buffer: Vec<u8>,
|
||||
cbo_buffer: Vec<u8>,
|
||||
@ -322,14 +289,7 @@ struct SpillingCaches<'extractor> {
|
||||
|
||||
impl<'extractor> SpillingCaches<'extractor> {
|
||||
fn from_cache_maps(
|
||||
caches: Vec<
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
|
||||
) -> SpillingCaches<'extractor> {
|
||||
SpillingCaches {
|
||||
spilled_entries: iter::repeat_with(|| {
|
||||
@ -356,12 +316,12 @@ impl<'extractor> SpillingCaches<'extractor> {
|
||||
) -> Result<()> {
|
||||
let hash = hasher.hash_one(key);
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
match self.caches[bucket].get_mut(key) {
|
||||
Some(deladd) => {
|
||||
deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
Ok(())
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
|
||||
None => spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
@ -381,12 +341,12 @@ impl<'extractor> SpillingCaches<'extractor> {
|
||||
) -> Result<()> {
|
||||
let hash = hasher.hash_one(key);
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
match self.caches[bucket].get_mut(key) {
|
||||
Some(deladd) => {
|
||||
deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
Ok(())
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
|
||||
None => spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
@ -441,13 +401,7 @@ fn spill_entry_to_sorter(
|
||||
|
||||
pub struct FrozenCache<'a, 'extractor> {
|
||||
bucket: usize,
|
||||
cache: FrozenMap<
|
||||
'a,
|
||||
'extractor,
|
||||
&'extractor [u8],
|
||||
FrozenDelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
>,
|
||||
cache: FrozenBTreeMap<'a, &'extractor [u8], FrozenDelAddBbbul<'extractor, BitPacker4x>>,
|
||||
spilled: Vec<grenad::Reader<BufReader<File>>>,
|
||||
}
|
||||
|
||||
@ -466,6 +420,36 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
|
||||
Ok(bucket_caches)
|
||||
}
|
||||
|
||||
pub struct FrozenBTreeMap<'a, K, V>(&'a mut BTreeMap<K, V>);
|
||||
|
||||
unsafe impl<'a, K, V> Send for FrozenBTreeMap<'a, K, V>
|
||||
where
|
||||
K: Send,
|
||||
V: Send,
|
||||
{
|
||||
}
|
||||
|
||||
impl<'a, K, V> FrozenBTreeMap<'a, K, V> {
|
||||
#[inline]
|
||||
pub fn new(map: &'a mut BTreeMap<K, V>) -> Self {
|
||||
Self(map)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter_mut(&mut self) -> std::collections::btree_map::IterMut<'_, K, V> {
|
||||
self.0.iter_mut()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
|
||||
where
|
||||
K: Borrow<Q> + Ord,
|
||||
Q: Ord + ?Sized,
|
||||
{
|
||||
self.0.get_mut(key)
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges the caches that must be all associated to the same bucket
|
||||
/// but make sure to sort the different buckets before performing the merges.
|
||||
///
|
||||
@ -491,7 +475,7 @@ where
|
||||
for (source_index, source) in readers.into_iter().enumerate() {
|
||||
let mut cursor = source.into_cursor()?;
|
||||
if cursor.move_on_next()?.is_some() {
|
||||
heap.push(Entry { cursor, source_index });
|
||||
heap.push(CursorEntry { cursor, source_index });
|
||||
}
|
||||
}
|
||||
|
||||
@ -544,12 +528,11 @@ where
|
||||
|
||||
// Then manage the content on the HashMap entries that weren't taken (mem::take).
|
||||
while let Some(mut map) = maps.pop() {
|
||||
// Make sure we don't try to work with entries already managed by the spilled
|
||||
let mut ordered_entries: Vec<_> =
|
||||
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
|
||||
ordered_entries.sort_unstable_by_key(|(key, _)| *key);
|
||||
for (key, bbbul) in map.iter_mut() {
|
||||
if bbbul.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (key, bbbul) in ordered_entries {
|
||||
let mut output = DelAddRoaringBitmap::empty();
|
||||
output.union_and_clear_bbbul(bbbul);
|
||||
|
||||
@ -567,29 +550,29 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Entry<R> {
|
||||
struct CursorEntry<R> {
|
||||
cursor: ReaderCursor<R>,
|
||||
source_index: usize,
|
||||
}
|
||||
|
||||
impl<R> Ord for Entry<R> {
|
||||
fn cmp(&self, other: &Entry<R>) -> Ordering {
|
||||
impl<R> Ord for CursorEntry<R> {
|
||||
fn cmp(&self, other: &CursorEntry<R>) -> Ordering {
|
||||
let skey = self.cursor.current().map(|(k, _)| k);
|
||||
let okey = other.cursor.current().map(|(k, _)| k);
|
||||
skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse()
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Eq for Entry<R> {}
|
||||
impl<R> Eq for CursorEntry<R> {}
|
||||
|
||||
impl<R> PartialEq for Entry<R> {
|
||||
fn eq(&self, other: &Entry<R>) -> bool {
|
||||
impl<R> PartialEq for CursorEntry<R> {
|
||||
fn eq(&self, other: &CursorEntry<R>) -> bool {
|
||||
self.cmp(other) == Ordering::Equal
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> PartialOrd for Entry<R> {
|
||||
fn partial_cmp(&self, other: &Entry<R>) -> Option<Ordering> {
|
||||
impl<R> PartialOrd for CursorEntry<R> {
|
||||
fn partial_cmp(&self, other: &CursorEntry<R>) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
@ -97,15 +97,6 @@ impl FacetedDocidsExtractor {
|
||||
},
|
||||
),
|
||||
DocumentChange::Update(inner) => {
|
||||
if !inner.has_changed_for_fields(
|
||||
Some(attributes_to_extract),
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
extract_document_facets(
|
||||
attributes_to_extract,
|
||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
||||
|
@ -351,15 +351,6 @@ impl WordDocidsExtractors {
|
||||
)?;
|
||||
}
|
||||
DocumentChange::Update(inner) => {
|
||||
if !inner.has_changed_for_fields(
|
||||
document_tokenizer.attribute_to_extract,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut token_fn = |fname: &str, fid, pos, word: &str| {
|
||||
cached_sorter.insert_del_u32(
|
||||
fid,
|
||||
|
@ -70,15 +70,6 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
|
||||
)?;
|
||||
}
|
||||
DocumentChange::Update(inner) => {
|
||||
if !inner.has_changed_for_fields(
|
||||
document_tokenizer.attribute_to_extract,
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
|
||||
process_document_tokens(
|
||||
document,
|
||||
|
Reference in New Issue
Block a user