Merge pull request #5984 from meilisearch/embedder-error-modes

Embedder failure modes
This commit is contained in:
Clément Renault
2025-11-10 15:34:01 +00:00
committed by GitHub
5 changed files with 193 additions and 41 deletions

View File

@@ -1173,6 +1173,7 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
request_threads,
&doc_alloc,
embedder_stats,
false,
on_embed,
);

View File

@@ -35,6 +35,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
possible_embedding_mistakes: PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats,
threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
}
impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
@@ -46,7 +47,15 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats }
let failure_modes = EmbedderFailureModes::from_env();
Self {
embedders,
sender,
threads,
possible_embedding_mistakes,
embedder_stats,
failure_modes,
}
}
}
@@ -91,6 +100,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
self.threads,
self.sender,
&context.doc_alloc,
self.failure_modes,
))
}
@@ -267,6 +277,7 @@ pub struct SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
}
impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
@@ -279,7 +290,16 @@ impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { settings_delta, embedder_stats, sender, threads, possible_embedding_mistakes }
let failure_modes = EmbedderFailureModes::from_env();
Self {
settings_delta,
embedder_stats,
sender,
threads,
possible_embedding_mistakes,
failure_modes,
}
}
}
@@ -336,6 +356,7 @@ impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor>
self.threads,
self.sender,
&context.doc_alloc,
self.failure_modes,
),
reindex_action,
));
@@ -539,6 +560,7 @@ struct Chunks<'a, 'b, 'extractor> {
enum ChunkType<'a, 'b> {
DocumentTemplate {
document_template: &'a Prompt,
ignore_document_template_failures: bool,
session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, &'a str>,
},
Fragments {
@@ -559,6 +581,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump,
failure_modes: EmbedderFailureModes,
) -> Self {
let embedder = &runtime.embedder;
let dimensions = embedder.dimensions();
@@ -567,12 +590,14 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let kind = if fragments.is_empty() {
ChunkType::DocumentTemplate {
document_template: &runtime.document_template,
ignore_document_template_failures: failure_modes.ignore_document_template_failures,
session: EmbedSession::new(
&runtime.embedder,
embedder_name,
threads,
doc_alloc,
embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
@@ -589,6 +614,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads,
doc_alloc,
embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
@@ -693,7 +719,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
},
)?;
}
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let old_embedder = settings_delta.old_embedders().get(session.embedder_name());
@@ -702,6 +732,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
} else {
old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template)
};
let extractor =
DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map);
let old_extractor = old_document_template.map(|old_document_template| {
@@ -710,7 +741,15 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let metadata =
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
match extractor.diff_settings(document, &external_docid, old_extractor.as_ref())? {
let extractor_diff = if *ignore_document_template_failures {
let extractor = extractor.ignore_errors();
let old_extractor = old_extractor.map(DocumentTemplateExtractor::ignore_errors);
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
} else {
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
};
match extractor_diff {
ExtractorDiff::Removed => {
if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid);
@@ -758,7 +797,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_must_regenerate,
);
match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
@@ -766,18 +809,33 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_fields_ids_map,
);
update_autogenerated(
docid,
external_docid,
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)?
if *ignore_document_template_failures {
update_autogenerated(
docid,
external_docid,
[ex.ignore_errors()],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)
} else {
update_autogenerated(
docid,
external_docid,
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)
}?
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
@@ -844,23 +902,38 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
);
match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
doc_alloc,
new_fields_ids_map,
);
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
if *ignore_document_template_failures {
insert_autogenerated(
docid,
external_docid,
[ex.ignore_errors()],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
} else {
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
}
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
@@ -884,7 +957,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.drain(unused_vectors_distribution)?;
}
ChunkType::Fragments { fragments: _, session } => {
@@ -896,9 +973,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn embedder_name(&self) -> &'a str {
match &self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.embedder_name()
}
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => session.embedder_name(),
ChunkType::Fragments { fragments: _, session } => session.embedder_name(),
}
}
@@ -967,7 +1046,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
}
}
match &mut self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.on_embed_mut().process_embeddings(
Metadata { docid, external_docid, extractor_id: 0 },
embeddings,
@@ -1078,3 +1161,41 @@ where
Ok(())
}
#[derive(Clone, Copy, PartialEq, Eq, Default)]
struct EmbedderFailureModes {
pub ignore_document_template_failures: bool,
pub ignore_embedder_failures: bool,
}
impl EmbedderFailureModes {
fn from_env() -> Self {
match std::env::var("MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES") {
Ok(failure_modes) => Self::parse_from_str(
&failure_modes,
"`MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES`",
),
Err(std::env::VarError::NotPresent) => Self::default(),
Err(std::env::VarError::NotUnicode(_)) => panic!(
"`MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES` contains a non-unicode value"
),
}
}
fn parse_from_str(failure_modes: &str, provenance: &'static str) -> Self {
let Self { mut ignore_document_template_failures, mut ignore_embedder_failures } =
Default::default();
for segment in failure_modes.split(',') {
let segment = segment.trim();
match segment {
"ignore_document_template_failures" => {
ignore_document_template_failures = true;
}
"ignore_embedder_failures" => ignore_embedder_failures = true,
"" => continue,
segment => panic!("Unrecognized segment value for {provenance}: {segment}"),
}
}
Self { ignore_document_template_failures, ignore_embedder_failures }
}
}

View File

@@ -1631,8 +1631,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// Update index settings
let embedding_config_updates = self.update_embedding_configs()?;
self.update_user_defined_searchable_attributes()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
let mut new_inner_settings =
InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
let primary_key_id = self
.index

View File

@@ -91,6 +91,7 @@ struct EmbedderData {
request: RequestData,
response: Response,
configuration_source: ConfigurationSource,
max_retry_duration: std::time::Duration,
}
#[derive(Debug)]
@@ -182,10 +183,15 @@ impl Embedder {
) -> Result<Self, NewEmbedderError> {
let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(30);
let client = ureq::AgentBuilder::new()
.max_idle_connections(REQUEST_PARALLELISM * 2)
.max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
.timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(timeout))
.build();
let request = RequestData::new(
@@ -196,6 +202,14 @@ impl Embedder {
let response = Response::new(options.response, &request)?;
let max_retry_duration =
std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(60);
let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
let data = EmbedderData {
client,
bearer,
@@ -204,6 +218,7 @@ impl Embedder {
response,
configuration_source,
headers: options.headers,
max_retry_duration,
};
let dimensions = if let Some(dimensions) = options.dimensions {
@@ -457,7 +472,7 @@ where
}
}?;
let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
// randomly up to double the retry duration
let retry_duration = retry_duration

View File

@@ -44,6 +44,7 @@ pub struct EmbedSession<'doc, C, I> {
embedder_name: &'doc str,
embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C,
}
@@ -87,6 +88,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads: &'doc ThreadPoolNoAbort,
doc_alloc: &'doc Bump,
embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C,
) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
@@ -99,6 +101,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads,
embedder_name,
embedder_stats,
ignore_embedding_failures,
on_embed,
}
}
@@ -144,24 +147,33 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
Ok(())
}
Err(error) => {
// reset metadata and inputs, and send metadata to the error processing.
// send metadata to the error processing.
let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace(
&mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
);
self.inputs.clear();
return Err(self.on_embed.process_embedding_error(
Err(self.on_embed.process_embedding_error(
error,
self.embedder_name,
unused_vectors_distribution,
metadata,
));
))
}
};
self.inputs.clear();
self.metadata.clear();
res
if self.ignore_embedding_failures {
if let Err(err) = res {
tracing::warn!(
%err,
"ignored error embedding batch of documents due to failure policy"
);
}
Ok(())
} else {
res
}
}
pub(crate) fn embedder_name(&self) -> &'doc str {