Compare commits


6 Commits

Author SHA1 Message Date
Louis Dureuil 7d7cb4c3d1 Make max retry duration configurable with MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS 2025-10-21 09:36:22 +02:00
Louis Dureuil a1d35d9e52 Configurable timeout with MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS 2025-10-21 09:31:06 +02:00
Louis Dureuil 9f17ab51d4 fixup: Add forgotten ignore errors 2025-10-20 18:17:45 +02:00
Louis Dureuil 5ecb4eb79e Embedding no longer returns a result 2025-10-20 17:55:14 +02:00
Louis Dureuil 0a91c091c6 Do not fail a batch when the embedder fails to embed 2025-10-20 17:54:54 +02:00
Louis Dureuil 47a15fbe24 Ignore rendering errors on the document template 2025-10-20 17:54:19 +02:00
8 changed files with 66 additions and 119 deletions
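Both new knobs follow the same pattern: an experimental environment variable (e.g. `MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS=10`) read once when the REST embedder is built, falling back to the previous hard-coded default (30 s request timeout, 60 s maximum retry wait). Below is a minimal standalone sketch of that pattern; `seconds_from_env` is a hypothetical helper name, and, as in the diff, an unparseable value panics because of `unwrap()`.

```rust
use std::time::Duration;

// Hypothetical helper mirroring the env-var override pattern in these
// commits: read a duration in seconds, fall back to a default when the
// variable is unset. As in the diff, a malformed value panics on unwrap().
fn seconds_from_env(var: &str, default: u64) -> Duration {
    let secs = std::env::var(var).ok().map(|p| p.parse().unwrap()).unwrap_or(default);
    Duration::from_secs(secs)
}

fn main() {
    let timeout = seconds_from_env("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS", 30);
    let max_retry =
        seconds_from_env("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS", 60);
    println!("timeout: {timeout:?}, max retry wait: {max_retry:?}");
}
```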

Cargo.lock generated
View File

@@ -2967,12 +2967,6 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
-[[package]]
-name = "humantime"
-version = "2.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
-
 [[package]]
 name = "hyper"
 version = "1.7.0"
@@ -4042,7 +4036,6 @@ dependencies = [
  "futures",
  "futures-util",
  "hex",
- "humantime",
  "index-scheduler",
  "indexmap",
  "insta",

View File

@@ -117,7 +117,7 @@ secrecy = "0.10.3"
 actix-web-lab = { version = "0.24.1", default-features = false }
 urlencoding = "2.1.3"
 backoff = { version = "0.4.0", features = ["tokio"] }
-humantime = { version = "2.3.0", default-features = false }
 
 [dev-dependencies]
 actix-rt = "2.10.0"

View File

@@ -1,8 +1,7 @@
 use lazy_static::lazy_static;
 use prometheus::{
-    opts, register_gauge, register_gauge_vec, register_histogram_vec, register_int_counter_vec,
-    register_int_gauge, register_int_gauge_vec, Gauge, GaugeVec, HistogramVec, IntCounterVec,
-    IntGauge, IntGaugeVec,
+    opts, register_gauge, register_histogram_vec, register_int_counter_vec, register_int_gauge,
+    register_int_gauge_vec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
 };
 
 lazy_static! {
@@ -74,20 +73,6 @@ lazy_static! {
         &["kind", "value"]
     )
     .expect("Can't create a metric");
-    pub static ref MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE: GaugeVec = register_gauge_vec!(
-        opts!("meilisearch_batch_running_progress_trace", "The currently running progress trace"),
-        &["batch_uid", "step_name"]
-    )
-    .expect("Can't create a metric");
-    pub static ref MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS: IntGaugeVec =
-        register_int_gauge_vec!(
-            opts!(
-                "meilisearch_last_finished_batches_progress_trace_ms",
-                "The last few batches progress trace in milliseconds"
-            ),
-            &["batch_uid", "step_name"]
-        )
-        .expect("Can't create a metric");
     pub static ref MEILISEARCH_LAST_UPDATE: IntGauge =
         register_int_gauge!(opts!("meilisearch_last_update", "Meilisearch Last Update"))
             .expect("Can't create a metric");

View File

@@ -4,7 +4,6 @@ use index_scheduler::{IndexScheduler, Query};
 use meilisearch_auth::AuthController;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::keys::actions;
-use meilisearch_types::milli::progress::ProgressStepView;
 use meilisearch_types::tasks::Status;
 use prometheus::{Encoder, TextEncoder};
 use time::OffsetDateTime;
@@ -39,12 +38,6 @@ pub fn configure(config: &mut web::ServiceConfig) {
 # HELP meilisearch_db_size_bytes Meilisearch DB Size In Bytes
 # TYPE meilisearch_db_size_bytes gauge
 meilisearch_db_size_bytes 1130496
-# HELP meilisearch_batch_running_progress_trace The currently running progress trace
-# TYPE meilisearch_batch_running_progress_trace gauge
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="document"} 0.710618582519409
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="extracting word proximity"} 0.2222222222222222
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="indexing"} 0.6666666666666666
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="processing tasks"} 0
 # HELP meilisearch_http_requests_total Meilisearch HTTP requests total
 # TYPE meilisearch_http_requests_total counter
 meilisearch_http_requests_total{method="GET",path="/metrics",status="400"} 1
@@ -68,13 +61,6 @@ meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="1
 meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="+Inf"} 0
 meilisearch_http_response_time_seconds_sum{method="GET",path="/metrics"} 0
 meilisearch_http_response_time_seconds_count{method="GET",path="/metrics"} 0
-# HELP meilisearch_last_finished_batches_progress_trace_ms The last few batches progress trace in milliseconds
-# TYPE meilisearch_last_finished_batches_progress_trace_ms gauge
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks"} 19360
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes"} 368
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads"} 367
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads > payload"} 367
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > indexing"} 18970
 # HELP meilisearch_index_count Meilisearch Index Count
 # TYPE meilisearch_index_count gauge
 meilisearch_index_count 1
@@ -162,46 +148,6 @@ pub async fn get_metrics(
         }
     }
 
-    // Fetch and expose the current progressing step
-    crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE.reset();
-    let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
-        &Query { statuses: Some(vec![Status::Processing]), ..Query::default() },
-        auth_filters,
-    )?;
-    if let Some(batch) = batches.into_iter().next() {
-        let batch_uid = batch.uid.to_string();
-        if let Some(progress) = batch.progress {
-            for ProgressStepView { current_step, finished, total } in progress.steps {
-                crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE
-                    .with_label_values(&[batch_uid.as_str(), current_step.as_ref()])
-                    // We return the completion ratio of the current step
-                    .set(finished as f64 / total as f64);
-            }
-        }
-    }
-
-    crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS.reset();
-    let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
-        // Fetch the finished batches...
-        &Query { statuses: Some(vec![Status::Succeeded, Status::Failed]), ..Query::default() },
-        auth_filters,
-    )?;
-    // ...and get the last batch only.
-    if let Some(batch) = batches.into_iter().next() {
-        let batch_uid = batch.uid.to_string();
-        for (step_name, duration_str) in batch.stats.progress_trace {
-            let Some(duration_str) = duration_str.as_str() else { continue };
-            match humantime::parse_duration(duration_str) {
-                Ok(duration) => {
-                    crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS
-                        .with_label_values(&[&batch_uid, &step_name])
-                        .set(duration.as_millis() as i64);
-                }
-                Err(e) => tracing::error!("Failed to parse duration: {e}"),
-            }
-        }
-    }
-
     if let Some(last_update) = response.last_update {
         crate::metrics::MEILISEARCH_LAST_UPDATE.set(last_update.unix_timestamp());
     }

View File

@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
                 session.request_embedding(
                     Metadata { docid, external_docid: "", extractor_id },
                     value,
                     unused_vectors_distribution,
-                )?;
+                );
             }
         }
 
         // send last chunk
-        let on_embed = session.drain(unused_vectors_distribution)?;
+        let on_embed = session.drain(unused_vectors_distribution);
         on_embed.finish()
     }

View File

@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
                     metadata,
                     input,
                     unused_vectors_distribution,
-                )?;
+                );
             }
             ExtractorDiff::Unchanged => { /* nothing to do */ }
         }
@@ -724,7 +724,7 @@
                 if old_is_user_provided || full_reindex {
                     session.on_embed_mut().clear_vectors(docid);
                 }
-                session.request_embedding(metadata, input, unused_vectors_distribution)?;
+                session.request_embedding(metadata, input, unused_vectors_distribution);
             }
             ExtractorDiff::Unchanged => { /* do nothing */ }
         }
@@ -764,7 +764,8 @@
                     document_template,
                     doc_alloc,
                     new_fields_ids_map,
-                );
+                )
+                .ignore_errors();
 
                 update_autogenerated(
                     docid,
@@ -850,7 +851,8 @@
                     document_template,
                     doc_alloc,
                     new_fields_ids_map,
-                );
+                )
+                .ignore_errors();
 
                 insert_autogenerated(
                     docid,
@@ -885,10 +887,10 @@
     pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
         match self.kind {
             ChunkType::DocumentTemplate { document_template: _, session } => {
-                session.drain(unused_vectors_distribution)?;
+                session.drain(unused_vectors_distribution);
             }
             ChunkType::Fragments { fragments: _, session } => {
-                session.drain(unused_vectors_distribution)?;
+                session.drain(unused_vectors_distribution);
             }
         }
         Ok(())
@@ -1036,7 +1038,7 @@
            let metadata =
                Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
            if let Some(new_rendered) = new_rendered {
-               session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
+               session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
            } else {
                // remove any existing embedding
                OnEmbed::process_embedding_response(
@@ -1072,7 +1074,7 @@
            session.request_embedding(
                Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
                new_rendered,
                unused_vectors_distribution,
-            )?;
+            );
        }
    }

View File

@@ -91,6 +91,7 @@ struct EmbedderData {
     request: RequestData,
     response: Response,
     configuration_source: ConfigurationSource,
+    max_retry_duration: std::time::Duration,
 }
 
 #[derive(Debug)]
@@ -182,10 +183,15 @@
     ) -> Result<Self, NewEmbedderError> {
         let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
 
+        let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
+            .ok()
+            .map(|p| p.parse().unwrap())
+            .unwrap_or(30);
+
         let client = ureq::AgentBuilder::new()
             .max_idle_connections(REQUEST_PARALLELISM * 2)
             .max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
-            .timeout(std::time::Duration::from_secs(30))
+            .timeout(std::time::Duration::from_secs(timeout))
             .build();
 
         let request = RequestData::new(
@@ -196,6 +202,14 @@
         let response = Response::new(options.response, &request)?;
 
+        let max_retry_duration =
+            std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
+                .ok()
+                .map(|p| p.parse().unwrap())
+                .unwrap_or(60);
+        let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
+
         let data = EmbedderData {
             client,
             bearer,
@@ -204,6 +218,7 @@
             response,
             configuration_source,
             headers: options.headers,
+            max_retry_duration,
         };
 
         let dimensions = if let Some(dimensions) = options.dimensions {
@@ -457,7 +472,7 @@
             }
         }?;
 
-        let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
+        let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
 
         // randomly up to double the retry duration
         let retry_duration = retry_duration
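The retry loop around this changed line is only partially visible, so the following is a sketch under assumptions of the wait computation it participates in: cap the server-suggested retry duration at the configured maximum, then scale it by a random factor of up to 2x so concurrent workers don't retry in lockstep. The `jittered_wait` helper and the use of the `rand` crate are illustrative; the actual jitter code is not shown in this diff.

```rust
use std::time::Duration;

use rand::Rng;

// Illustrative only: cap the suggested wait at the configured maximum,
// then multiply by a random factor in [1.0, 2.0] ("randomly up to double
// the retry duration") to spread out concurrent retries.
fn jittered_wait(suggested: Duration, max_retry_duration: Duration) -> Duration {
    let capped = suggested.min(max_retry_duration);
    capped.mul_f64(rand::thread_rng().gen_range(1.0..=2.0))
}
```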

View File

@@ -5,7 +5,7 @@ use serde_json::Value;
 use super::error::EmbedError;
 use super::{Embedder, Embedding};
 use crate::progress::EmbedderStats;
-use crate::{DocumentId, Result, ThreadPoolNoAbort};
+use crate::{DocumentId, ThreadPoolNoAbort};
 
 type ExtractorId = u8;
 
 #[derive(Clone, Copy)]
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
         metadata: Metadata<'doc>,
         rendered: I,
         unused_vectors_distribution: &C::ErrorMetadata,
-    ) -> Result<()> {
+    ) {
         if self.inputs.len() < self.inputs.capacity() {
             self.inputs.push(rendered);
             self.metadata.push(metadata);
-            return Ok(());
+            return;
         }
         self.embed_chunks(unused_vectors_distribution)
     }
 
-    pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
-        self.embed_chunks(unused_vectors_distribution)?;
-        Ok(self.on_embed)
+    pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
+        self.embed_chunks(unused_vectors_distribution);
+        self.on_embed
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
+    fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
         if self.inputs.is_empty() {
-            return Ok(());
+            return;
         }
 
-        let res = match I::embed_ref(
-            self.inputs.as_slice(),
-            self.embedder,
-            self.threads,
-            self.embedder_stats,
-        ) {
+        match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
+        {
             Ok(embeddings) => {
                 for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
                     self.on_embed.process_embedding_response(EmbeddingResponse {
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
                         embedding: Some(embedding),
                     });
                 }
-                Ok(())
             }
             Err(error) => {
-                // reset metadata and inputs, and send metadata to the error processing.
-                let doc_alloc = self.metadata.bump();
-                let metadata = std::mem::replace(
-                    &mut self.metadata,
-                    BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
-                );
-                self.inputs.clear();
-                return Err(self.on_embed.process_embedding_error(
-                    error,
-                    self.embedder_name,
-                    unused_vectors_distribution,
-                    metadata,
-                ));
+                tracing::warn!(
+                    %error,
+                    "error embedding batch of documents, retrying one by one"
+                );
+                // retry with one call per input
+                for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
+                    match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
+                        Ok(mut embeddings) => {
+                            let Some(embedding) = embeddings.pop() else {
+                                continue;
+                            };
+                            self.on_embed.process_embedding_response(EmbeddingResponse {
+                                metadata,
+                                embedding: Some(embedding),
+                            })
+                        }
+                        Err(err) => {
+                            tracing::warn!(
+                                docid = metadata.external_docid,
+                                %err,
+                                "error embedding document"
+                            );
+                        }
+                    }
+                }
             }
         };
         self.inputs.clear();
         self.metadata.clear();
-        res
     }
 
     pub(crate) fn embedder_name(&self) -> &'doc str {
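Taken together with the signature changes above, `embed_chunks` now degrades gracefully: a batch failure triggers one retry per input, and a document whose embedding still fails is logged and skipped rather than failing the whole batch. Below is a self-contained sketch of that batch-then-singles strategy, with a hypothetical `embed` closure standing in for `I::embed_ref`; documents that still fail come back as `None`.

```rust
// Sketch of the fallback strategy: embed the whole batch once; on failure,
// retry each input individually and skip the ones that still fail.
fn embed_with_fallback<T>(
    inputs: &[T],
    embed: impl Fn(&[T]) -> Result<Vec<Vec<f32>>, String>,
) -> Vec<Option<Vec<f32>>> {
    match embed(inputs) {
        Ok(embeddings) => embeddings.into_iter().map(Some).collect(),
        Err(error) => {
            eprintln!("error embedding batch of documents, retrying one by one: {error}");
            inputs
                .iter()
                .map(|input| match embed(std::slice::from_ref(input)) {
                    Ok(mut embeddings) => embeddings.pop(),
                    Err(err) => {
                        eprintln!("error embedding document: {err}");
                        None // the document is indexed without this embedding
                    }
                })
                .collect()
        }
    }
}

fn main() {
    // Toy embedder that fails whenever the batch contains an empty string.
    let embed = |batch: &[&str]| {
        if batch.iter().any(|s| s.is_empty()) {
            return Err("empty input".to_string());
        }
        Ok(batch.iter().map(|s| vec![s.len() as f32]).collect())
    };
    let out = embed_with_fallback(&["a", "", "abc"], embed);
    assert_eq!(out, vec![Some(vec![1.0]), None, Some(vec![3.0])]);
}
```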