This commit is contained in:
ManyTheFish 2025-05-21 10:43:27 +02:00
parent fe22dbd0e3
commit 202f1b0c1e
15 changed files with 684 additions and 184 deletions

View File

@ -468,14 +468,14 @@ impl IndexScheduler {
}
progress.update_progress(SettingsProgress::ApplyTheSettings);
builder
let congestion = builder
.execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
Ok((tasks, None))
Ok((tasks, congestion))
}
IndexOperation::DocumentClearAndSetting {
index_uid,

View File

@ -353,9 +353,11 @@ where
}
}
} else {
if embedder_actions.contains_key(name) {
match embedder_actions.get(name) {
Some(action) if action.write_back().is_none() => {
continue;
}
_ => {
vectors.insert(
name,
if entry.implicit {
@ -370,6 +372,8 @@ where
);
}
}
}
}
if vectors.is_empty() {
break 'inject_vectors;

View File

@ -14,16 +14,11 @@ use crate::vector::EmbeddingConfigs;
use crate::{DocumentId, Index, InternalError, Result};
pub enum DocumentChange<'doc> {
Deletion(Deletion<'doc>),
Deletion(DatabaseDocument<'doc>),
Update(Update<'doc>),
Insertion(Insertion<'doc>),
}
pub struct Deletion<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
}
pub struct Update<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
@ -37,7 +32,7 @@ pub struct Insertion<'doc> {
new: Versions<'doc>,
}
pub struct SettingsChangeDocument<'doc> {
pub struct DatabaseDocument<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
}
@ -60,31 +55,6 @@ impl<'doc> DocumentChange<'doc> {
}
}
impl<'doc> Deletion<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
Self { docid, external_document_id }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}
pub fn current<'a, Mapper: FieldIdMapper>(
&self,
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
}
impl<'doc> Insertion<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self {
Insertion { docid, external_document_id, new }
@ -310,7 +280,7 @@ impl<'doc> Update<'doc> {
}
}
impl<'doc> SettingsChangeDocument<'doc> {
impl<'doc> DatabaseDocument<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
Self { docid, external_document_id }
}
@ -319,7 +289,7 @@ impl<'doc> SettingsChangeDocument<'doc> {
self.docid
}
pub fn external_docid(&self) -> &'doc str {
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}

View File

@ -8,12 +8,12 @@ use super::DelAddRoaringBitmap;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
use crate::update::new::document::{write_to_obkv, Document as _};
use crate::update::new::document_change::SettingsChangeDocument;
use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, Extractor, IndexingContext,
};
use crate::update::new::indexer::settings_changes::{
settings_change_extract, SettingsChangeDocuments, SettingsChangeExtractor,
settings_change_extract, DatabaseDocuments, SettingsChangeExtractor,
};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::{FullySend, ThreadLocal};
@ -195,7 +195,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
fn process<'doc>(
&self,
documents: impl Iterator<Item = Result<SettingsChangeDocument<'doc>>>,
documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
@ -206,7 +206,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let external_docid = document.external_docid().to_owned();
let external_docid = document.external_document_id().to_owned();
let content =
document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let vector_content = document.current_vectors(
@ -238,25 +238,29 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
/// modifies them by adding or removing vector fields based on embedder actions,
/// and then updates the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
pub fn update_database_documents<'pl, 'extractor, SCD, MSP, SD>(
document_changes: &SCD,
pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
documents: &'indexer DatabaseDocuments<'indexer>,
indexing_context: IndexingContext<MSP>,
extractor_sender: ExtractorBbqueueSender,
extractor_sender: &ExtractorBbqueueSender,
settings_delta: &SD,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
SCD: SettingsChangeDocuments<'pl>,
{
// skip if no embedder_actions
if settings_delta.embedder_actions().is_empty() {
return Ok(());
}
let document_sender = extractor_sender.documents();
let document_extractor =
SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
settings_change_extract(
document_changes,
documents,
&document_extractor,
indexing_context,
extractor_allocs,

View File

@ -12,7 +12,7 @@ pub use documents::*;
pub use faceted::*;
pub use geo::*;
pub use searchable::*;
pub use vectors::EmbeddingExtractor;
pub use vectors::{EmbeddingExtractor, SettingsChangeEmbeddingExtractor};
/// TODO move in permissive json pointer
pub mod perm_json_p {

View File

@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
@ -8,13 +9,16 @@ use super::cache::DelAddRoaringBitmap;
use crate::error::FaultSource;
use crate::prompt::Prompt;
use crate::update::new::channel::EmbeddingSender;
use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange;
use crate::vector::error::{
EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump,
};
use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
@ -290,6 +294,203 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
}
}
pub struct SettingsChangeEmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs,
old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort,
}
impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
pub fn new(
embedders: &'a EmbeddingConfigs,
old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution,
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self {
embedders,
old_embedders,
embedder_actions,
sender,
threads,
possible_embedding_mistakes,
}
}
}
impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> {
type Data = RefCell<EmbeddingExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
Ok(RefCell::new(EmbeddingExtractorData(HashMap::new_in(extractor_alloc))))
}
fn process<'doc>(
&'doc self,
documents: impl Iterator<Item = crate::Result<DatabaseDocument<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> crate::Result<()> {
let embedders = self.embedders.inner_as_ref();
let old_embedders = self.old_embedders.inner_as_ref();
let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc);
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
// if the embedder is not in the embedder_actions, we don't need to reindex.
if let Some(reindex_action) =
self.embedder_actions.get(embedder_name).and_then(|action| action.reindex())
{
let embedder_id = context
.index
.embedder_category_id
.get(&context.rtxn, embedder_name)?
.ok_or_else(|| InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
})?;
all_chunks.push((
Chunks::new(
embedder,
embedder_id,
embedder_name,
prompt,
context.data,
&self.possible_embedding_mistakes,
self.threads,
self.sender,
&context.doc_alloc,
),
reindex_action,
))
}
}
for document in documents {
let document = document?;
let current_vectors = document.current_vectors(
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?;
for (chunks, reindex_action) in &mut all_chunks {
let embedder_name = chunks.embedder_name();
let current_vectors = current_vectors.vectors_for_key(embedder_name)?;
// if the vectors for this document have been already provided, we don't need to reindex.
let (is_new_embedder, must_regenerate) =
current_vectors.as_ref().map_or((true, true), |vectors| {
(!vectors.has_configured_embedder, vectors.regenerate)
});
match reindex_action {
ReindexAction::RegeneratePrompts => {
if !must_regenerate {
continue;
}
// we need to regenerate the prompts for the document
// Get the old prompt and render the document with it
let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else {
unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders")
};
let old_rendered = old_prompt.render_document(
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
// Get the new prompt and render the document with it
let new_prompt = chunks.prompt();
let new_rendered = new_prompt.render_document(
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
// Compare the rendered documents
// if they are different, regenerate the vectors
if new_rendered != old_rendered {
chunks.set_autogenerated(
document.docid(),
document.external_document_id(),
new_rendered,
&unused_vectors_distribution,
)?;
}
}
ReindexAction::FullReindex => {
let prompt = chunks.prompt();
// if no inserted vectors, then regenerate: true + no embeddings => autogenerate
if must_regenerate {
let rendered = prompt.render_document(
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
chunks.set_autogenerated(
document.docid(),
document.external_document_id(),
rendered,
&unused_vectors_distribution,
)?;
} else if is_new_embedder {
if let Some(embeddings) =
current_vectors.and_then(|vectors| vectors.embeddings)
{
chunks.set_regenerate(document.docid(), false);
chunks.set_vectors(
document.external_document_id(),
document.docid(),
embeddings
.into_vec(&context.doc_alloc, embedder_name)
.map_err(|error| UserError::InvalidVectorsEmbedderConf {
document_id: document
.external_document_id()
.to_string(),
error: error.to_string(),
})?,
)?;
}
}
}
}
}
}
for (chunk, _) in all_chunks {
chunk.drain(&unused_vectors_distribution)?;
}
Ok(())
}
}
// **Warning**: the destructor of this struct is not normally run, make sure that all its fields:
// 1. don't have side effects tied to they destructors
// 2. if allocated, are allocated inside of the bumpalo

View File

@ -7,7 +7,7 @@ use roaring::RoaringBitmap;
use super::document_changes::{DocumentChangeContext, DocumentChanges};
use crate::documents::PrimaryKey;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, DocumentChange};
use crate::update::new::{DatabaseDocument, DocumentChange};
use crate::{DocumentId, Result};
#[derive(Default)]
@ -74,7 +74,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id))))
}
fn len(&self) -> usize {

View File

@ -19,7 +19,7 @@ use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::Versions;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, Insertion, Update};
use crate::update::new::{DatabaseDocument, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> {
if self.is_new {
Ok(None)
} else {
let deletion = Deletion::create(self.docid, external_doc);
let deletion = DatabaseDocument::create(self.docid, external_doc);
Ok(Some(DocumentChange::Deletion(deletion)))
}
}

View File

@ -12,15 +12,20 @@ use super::super::steps::IndexingStep;
use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use super::settings_changes::SettingsChangeDocuments;
use super::settings_changes::settings_change_extract;
use crate::documents::FieldIdMapper;
use crate::documents::PrimaryKey;
use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::indexer::settings_changes::DatabaseDocuments;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
use crate::vector::EmbeddingConfigs;
use crate::Index;
use crate::InternalError;
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
#[allow(clippy::too_many_arguments)]
@ -314,8 +319,7 @@ where
Result::Ok((facet_field_ids_delta, index_embeddings))
}
pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>(
document_changes: &SCD,
pub(super) fn extract_all_settings_changes<'extractor, MSP, SD>(
indexing_context: IndexingContext<MSP>,
indexer_span: Span,
extractor_sender: ExtractorBbqueueSender,
@ -324,31 +328,76 @@ pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>(
finished_extraction: &AtomicBool,
field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>,
document_ids: &mut RoaringBitmap,
modified_docids: &mut RoaringBitmap,
) -> Result<Vec<IndexEmbeddingConfig>>
where
SCD: SettingsChangeDocuments<'pl>,
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
{
// Create the list of document ids to extract
let rtxn = indexing_context.index.read_txn()?;
let all_document_ids =
indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
let primary_key =
primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
let span =
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
update_database_documents(
document_changes,
&documents,
indexing_context,
extractor_sender,
&extractor_sender,
settings_delta,
extractor_allocs,
)?;
'vectors: {
// TODO: extract embeddings for settings changes
// extract embeddings from new embedders
// remove embeddings for embedders that are no longer in the settings
todo!()
if settings_delta.embedder_actions().is_empty() {
break 'vectors;
}
let embedding_sender = extractor_sender.embeddings();
// extract the remaining embedders
let extractor = SettingsChangeEmbeddingExtractor::new(
settings_delta.new_embedders(),
settings_delta.old_embedders(),
settings_delta.embedder_actions(),
embedding_sender,
field_distribution,
request_threads(),
);
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
settings_change_extract(
&documents,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
IndexingStep::ExtractingEmbeddings,
)?;
}
{
let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else {
continue 'data;
};
deladd.apply_to(&mut config.user_provided, modified_docids);
}
}
}
}
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
@ -357,6 +406,24 @@ where
Result::Ok(index_embeddings)
}
fn primary_key_from_db<'indexer, 'index>(
index: &'indexer Index,
rtxn: &'indexer heed::RoTxn<'index>,
fields: &'indexer impl FieldIdMapper,
) -> Result<PrimaryKey<'indexer>> {
let Some(primary_key) = index.primary_key(rtxn)? else {
return Err(InternalError::DatabaseMissingEntry {
db_name: crate::index::db_name::MAIN,
key: Some(crate::index::main_key::PRIMARY_KEY_KEY),
}
.into());
};
let Some(primary_key) = PrimaryKey::new(primary_key, fields) else {
unreachable!("Primary key must exist at this point");
};
Ok(primary_key)
}
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();

View File

@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Once, RwLock};
use std::thread::{self, Builder};
@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::progress::Progress;
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
pub(crate) mod de;
@ -42,7 +45,7 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn index<'pl, 'indexer, 'index, DC, MSP>(
pub(crate) fn index<'pl, 'indexer, 'index, DC, MSP>(
wtxn: &mut RwTxn,
index: &'index Index,
pool: &ThreadPoolNoAbort,
@ -64,48 +67,8 @@ where
let arroy_memory = grenad_parameters.max_memory;
// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
// is because we still use the old indexer for the settings and it is highly impacted by the
// max memory. So we keep the changes here and will remove these changes once we use the new
// indexer to also index settings. Related to #5125 and #5141.
let grenad_parameters = GrenadParameters {
max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
..grenad_parameters
};
// 5% percent of the allocated memory for the extractors, or min 100MiB
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
@ -179,10 +142,7 @@ where
let dimensions = embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, *was_quantized);
Ok((
embedder_index,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
))
Ok((embedder_index, (embedder.as_ref(), writer, dimensions)))
})
.collect();
@ -238,3 +198,234 @@ where
Ok(congestion)
}
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn reindex<'pl, 'indexer, 'index, MSP, SD>(
wtxn: &mut RwTxn<'index>,
index: &'index Index,
pool: &ThreadPoolNoAbort,
grenad_parameters: GrenadParameters,
settings_delta: &'indexer SD,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
) -> Result<ChannelCongestion>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta + Sync,
{
delete_old_embedders(wtxn, index, settings_delta)?;
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
let arroy_memory = grenad_parameters.max_memory;
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let db_fields_ids_map = index.fields_ids_map(wtxn)?;
let new_fields_ids_map = settings_delta.new_fields_ids_map().clone();
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let indexing_context = IndexingContext {
index,
db_fields_ids_map: &db_fields_ids_map,
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing,
progress,
grenad_parameters: &grenad_parameters,
};
let index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
let indexer_span = tracing::Span::current();
let finished_extraction = &finished_extraction;
// prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution;
let modified_docids = &mut modified_docids;
let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || {
extract::extract_all_settings_changes(
indexing_context,
indexer_span,
extractor_sender,
settings_delta,
&mut extractor_allocs,
finished_extraction,
field_distribution,
index_embeddings,
modified_docids,
)
})
.unwrap()
})?;
let new_embedders = settings_delta.new_embedders();
let embedder_actions = settings_delta.embedder_actions();
let mut arroy_writers =
arroy_writers_from_embedder_actions(wtxn, index, &embedder_actions, &new_embedders)?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
let index_embeddings = extractor_handle.join().unwrap()?;
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
pool.install(|| {
build_vectors(
index,
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
&indexing_context.must_stop_processing,
)
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>
})?;
// required to into_inner the new_fields_ids_map
drop(fields_ids_map_store);
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
let document_ids = index.documents_ids(wtxn)?;
update_index(
index,
wtxn,
new_fields_ids_map,
None,
settings_delta.new_embedders().clone(),
field_distribution,
document_ids,
)?;
Ok(congestion)
}
fn arroy_writers_from_embedder_actions<'indexer, 'index>(
wtxn: &mut RwTxn<'index>,
index: &'index Index,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer EmbeddingConfigs,
) -> Result<HashMap<u8, (&'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
embedders
.inner_as_ref()
.iter()
.filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) {
None => None,
Some(action) if action.write_back().is_some() => None,
Some(action) => {
let embedder_index = match index.embedder_category_id.get(wtxn, embedder_name) {
Ok(Some(embedder_index)) => embedder_index,
Ok(None) => {
return Some(Err(InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
}
.into()))
}
Err(e) => return Some(Err(e.into())),
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_index, action.is_being_quantized);
let dimensions = embedder.dimensions();
Some(Ok((embedder_index, (embedder.as_ref(), writer, dimensions))))
}
})
.collect()
}
fn delete_old_embedders<'indexer, 'index, SD>(
wtxn: &mut RwTxn<'_>,
index: &'index Index,
settings_delta: &'indexer SD,
) -> Result<()>
where
SD: SettingsDelta,
{
for (_name, action) in settings_delta.embedder_actions() {
if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() {
let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
let dimensions = reader.dimensions(wtxn)?;
reader.clear(wtxn, dimensions)?;
}
}
Ok(())
}
fn indexer_memory_settings(
current_num_threads: usize,
grenad_parameters: GrenadParameters,
) -> (GrenadParameters, usize) {
// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
// is because we still use the old indexer for the settings and it is highly impacted by the
// max memory. So we keep the changes here and will remove these changes once we use the new
// indexer to also index settings. Related to #5125 and #5141.
let grenad_parameters = GrenadParameters {
max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
..grenad_parameters
};
// 5% percent of the allocated memory for the extractors, or min 100MiB
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads;
// 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
(grenad_parameters, total_bbbuffer_capacity)
}

View File

@ -1,23 +1,19 @@
use std::cell::{Cell, RefCell};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use bumpalo::Bump;
use heed::{RoTxn, WithoutTls};
use rayon::iter::IndexedParallelIterator;
use rayon::slice::ParallelSlice;
use super::super::document_change::DocumentChange;
use super::document_changes::IndexingContext;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::progress::{AtomicDocumentStep, Progress};
use crate::update::new::document_change::SettingsChangeDocument;
use crate::documents::PrimaryKey;
use crate::progress::AtomicDocumentStep;
use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::DocumentChangeContext;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
use crate::{DocumentId, InternalError, Result};
/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
pub trait SettingsChangeExtractor<'extractor>: Sync {
@ -27,46 +23,62 @@ pub trait SettingsChangeExtractor<'extractor>: Sync {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<SettingsChangeDocument<'doc>>>,
changes: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> Result<()>;
}
pub struct DatabaseDocuments<'indexer> {
documents: &'indexer [DocumentId],
primary_key: PrimaryKey<'indexer>,
}
pub trait SettingsChangeDocuments<'pl // lifetime of the underlying payload
>: Sync {
type Item: Send;
fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
impl<'indexer> DatabaseDocuments<'indexer> {
pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self {
Self { documents, primary_key }
}
fn item_to_settings_change_document<'doc, // lifetime of a single `process` call
T: MostlySend>(
fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = &[DocumentId]> {
self.documents.par_chunks(chunk_size)
}
fn item_to_database_document<
'doc, // lifetime of a single `process` call
T: MostlySend,
>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
item: &'doc Self::Item,
) -> Result<Option<SettingsChangeDocument<'doc>>> where 'pl: 'doc // the payload must survive the process calls
;
docid: &'doc DocumentId,
) -> Result<Option<DatabaseDocument<'doc>>> {
let current = context.index.document(&context.rtxn, *docid)?;
let external_document_id = self.primary_key.extract_docid_from_db(
current,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DatabaseDocument::create(*docid, external_document_id)))
}
fn len(&self) -> usize {
self.documents.len()
}
}
const CHUNK_SIZE: usize = 100;
pub fn settings_change_extract<
'pl, // covariant lifetime of the underlying payload
'extractor, // invariant lifetime of extractor_alloc
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing
'data, // invariant on EX::Data lifetime of datastore
'index, // covariant lifetime of the index
EX,
SCD: SettingsChangeDocuments<'pl>,
MSP,
EX: SettingsChangeExtractor<'extractor>,
MSP: Fn() -> bool + Sync,
>(
document_changes: &SCD,
documents: &'indexer DatabaseDocuments<'indexer>,
extractor: &EX,
IndexingContext {
index,
@ -81,11 +93,7 @@ pub fn settings_change_extract<
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>,
step: IndexingStep,
) -> Result<()>
where
EX: SettingsChangeExtractor<'extractor>,
MSP: Fn() -> bool + Sync,
{
) -> Result<()> {
tracing::trace!("We are resetting the extractor allocators");
progress.update_progress(step);
// Clean up and reuse the extractor allocs
@ -94,11 +102,11 @@ where
extractor_alloc.0.reset();
}
let total_documents = document_changes.len() as u32;
let total_documents = documents.len() as u32;
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
let pi = document_changes.iter(CHUNK_SIZE);
let pi = documents.iter(CHUNK_SIZE);
pi.try_arc_for_each_try_init(
|| {
DocumentChangeContext::new(
@ -121,9 +129,9 @@ where
context.doc_alloc.reset();
let items = items.as_ref();
let changes = items.iter().filter_map(|item| {
document_changes.item_to_settings_change_document(context, item).transpose()
});
let changes = items
.iter()
.filter_map(|item| documents.item_to_database_document(context, item).transpose());
let res = extractor.process(changes, context).map_err(Arc::new);
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);

View File

@ -13,7 +13,7 @@ use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::update::new::document::Versions;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
pub struct UpdateByFunction {
@ -128,10 +128,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
match scope.remove::<Dynamic>("doc") {
// If the "doc" variable has been set to (), we effectively delete the document.
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
docid,
doc_alloc.alloc_str(&document_id),
)))),
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(
DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)),
))),
None => unreachable!("missing doc variable from the Rhai scope"),
Some(new_document) => match new_document.try_cast() {
Some(new_rhai_document) => {

View File

@ -21,7 +21,7 @@ pub fn write_to_db(
finished_extraction: &AtomicBool,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
arroy_writers: &HashMap<u8, (&Embedder, ArroyWrapper, usize)>,
) -> Result<ChannelCongestion> {
// Used by by the ArroySetVector to copy the embedding into an
// aligned memory area, required by arroy to accept a new vector.
@ -53,7 +53,7 @@ pub fn write_to_db(
}
ReceiverAction::LargeVectors(large_vectors) => {
let LargeVectors { docid, embedder_id, .. } = large_vectors;
let (_, _, writer, dimensions) =
let (_, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
for embedding in large_vectors.read_embeddings(*dimensions) {
@ -105,7 +105,7 @@ pub fn build_vectors<MSP>(
progress: &Progress,
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
arroy_writers: &mut HashMap<u8, (&Embedder, ArroyWrapper, usize)>,
must_stop_processing: &MSP,
) -> Result<()>
where
@ -117,7 +117,7 @@ where
let seed = rand::random();
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers {
for (_index, (_embedder, writer, dimensions)) in arroy_writers {
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
@ -166,7 +166,7 @@ pub fn write_from_bbqueue(
writer_receiver: &mut WriterBbqueueReceiver<'_>,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
arroy_writers: &HashMap<u8, (&crate::vector::Embedder, ArroyWrapper, usize)>,
aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
while let Some(frame_with_header) = writer_receiver.recv_frame() {
@ -207,7 +207,7 @@ pub fn write_from_bbqueue(
}
}
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
for (_index, (_embedder, writer, dimensions)) in arroy_writers {
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
@ -215,7 +215,7 @@ pub fn write_from_bbqueue(
EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs;
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
let (_, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);

View File

@ -1,4 +1,4 @@
pub use document_change::{Deletion, DocumentChange, Insertion, Update};
pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update};
pub use indexer::ChannelCongestion;
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,

View File

@ -31,13 +31,17 @@ use crate::progress::Progress;
use crate::prompt::{default_max_bytes, default_template_text, PromptData};
use crate::proximity::ProximityPrecision;
use crate::update::index_documents::IndexDocumentsMethod;
use crate::update::new::indexer::reindex;
use crate::update::{IndexDocuments, UpdateIndexingStep};
use crate::vector::settings::{
EmbedderAction, EmbedderSource, EmbeddingSettings, NestingContext, ReindexAction,
SubEmbeddingSettings, WriteBackToDocuments,
};
use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs};
use crate::{FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result};
use crate::{
ChannelCongestion, FieldId, FieldsIdsMap, FilterableAttributesRule, Index,
LocalizedAttributesRule, Result, ThreadPoolNoAbortBuilder,
};
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Setting<T> {
@ -1424,16 +1428,18 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
mut self,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
) -> Result<()>
) -> Result<Option<ChannelCongestion>>
where
MSP: Fn() -> bool + Sync,
{
// force the old indexer if the environment says so
if std::env::var_os("MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS").is_some() {
return self.execute(
return self
.execute(
|indexing_step| tracing::debug!("update: {:?}", indexing_step),
must_stop_processing,
);
)
.map(|_| None);
}
// only use the new indexer when only the embedder possibly changed
@ -1469,7 +1475,40 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
indexer_config: _,
} = &self
{
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
// Update index settings
let embedding_config_updates = self.update_embedding_configs()?;
let mut new_inner_settings =
InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
let primary_key_id = self
.index
.primary_key(self.wtxn)?
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
let settings_update_only = true;
let inner_settings_diff = InnerIndexSettingsDiff::new(
old_inner_settings,
new_inner_settings,
primary_key_id,
embedding_config_updates,
settings_update_only,
);
todo!()
// reindex(
// self.wtxn,
// self.index,
// &self.indexer_config.pool,
// self.indexer_config.grenad_parameters(),
// &inner_settings_diff,
// must_stop_processing,
// progress,
// )
// 1. First we want to update the database and compute the settings diff, we might reuse a bunch of existing functions here
// 2. Pick which pipelines we need to run.
// 3. Execute extraction pipelines
@ -1486,6 +1525,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|indexing_step| tracing::debug!("update: {:?}", indexing_step),
must_stop_processing,
)
.map(|_| None)
}
// create rtxn, populate FieldIdMapWithMetadata (old + new)
@ -2179,15 +2219,31 @@ fn deserialize_sub_embedder(
}
}
/// Implement this trait for the settings delta type.
/// This is used in the new settings update flow and will allow to easily replace the old settings delta type: `InnerIndexSettingsDiff`.
pub trait SettingsDelta {
fn new_embedding_configs(&self) -> &EmbeddingConfigs;
fn new_embedders(&self) -> &EmbeddingConfigs;
fn old_embedders(&self) -> &EmbeddingConfigs;
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction>;
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata;
}
impl SettingsDelta for InnerIndexSettingsDiff {
fn new_embedders(&self) -> &EmbeddingConfigs {
&self.new.embedding_configs
}
fn old_embedders(&self) -> &EmbeddingConfigs {
&self.old.embedding_configs
}
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction> {
&self.embedding_config_updates
}
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata {
&self.new.fields_ids_map
}
}
#[cfg(test)]