Introduce the extractor for the settings delta

This commit is contained in:
Kerollmops
2025-11-12 17:57:39 +01:00
parent 5a3f1f7c8a
commit 752cf43076
5 changed files with 234 additions and 7 deletions

View File

@@ -14,10 +14,14 @@ use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::indexer::document_changes::{
extract, DocumentChanges, Extractor, IndexingContext,
};
use crate::update::new::indexer::settings_changes::{
settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor,
};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::update::new::DocumentChange;
use crate::update::new::{DocumentChange, DocumentIdentifiers};
use crate::update::settings::SettingsDelta;
use crate::{bucketed_position, DocumentId, FieldId, Result, MAX_POSITION_PER_ATTRIBUTE};
const MAX_COUNTED_WORDS: usize = 30;
@@ -411,3 +415,125 @@ impl WordDocidsExtractors {
cached_sorter.flush_fid_word_count(&mut buffer)
}
}
pub struct WordDocidsSettingsExtractorsData<'a, SD> {
tokenizer: DocumentTokenizer<'a>,
max_memory_by_thread: Option<usize>,
buckets: usize,
settings_delta: &'a SD,
}
impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor>
for WordDocidsSettingsExtractorsData<'_, SD>
{
type Data = RefCell<Option<WordDocidsBalancedCaches<'extractor>>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
self.buckets,
self.max_memory_by_thread,
extractor_alloc,
))))
}
fn process<'doc>(
&'doc self,
documents: impl Iterator<Item = crate::Result<DocumentIdentifiers<'doc>>>,
context: &'doc DocumentContext<Self::Data>,
) -> crate::Result<()> {
for document in documents {
let document = document?;
SettingsChangeWordDocidsExtractors::extract_settings_change(
document,
context,
&self.tokenizer,
self.settings_delta,
)?;
}
Ok(())
}
}
pub struct SettingsChangeWordDocidsExtractors;
impl SettingsChangeWordDocidsExtractors {
pub fn run_extraction<'fid, 'indexer, 'index, 'extractor, SD, MSP>(
settings_delta: &SD,
documents: &'indexer DocumentsIndentifiers<'indexer>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
step: IndexingStep,
) -> Result<WordDocidsCaches<'extractor>>
where
SD: SettingsDelta + Sync,
MSP: Fn() -> bool + Sync,
{
// Warning: this is duplicated code from extract_word_pair_proximity_docids.rs
let rtxn = indexing_context.index.read_txn()?;
let stop_words = indexing_context.index.stop_words(&rtxn)?;
let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?;
let allowed_separators: Option<Vec<_>> =
allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect());
let dictionary = indexing_context.index.dictionary(&rtxn)?;
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
let mut builder = tokenizer_builder(
stop_words.as_ref(),
allowed_separators.as_deref(),
dictionary.as_deref(),
);
let tokenizer = builder.build();
let localized_attributes_rules =
indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default();
let document_tokenizer = DocumentTokenizer {
tokenizer: &tokenizer,
localized_attributes_rules: &localized_attributes_rules,
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
};
let extractor_data = WordDocidsSettingsExtractorsData {
tokenizer: document_tokenizer,
max_memory_by_thread: indexing_context.grenad_parameters.max_memory_by_thread(),
buckets: rayon::current_num_threads(),
settings_delta,
};
let datastore = ThreadLocal::new();
{
let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
settings_change_extract(
documents,
&extractor_data,
indexing_context,
extractor_allocs,
&datastore,
step,
)?;
}
let mut merger = WordDocidsCaches::new();
for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
merger.push(cache)?;
}
Ok(merger)
}
// TODO find a better name (extract_document_change?)
// and document this method.
fn extract_settings_change<SD: SettingsDelta>(
document: DocumentIdentifiers<'_>,
context: &DocumentContext<RefCell<Option<WordDocidsBalancedCaches>>>,
document_tokenizer: &DocumentTokenizer,
settings_delta: &SD,
) -> Result<()> {
// TODO extract words based on the settings delta here
// Note: In insert_del_u32 we should touch the word_fid_docids and
// the fid_word_count_docids if the current field has been added
// or deleted from the list (we can add a boolean to help).
dbg!(document.external_document_id());
Ok(())
}
}

View File

@@ -2,6 +2,7 @@ mod extract_word_docids;
mod extract_word_pair_proximity_docids;
mod tokenize_document;
pub use extract_word_docids::SettingsChangeWordDocidsExtractors;
pub use extract_word_docids::{WordDocidsCaches, WordDocidsExtractors};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;

View File

@@ -372,11 +372,10 @@ where
SD: SettingsDelta + Sync,
{
// Create the list of document ids to extract
let rtxn = indexing_context.index.read_txn()?;
let all_document_ids =
indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
let primary_key =
primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
let index = indexing_context.index;
let rtxn = index.read_txn()?;
let all_document_ids = index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
let primary_key = primary_key_from_db(index, &rtxn, &indexing_context.db_fields_ids_map)?;
let documents = DocumentsIndentifiers::new(&all_document_ids, primary_key);
let span =
@@ -391,6 +390,102 @@ where
extractor_allocs,
)?;
'word_docids: {
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
SettingsChangeWordDocidsExtractors::run_extraction(
settings_delta,
&documents,
indexing_context,
extractor_allocs,
IndexingStep::ExtractingWords,
)?
};
indexing_context.progress.update_progress(IndexingStep::MergingWordCaches);
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordDocids);
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordFieldIdDocids);
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
index,
extractor_sender.docids::<WordFidDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::ExactWordDocids);
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
index,
extractor_sender.docids::<ExactWordDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::WordPositionDocids);
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
index,
extractor_sender.docids::<WordPositionDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
indexing_context.progress.update_progress(MergingWordCache::FieldIdWordCountDocids);
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
index,
extractor_sender.docids::<FidWordCountDocids>(),
&indexing_context.must_stop_processing,
)?;
}
};
'vectors: {
if settings_delta.embedder_actions().is_empty() {
break 'vectors;

View File

@@ -11,6 +11,7 @@ use hashbrown::HashMap;
use heed::{RoTxn, RwTxn};
pub use partial_dump::PartialDump;
pub use post_processing::recompute_word_fst_from_word_docids_database;
pub use settings_changes::settings_change_extract;
pub use update_by_function::UpdateByFunction;
pub use write::ChannelCongestion;
use write::{build_vectors, update_index, write_to_db};

View File

@@ -1631,8 +1631,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// Update index settings
let embedding_config_updates = self.update_embedding_configs()?;
self.update_user_defined_searchable_attributes()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
let mut new_inner_settings =
InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
// TODO maybe not needed?
new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
let primary_key_id = self
.index