Compare commits

...

6 Commits

Author SHA1 Message Date
Louis Dureuil
7d7cb4c3d1 Make max retry duration configurable with MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS 2025-10-21 09:36:22 +02:00
Louis Dureuil
a1d35d9e52 Configurable timeout with MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS 2025-10-21 09:31:06 +02:00
Louis Dureuil
9f17ab51d4 fixup: Add forgotten ignore errors 2025-10-20 18:17:45 +02:00
Louis Dureuil
5ecb4eb79e Embedding no longer returns a result 2025-10-20 17:55:14 +02:00
Louis Dureuil
0a91c091c6 Do not fail a batch when the embedder fails to embed 2025-10-20 17:54:54 +02:00
Louis Dureuil
47a15fbe24 Ignore rendering errors on the document template 2025-10-20 17:54:19 +02:00
4 changed files with 63 additions and 40 deletions

View File

@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
Metadata { docid, external_docid: "", extractor_id },
value,
unused_vectors_distribution,
)?;
);
}
}
// send last chunk
let on_embed = session.drain(unused_vectors_distribution)?;
let on_embed = session.drain(unused_vectors_distribution);
on_embed.finish()
}

View File

@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
metadata,
input,
unused_vectors_distribution,
)?;
);
}
ExtractorDiff::Unchanged => { /* nothing to do */ }
}
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid);
}
session.request_embedding(metadata, input, unused_vectors_distribution)?;
session.request_embedding(metadata, input, unused_vectors_distribution);
}
ExtractorDiff::Unchanged => { /* do nothing */ }
}
@@ -764,7 +764,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
);
)
.ignore_errors();
update_autogenerated(
docid,
@@ -850,7 +851,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
);
)
.ignore_errors();
insert_autogenerated(
docid,
@@ -885,10 +887,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.drain(unused_vectors_distribution)?;
session.drain(unused_vectors_distribution);
}
ChunkType::Fragments { fragments: _, session } => {
session.drain(unused_vectors_distribution)?;
session.drain(unused_vectors_distribution);
}
}
Ok(())
@@ -1036,7 +1038,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
if let Some(new_rendered) = new_rendered {
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
} else {
// remove any existing embedding
OnEmbed::process_embedding_response(
@@ -1072,7 +1074,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
new_rendered,
unused_vectors_distribution,
)?;
);
}
}

View File

@@ -91,6 +91,7 @@ struct EmbedderData {
request: RequestData,
response: Response,
configuration_source: ConfigurationSource,
max_retry_duration: std::time::Duration,
}
#[derive(Debug)]
@@ -182,10 +183,15 @@ impl Embedder {
) -> Result<Self, NewEmbedderError> {
let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(30);
let client = ureq::AgentBuilder::new()
.max_idle_connections(REQUEST_PARALLELISM * 2)
.max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
.timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(timeout))
.build();
let request = RequestData::new(
@@ -196,6 +202,14 @@ impl Embedder {
let response = Response::new(options.response, &request)?;
let max_retry_duration =
std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(60);
let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
let data = EmbedderData {
client,
bearer,
@@ -204,6 +218,7 @@ impl Embedder {
response,
configuration_source,
headers: options.headers,
max_retry_duration,
};
let dimensions = if let Some(dimensions) = options.dimensions {
@@ -457,7 +472,7 @@ where
}
}?;
let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
// randomly up to double the retry duration
let retry_duration = retry_duration

View File

@@ -5,7 +5,7 @@ use serde_json::Value;
use super::error::EmbedError;
use super::{Embedder, Embedding};
use crate::progress::EmbedderStats;
use crate::{DocumentId, Result, ThreadPoolNoAbort};
use crate::{DocumentId, ThreadPoolNoAbort};
type ExtractorId = u8;
#[derive(Clone, Copy)]
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
metadata: Metadata<'doc>,
rendered: I,
unused_vectors_distribution: &C::ErrorMetadata,
) -> Result<()> {
) {
if self.inputs.len() < self.inputs.capacity() {
self.inputs.push(rendered);
self.metadata.push(metadata);
return Ok(());
return;
}
self.embed_chunks(unused_vectors_distribution)
}
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
self.embed_chunks(unused_vectors_distribution)?;
Ok(self.on_embed)
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
self.embed_chunks(unused_vectors_distribution);
self.on_embed
}
#[allow(clippy::too_many_arguments)]
fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
if self.inputs.is_empty() {
return Ok(());
return;
}
let res = match I::embed_ref(
self.inputs.as_slice(),
self.embedder,
self.threads,
self.embedder_stats,
) {
match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
{
Ok(embeddings) => {
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
self.on_embed.process_embedding_response(EmbeddingResponse {
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
embedding: Some(embedding),
});
}
Ok(())
}
Err(error) => {
// reset metadata and inputs, and send metadata to the error processing.
let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace(
&mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
tracing::warn!(
%error,
"error embedding batch of documents, retrying one by one"
);
self.inputs.clear();
return Err(self.on_embed.process_embedding_error(
error,
self.embedder_name,
unused_vectors_distribution,
metadata,
));
// retry with one call per input
for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
Ok(mut embeddings) => {
let Some(embedding) = embeddings.pop() else {
continue;
};
self.on_embed.process_embedding_response(EmbeddingResponse {
metadata,
embedding: Some(embedding),
})
}
Err(err) => {
tracing::warn!(
docid = metadata.external_docid,
%err,
"error embedding document"
);
}
}
}
}
};
self.inputs.clear();
self.metadata.clear();
res
}
pub(crate) fn embedder_name(&self) -> &'doc str {