mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-06-19 19:08:10 +00:00
Update documents by reinserting or removing vector fields based on embedders
This commit is contained in:
parent
64b570abe2
commit
fe22dbd0e3
@ -319,7 +319,7 @@ impl<'doc> SettingsChangeDocument<'doc> {
|
|||||||
self.docid
|
self.docid
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn external_document_id(&self) -> &'doc str {
|
pub fn external_docid(&self) -> &'doc str {
|
||||||
self.external_document_id
|
self.external_document_id
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -333,4 +333,16 @@ impl<'doc> SettingsChangeDocument<'doc> {
|
|||||||
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||||
)?)
|
)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn current_vectors<'a, Mapper: FieldIdMapper>(
|
||||||
|
&self,
|
||||||
|
rtxn: &'a RoTxn,
|
||||||
|
index: &'a Index,
|
||||||
|
mapper: &'a Mapper,
|
||||||
|
doc_alloc: &'a Bump,
|
||||||
|
) -> Result<VectorDocumentFromDb<'a>> {
|
||||||
|
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
|
||||||
|
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||||
|
)?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,25 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use bumpalo::Bump;
|
use bumpalo::Bump;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
use super::DelAddRoaringBitmap;
|
use super::DelAddRoaringBitmap;
|
||||||
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
||||||
use crate::update::new::channel::DocumentsSender;
|
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
|
||||||
use crate::update::new::document::{write_to_obkv, Document as _};
|
use crate::update::new::document::{write_to_obkv, Document as _};
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
use crate::update::new::document_change::SettingsChangeDocument;
|
||||||
|
use crate::update::new::indexer::document_changes::{
|
||||||
|
DocumentChangeContext, Extractor, IndexingContext,
|
||||||
|
};
|
||||||
|
use crate::update::new::indexer::settings_changes::{
|
||||||
|
settings_change_extract, SettingsChangeDocuments, SettingsChangeExtractor,
|
||||||
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
use crate::update::new::thread_local::FullySend;
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
|
use crate::update::settings::SettingsDelta;
|
||||||
|
use crate::vector::settings::EmbedderAction;
|
||||||
use crate::vector::EmbeddingConfigs;
|
use crate::vector::EmbeddingConfigs;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
@ -162,3 +171,98 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct SettingsChangeDocumentExtractor<'a, 'b> {
|
||||||
|
document_sender: DocumentsSender<'a, 'b>,
|
||||||
|
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
|
||||||
|
pub fn new(
|
||||||
|
document_sender: DocumentsSender<'a, 'b>,
|
||||||
|
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
|
||||||
|
) -> Self {
|
||||||
|
Self { document_sender, embedder_actions }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
|
||||||
|
type Data = FullySend<RefCell<DocumentExtractorData>>;
|
||||||
|
|
||||||
|
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||||
|
Ok(FullySend(Default::default()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process<'doc>(
|
||||||
|
&self,
|
||||||
|
documents: impl Iterator<Item = Result<SettingsChangeDocument<'doc>>>,
|
||||||
|
context: &DocumentChangeContext<Self::Data>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
||||||
|
|
||||||
|
for document in documents {
|
||||||
|
let document = document?;
|
||||||
|
// **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
|
||||||
|
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
|
||||||
|
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||||
|
|
||||||
|
let external_docid = document.external_docid().to_owned();
|
||||||
|
let content =
|
||||||
|
document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||||
|
let vector_content = document.current_vectors(
|
||||||
|
&context.rtxn,
|
||||||
|
context.index,
|
||||||
|
&context.db_fields_ids_map,
|
||||||
|
&context.doc_alloc,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let content = write_to_obkv(
|
||||||
|
&content,
|
||||||
|
Some(&vector_content),
|
||||||
|
&self.embedder_actions,
|
||||||
|
&mut new_fields_ids_map,
|
||||||
|
&mut document_buffer,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
/// TODO: avoid sending a document without any modifications
|
||||||
|
self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Modify the database documents based on the settings changes.
|
||||||
|
///
|
||||||
|
/// This function extracts the documents from the database,
|
||||||
|
/// modifies them by adding or removing vector fields based on embedder actions,
|
||||||
|
/// and then updates the database.
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
|
||||||
|
pub fn update_database_documents<'pl, 'extractor, SCD, MSP, SD>(
|
||||||
|
document_changes: &SCD,
|
||||||
|
indexing_context: IndexingContext<MSP>,
|
||||||
|
extractor_sender: ExtractorBbqueueSender,
|
||||||
|
settings_delta: &SD,
|
||||||
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MSP: Fn() -> bool + Sync,
|
||||||
|
SD: SettingsDelta,
|
||||||
|
SCD: SettingsChangeDocuments<'pl>,
|
||||||
|
{
|
||||||
|
let document_sender = extractor_sender.documents();
|
||||||
|
let document_extractor =
|
||||||
|
SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
|
||||||
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
|
||||||
|
settings_change_extract(
|
||||||
|
document_changes,
|
||||||
|
&document_extractor,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
crate::update::new::steps::IndexingStep::ExtractingDocuments,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
@ -12,12 +12,14 @@ use super::super::steps::IndexingStep;
|
|||||||
use super::super::thread_local::{FullySend, ThreadLocal};
|
use super::super::thread_local::{FullySend, ThreadLocal};
|
||||||
use super::super::FacetFieldIdsDelta;
|
use super::super::FacetFieldIdsDelta;
|
||||||
use super::document_changes::{extract, DocumentChanges, IndexingContext};
|
use super::document_changes::{extract, DocumentChanges, IndexingContext};
|
||||||
|
use super::settings_changes::SettingsChangeDocuments;
|
||||||
use crate::index::IndexEmbeddingConfig;
|
use crate::index::IndexEmbeddingConfig;
|
||||||
use crate::progress::MergingWordCache;
|
use crate::progress::MergingWordCache;
|
||||||
use crate::proximity::ProximityPrecision;
|
use crate::proximity::ProximityPrecision;
|
||||||
use crate::update::new::extract::EmbeddingExtractor;
|
use crate::update::new::extract::EmbeddingExtractor;
|
||||||
use crate::update::new::merger::merge_and_send_rtree;
|
use crate::update::new::merger::merge_and_send_rtree;
|
||||||
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
|
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
|
||||||
|
use crate::update::settings::SettingsDelta;
|
||||||
use crate::vector::EmbeddingConfigs;
|
use crate::vector::EmbeddingConfigs;
|
||||||
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||||
|
|
||||||
@ -312,6 +314,49 @@ where
|
|||||||
Result::Ok((facet_field_ids_delta, index_embeddings))
|
Result::Ok((facet_field_ids_delta, index_embeddings))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>(
|
||||||
|
document_changes: &SCD,
|
||||||
|
indexing_context: IndexingContext<MSP>,
|
||||||
|
indexer_span: Span,
|
||||||
|
extractor_sender: ExtractorBbqueueSender,
|
||||||
|
settings_delta: &SD,
|
||||||
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
|
finished_extraction: &AtomicBool,
|
||||||
|
field_distribution: &mut BTreeMap<String, u64>,
|
||||||
|
mut index_embeddings: Vec<IndexEmbeddingConfig>,
|
||||||
|
document_ids: &mut RoaringBitmap,
|
||||||
|
modified_docids: &mut RoaringBitmap,
|
||||||
|
) -> Result<Vec<IndexEmbeddingConfig>>
|
||||||
|
where
|
||||||
|
SCD: SettingsChangeDocuments<'pl>,
|
||||||
|
MSP: Fn() -> bool + Sync,
|
||||||
|
SD: SettingsDelta,
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
update_database_documents(
|
||||||
|
document_changes,
|
||||||
|
indexing_context,
|
||||||
|
extractor_sender,
|
||||||
|
settings_delta,
|
||||||
|
extractor_allocs,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
'vectors: {
|
||||||
|
// TODO: extract embeddings for settings changes
|
||||||
|
// extract embeddings from new embedders
|
||||||
|
// remove embeddings for embedders that are no longer in the settings
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
|
||||||
|
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
Result::Ok(index_embeddings)
|
||||||
|
}
|
||||||
|
|
||||||
fn request_threads() -> &'static ThreadPoolNoAbort {
|
fn request_threads() -> &'static ThreadPoolNoAbort {
|
||||||
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ mod extract;
|
|||||||
mod guess_primary_key;
|
mod guess_primary_key;
|
||||||
mod partial_dump;
|
mod partial_dump;
|
||||||
mod post_processing;
|
mod post_processing;
|
||||||
mod settings_changes;
|
pub mod settings_changes;
|
||||||
mod update_by_function;
|
mod update_by_function;
|
||||||
mod write;
|
mod write;
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ use crate::update::new::indexer::document_changes::DocumentChangeContext;
|
|||||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||||
use crate::update::new::steps::IndexingStep;
|
use crate::update::new::steps::IndexingStep;
|
||||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
|
use crate::update::settings::SettingsDelta;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||||
|
|
||||||
|
@ -2179,6 +2179,17 @@ fn deserialize_sub_embedder(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait SettingsDelta {
|
||||||
|
fn new_embedding_configs(&self) -> &EmbeddingConfigs;
|
||||||
|
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SettingsDelta for InnerIndexSettingsDiff {
|
||||||
|
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction> {
|
||||||
|
&self.embedding_config_updates
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
#[path = "test_settings.rs"]
|
#[path = "test_settings.rs"]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user