Allow customizing failure modes with MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES

This commit is contained in:
Louis Dureuil
2025-11-10 14:23:51 +01:00
parent a9d6e86077
commit 40456795d0
4 changed files with 182 additions and 39 deletions

View File

@@ -1173,6 +1173,7 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
request_threads, request_threads,
&doc_alloc, &doc_alloc,
embedder_stats, embedder_stats,
false,
on_embed, on_embed,
); );

View File

@@ -35,6 +35,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
possible_embedding_mistakes: PossibleEmbeddingMistakes, possible_embedding_mistakes: PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats, embedder_stats: &'a EmbedderStats,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
} }
impl<'a, 'b> EmbeddingExtractor<'a, 'b> { impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
@@ -46,7 +47,15 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
) -> Self { ) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats } let failure_modes = EmbedderFailureModes::from_env();
Self {
embedders,
sender,
threads,
possible_embedding_mistakes,
embedder_stats,
failure_modes,
}
} }
} }
@@ -91,6 +100,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
self.threads, self.threads,
self.sender, self.sender,
&context.doc_alloc, &context.doc_alloc,
self.failure_modes,
)) ))
} }
@@ -267,6 +277,7 @@ pub struct SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
sender: EmbeddingSender<'a, 'b>, sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes, possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
} }
impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> { impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
@@ -279,7 +290,16 @@ impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
) -> Self { ) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { settings_delta, embedder_stats, sender, threads, possible_embedding_mistakes } let failure_modes = EmbedderFailureModes::from_env();
Self {
settings_delta,
embedder_stats,
sender,
threads,
possible_embedding_mistakes,
failure_modes,
}
} }
} }
@@ -336,6 +356,7 @@ impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor>
self.threads, self.threads,
self.sender, self.sender,
&context.doc_alloc, &context.doc_alloc,
self.failure_modes,
), ),
reindex_action, reindex_action,
)); ));
@@ -539,6 +560,7 @@ struct Chunks<'a, 'b, 'extractor> {
enum ChunkType<'a, 'b> { enum ChunkType<'a, 'b> {
DocumentTemplate { DocumentTemplate {
document_template: &'a Prompt, document_template: &'a Prompt,
ignore_document_template_failures: bool,
session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, &'a str>, session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, &'a str>,
}, },
Fragments { Fragments {
@@ -559,6 +581,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>, sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump, doc_alloc: &'a Bump,
failure_modes: EmbedderFailureModes,
) -> Self { ) -> Self {
let embedder = &runtime.embedder; let embedder = &runtime.embedder;
let dimensions = embedder.dimensions(); let dimensions = embedder.dimensions();
@@ -567,12 +590,14 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let kind = if fragments.is_empty() { let kind = if fragments.is_empty() {
ChunkType::DocumentTemplate { ChunkType::DocumentTemplate {
document_template: &runtime.document_template, document_template: &runtime.document_template,
ignore_document_template_failures: failure_modes.ignore_document_template_failures,
session: EmbedSession::new( session: EmbedSession::new(
&runtime.embedder, &runtime.embedder,
embedder_name, embedder_name,
threads, threads,
doc_alloc, doc_alloc,
embedder_stats, embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates { OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id, embedder_id: embedder_info.embedder_id,
sender, sender,
@@ -589,6 +614,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads, threads,
doc_alloc, doc_alloc,
embedder_stats, embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates { OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id, embedder_id: embedder_info.embedder_id,
sender, sender,
@@ -693,7 +719,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
}, },
)?; )?;
} }
ChunkType::DocumentTemplate { document_template, session } => { ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc(); let doc_alloc = session.doc_alloc();
let old_embedder = settings_delta.old_embedders().get(session.embedder_name()); let old_embedder = settings_delta.old_embedders().get(session.embedder_name());
@@ -702,6 +732,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
} else { } else {
old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template) old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template)
}; };
let extractor = let extractor =
DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map); DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map);
let old_extractor = old_document_template.map(|old_document_template| { let old_extractor = old_document_template.map(|old_document_template| {
@@ -710,7 +741,15 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let metadata = let metadata =
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() }; Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
match extractor.diff_settings(document, &external_docid, old_extractor.as_ref())? { let extractor_diff = if *ignore_document_template_failures {
let extractor = extractor.ignore_errors();
let old_extractor = old_extractor.map(DocumentTemplateExtractor::ignore_errors);
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
} else {
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
};
match extractor_diff {
ExtractorDiff::Removed => { ExtractorDiff::Removed => {
if old_is_user_provided || full_reindex { if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid); session.on_embed_mut().clear_vectors(docid);
@@ -758,7 +797,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_must_regenerate, new_must_regenerate,
); );
match &mut self.kind { match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => { ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc(); let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new( let ex = DocumentTemplateExtractor::new(
document_template, document_template,
@@ -766,18 +809,33 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_fields_ids_map, new_fields_ids_map,
); );
update_autogenerated( if *ignore_document_template_failures {
docid, update_autogenerated(
external_docid, docid,
[ex], external_docid,
old_document, [ex.ignore_errors()],
new_document, old_document,
&external_docid, new_document,
old_must_regenerate, &external_docid,
old_is_user_provided, old_must_regenerate,
session, old_is_user_provided,
unused_vectors_distribution, session,
)? unused_vectors_distribution,
)
} else {
update_autogenerated(
docid,
external_docid,
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)
}?
} }
ChunkType::Fragments { fragments, session } => { ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc(); let doc_alloc = session.doc_alloc();
@@ -844,23 +902,38 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
); );
match &mut self.kind { match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => { ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc(); let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new( let ex = DocumentTemplateExtractor::new(
document_template, document_template,
doc_alloc, doc_alloc,
new_fields_ids_map, new_fields_ids_map,
); );
if *ignore_document_template_failures {
insert_autogenerated( insert_autogenerated(
docid, docid,
external_docid, external_docid,
[ex], [ex.ignore_errors()],
new_document, new_document,
&external_docid, &external_docid,
session, session,
unused_vectors_distribution, unused_vectors_distribution,
)?; )?;
} else {
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
}
} }
ChunkType::Fragments { fragments, session } => { ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc(); let doc_alloc = session.doc_alloc();
@@ -884,7 +957,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> { pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind { match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => { ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.drain(unused_vectors_distribution)?; session.drain(unused_vectors_distribution)?;
} }
ChunkType::Fragments { fragments: _, session } => { ChunkType::Fragments { fragments: _, session } => {
@@ -896,9 +973,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn embedder_name(&self) -> &'a str { pub fn embedder_name(&self) -> &'a str {
match &self.kind { match &self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => { ChunkType::DocumentTemplate {
session.embedder_name() document_template: _,
} ignore_document_template_failures: _,
session,
} => session.embedder_name(),
ChunkType::Fragments { fragments: _, session } => session.embedder_name(), ChunkType::Fragments { fragments: _, session } => session.embedder_name(),
} }
} }
@@ -967,7 +1046,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
} }
} }
match &mut self.kind { match &mut self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => { ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.on_embed_mut().process_embeddings( session.on_embed_mut().process_embeddings(
Metadata { docid, external_docid, extractor_id: 0 }, Metadata { docid, external_docid, extractor_id: 0 },
embeddings, embeddings,
@@ -1078,3 +1161,47 @@ where
Ok(()) Ok(())
} }
/// Failure-handling policy applied while extracting embeddings.
///
/// Both flags default to `false`, meaning failures are reported as errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct EmbedderFailureModes {
    /// When `true`, document template extractors are wrapped with `ignore_errors()`
    /// before being used.
    pub ignore_document_template_failures: bool,
    /// When `true`, the flag is forwarded to the embedding session, which logs
    /// embedding errors as warnings instead of returning them.
    pub ignore_embedder_failures: bool,
}

impl EmbedderFailureModes {
    /// Reads the policy from `MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES`.
    ///
    /// Returns the default policy (nothing ignored) when the variable is unset.
    ///
    /// # Panics
    ///
    /// Panics when the variable is set to a value that
    /// [`parse_from_os_str`](Self::parse_from_os_str) rejects.
    fn from_env() -> Self {
        std::env::var_os("MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES")
            .map(|failure_modes| {
                Self::parse_from_os_str(
                    &failure_modes,
                    "`MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES`",
                )
            })
            .unwrap_or_default()
    }

    /// Parses a comma-separated list of failure modes.
    ///
    /// Recognized tokens:
    /// - `ignore_document_template_failure` (historical spelling) or
    ///   `ignore_document_template_failures`
    /// - `ignore_embedder_failures`
    ///
    /// Tokens may appear in any order, may be repeated, and may be surrounded by
    /// whitespace. Empty tokens (empty string, lone commas, leading or trailing
    /// commas) are skipped.
    ///
    /// `provenance` names where the value came from and is only used in the
    /// panic message.
    ///
    /// # Panics
    ///
    /// Panics when the value is not valid UTF-8 or contains an unknown token.
    fn parse_from_os_str(failure_modes: &std::ffi::OsStr, provenance: &'static str) -> Self {
        // None of the recognized tokens can be spelled with non-UTF-8 bytes, so
        // such a value is necessarily unexpected.
        let raw = match failure_modes.to_str() {
            Some(raw) => raw,
            None => Self::unexpected_value(failure_modes, provenance),
        };

        let mut modes = Self::default();
        for token in raw.split(',').map(str::trim).filter(|token| !token.is_empty()) {
            match token {
                // The singular form is the spelling historically accepted here; the
                // plural form matches the field name and the sibling token.
                "ignore_document_template_failure" | "ignore_document_template_failures" => {
                    modes.ignore_document_template_failures = true
                }
                "ignore_embedder_failures" => modes.ignore_embedder_failures = true,
                _ => Self::unexpected_value(failure_modes, provenance),
            }
        }
        modes
    }

    /// Aborts with the message used for any value that cannot be parsed.
    fn unexpected_value(failure_modes: &std::ffi::OsStr, provenance: &str) -> ! {
        panic!(
            "Unexpected value {failure_modes} for {provenance}",
            failure_modes = failure_modes.to_string_lossy()
        )
    }
}

View File

@@ -1631,8 +1631,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// Update index settings // Update index settings
let embedding_config_updates = self.update_embedding_configs()?; let embedding_config_updates = self.update_embedding_configs()?;
self.update_user_defined_searchable_attributes()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?; let mut new_inner_settings =
InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
let primary_key_id = self let primary_key_id = self
.index .index

View File

@@ -44,6 +44,7 @@ pub struct EmbedSession<'doc, C, I> {
embedder_name: &'doc str, embedder_name: &'doc str,
embedder_stats: &'doc EmbedderStats, embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C, on_embed: C,
} }
@@ -87,6 +88,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads: &'doc ThreadPoolNoAbort, threads: &'doc ThreadPoolNoAbort,
doc_alloc: &'doc Bump, doc_alloc: &'doc Bump,
embedder_stats: &'doc EmbedderStats, embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C, on_embed: C,
) -> Self { ) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint(); let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
@@ -99,6 +101,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads, threads,
embedder_name, embedder_name,
embedder_stats, embedder_stats,
ignore_embedding_failures,
on_embed, on_embed,
} }
} }
@@ -144,24 +147,33 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
Ok(()) Ok(())
} }
Err(error) => { Err(error) => {
// reset metadata and inputs, and send metadata to the error processing. // send metadata to the error processing.
let doc_alloc = self.metadata.bump(); let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace( let metadata = std::mem::replace(
&mut self.metadata, &mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc), BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
); );
self.inputs.clear(); Err(self.on_embed.process_embedding_error(
return Err(self.on_embed.process_embedding_error(
error, error,
self.embedder_name, self.embedder_name,
unused_vectors_distribution, unused_vectors_distribution,
metadata, metadata,
)); ))
} }
}; };
self.inputs.clear(); self.inputs.clear();
self.metadata.clear(); self.metadata.clear();
res if self.ignore_embedding_failures {
if let Err(err) = res {
tracing::warn!(
%err,
"ignored error embedding batch of documents due to failure policy"
);
}
Ok(())
} else {
res
}
} }
pub(crate) fn embedder_name(&self) -> &'doc str { pub(crate) fn embedder_name(&self) -> &'doc str {