From 64b570abe271962a54d9641cb2e7a0ebbe77c589 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Mon, 19 May 2025 15:04:35 +0200
Subject: [PATCH] Add a settings change extractor

---
 .../milli/src/update/new/document_change.rs   |  30 ++++
 .../update/new/indexer/document_changes.rs    |   2 +-
 crates/milli/src/update/new/indexer/mod.rs    |   1 +
 .../update/new/indexer/settings_changes.rs    | 139 ++++++++++++++++++
 4 files changed, 171 insertions(+), 1 deletion(-)
 create mode 100644 crates/milli/src/update/new/indexer/settings_changes.rs

diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs
index 8a8ac4bb3..1eb2ddb44 100644
--- a/crates/milli/src/update/new/document_change.rs
+++ b/crates/milli/src/update/new/document_change.rs
@@ -37,6 +37,11 @@ pub struct Insertion<'doc> {
     new: Versions<'doc>,
 }
 
+pub struct SettingsChangeDocument<'doc> {
+    docid: DocumentId,
+    external_document_id: &'doc str,
+}
+
 impl<'doc> DocumentChange<'doc> {
     pub fn docid(&self) -> DocumentId {
         match &self {
@@ -304,3 +309,28 @@ impl<'doc> Update<'doc> {
         }
     }
 }
+
+impl<'doc> SettingsChangeDocument<'doc> {
+    pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
+        Self { docid, external_document_id }
+    }
+
+    pub fn docid(&self) -> DocumentId {
+        self.docid
+    }
+
+    pub fn external_document_id(&self) -> &'doc str {
+        self.external_document_id
+    }
+
+    pub fn current<'a, Mapper: FieldIdMapper>(
+        &self,
+        rtxn: &'a RoTxn,
+        index: &'a Index,
+        mapper: &'a Mapper,
+    ) -> Result<DocumentFromDb<'a, Mapper>> {
+        Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
+            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
+        )?)
+    }
+}
diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs
index 5302c9d05..ca5bc8dc5 100644
--- a/crates/milli/src/update/new/indexer/document_changes.rs
+++ b/crates/milli/src/update/new/indexer/document_changes.rs
@@ -43,7 +43,7 @@ pub struct DocumentChangeContext<
     pub extractor_alloc: &'extractor Bump,
 
     /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
-    doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
+    pub doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
 
     /// Extractor-specific data
     pub data: &'doc T,
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index a9ac130a5..a8348f960 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -32,6 +32,7 @@ mod extract;
 mod guess_primary_key;
 mod partial_dump;
 mod post_processing;
+mod settings_changes;
 mod update_by_function;
 mod write;
 
diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs
new file mode 100644
index 000000000..ac349527d
--- /dev/null
+++ b/crates/milli/src/update/new/indexer/settings_changes.rs
@@ -0,0 +1,139 @@
+use std::cell::{Cell, RefCell};
+use std::sync::atomic::Ordering;
+use std::sync::{Arc, RwLock};
+
+use bumpalo::Bump;
+use heed::{RoTxn, WithoutTls};
+use rayon::iter::IndexedParallelIterator;
+
+use super::super::document_change::DocumentChange;
+use super::document_changes::IndexingContext;
+use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
+use crate::progress::{AtomicDocumentStep, Progress};
+use crate::update::new::document_change::SettingsChangeDocument;
+use crate::update::new::indexer::document_changes::DocumentChangeContext;
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
+use crate::update::new::steps::IndexingStep;
+use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
+use crate::update::GrenadParameters;
+use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
+
+/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
+pub trait SettingsChangeExtractor<'extractor>: Sync {
+    type Data: MostlySend;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
+
+    fn process<'doc>(
+        &'doc self,
+        changes: impl Iterator<Item = Result<SettingsChangeDocument<'doc>>>,
+        context: &'doc DocumentChangeContext<Self::Data>,
+    ) -> Result<()>;
+}
+
+pub trait SettingsChangeDocuments<'pl // lifetime of the underlying payload
+>: Sync {
+    type Item: Send;
+
+    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
+
+    fn len(&self) -> usize;
+
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    fn item_to_settings_change_document<'doc, // lifetime of a single `process` call
+     T: MostlySend>(
+        &'doc self,
+        context: &'doc DocumentChangeContext<T>,
+        item: &'doc Self::Item,
+    ) -> Result<Option<SettingsChangeDocument<'doc>>> where 'pl: 'doc // the payload must survive the process calls
+    ;
+}
+
+const CHUNK_SIZE: usize = 100;
+
+pub fn settings_change_extract<
+    'pl,        // covariant lifetime of the underlying payload
+    'extractor, // invariant lifetime of extractor_alloc
+    'fid,       // invariant lifetime of fields ids map
+    'indexer,   // covariant lifetime of objects that are borrowed during the entire indexing
+    'data,      // invariant on EX::Data lifetime of datastore
+    'index,     // covariant lifetime of the index
+    EX,
+    SCD: SettingsChangeDocuments<'pl>,
+    MSP,
+>(
+    document_changes: &SCD,
+    extractor: &EX,
+    IndexingContext {
+        index,
+        db_fields_ids_map,
+        new_fields_ids_map,
+        doc_allocs,
+        fields_ids_map_store,
+        must_stop_processing,
+        progress,
+        grenad_parameters: _,
+    }: IndexingContext<'fid, 'indexer, 'index, MSP>,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+    datastore: &'data ThreadLocal<EX::Data>,
+    step: IndexingStep,
+) -> Result<()>
+where
+    EX: SettingsChangeExtractor<'extractor>,
+    MSP: Fn() -> bool + Sync,
+{
+    tracing::trace!("We are resetting the extractor allocators");
+    progress.update_progress(step);
+    // Clean up and reuse the extractor allocs
+    for extractor_alloc in extractor_allocs.iter_mut() {
+        tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
+        extractor_alloc.0.reset();
+    }
+
+    let total_documents = document_changes.len() as u32;
+    let (step, progress_step) = AtomicDocumentStep::new(total_documents);
+    progress.update_progress(progress_step);
+
+    let pi = document_changes.iter(CHUNK_SIZE);
+    pi.try_arc_for_each_try_init(
+        || {
+            DocumentChangeContext::new(
+                index,
+                db_fields_ids_map,
+                new_fields_ids_map,
+                extractor_allocs,
+                doc_allocs,
+                datastore,
+                fields_ids_map_store,
+                move |index_alloc| extractor.init_data(index_alloc),
+            )
+        },
+        |context, items| {
+            if (must_stop_processing)() {
+                return Err(Arc::new(InternalError::AbortedIndexation.into()));
+            }
+
+            // Clean up and reuse the document-specific allocator
+            context.doc_alloc.reset();
+
+            let items = items.as_ref();
+            let changes = items.iter().filter_map(|item| {
+                document_changes.item_to_settings_change_document(context, item).transpose()
+            });
+
+            let res = extractor.process(changes, context).map_err(Arc::new);
+            step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
+
+            // send back the doc_alloc in the pool
+            context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
+
+            res
+        },
+    )?;
+    step.store(total_documents, Ordering::Relaxed);
+
+    Ok(())
+}