mirror of https://github.com/meilisearch/meilisearch.git
synced 2025-11-23 21:26:02 +00:00

Compare commits: progress-t… → prototype-… (6 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 7d7cb4c3d1 | |
| | a1d35d9e52 | |
| | 9f17ab51d4 | |
| | 5ecb4eb79e | |
| | 0a91c091c6 | |
| | 47a15fbe24 | |
Cargo.lock (generated): 7 changes
```diff
@@ -2967,12 +2967,6 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
-[[package]]
-name = "humantime"
-version = "2.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
-
 [[package]]
 name = "hyper"
 version = "1.7.0"
@@ -4042,7 +4036,6 @@ dependencies = [
  "futures",
  "futures-util",
  "hex",
- "humantime",
  "index-scheduler",
  "indexmap",
  "insta",
```
Cargo.toml

```diff
@@ -117,7 +117,7 @@ secrecy = "0.10.3"
 actix-web-lab = { version = "0.24.1", default-features = false }
 urlencoding = "2.1.3"
 backoff = { version = "0.4.0", features = ["tokio"] }
-humantime = { version = "2.3.0", default-features = false }
 
 [dev-dependencies]
 actix-rt = "2.10.0"
```
src/metrics.rs

```diff
@@ -1,8 +1,7 @@
 use lazy_static::lazy_static;
 use prometheus::{
-    opts, register_gauge, register_gauge_vec, register_histogram_vec, register_int_counter_vec,
-    register_int_gauge, register_int_gauge_vec, Gauge, GaugeVec, HistogramVec, IntCounterVec,
-    IntGauge, IntGaugeVec,
+    opts, register_gauge, register_histogram_vec, register_int_counter_vec, register_int_gauge,
+    register_int_gauge_vec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
 };
 
 lazy_static! {
@@ -74,20 +73,6 @@ lazy_static! {
         &["kind", "value"]
     )
     .expect("Can't create a metric");
-    pub static ref MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE: GaugeVec = register_gauge_vec!(
-        opts!("meilisearch_batch_running_progress_trace", "The currently running progress trace"),
-        &["batch_uid", "step_name"]
-    )
-    .expect("Can't create a metric");
-    pub static ref MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS: IntGaugeVec =
-        register_int_gauge_vec!(
-            opts!(
-                "meilisearch_last_finished_batches_progress_trace_ms",
-                "The last few batches progress trace in milliseconds"
-            ),
-            &["batch_uid", "step_name"]
-        )
-        .expect("Can't create a metric");
     pub static ref MEILISEARCH_LAST_UPDATE: IntGauge =
         register_int_gauge!(opts!("meilisearch_last_update", "Meilisearch Last Update"))
             .expect("Can't create a metric");
```
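For reference, a minimal sketch of how a labeled gauge of this shape is registered and updated with the `prometheus` crate; the metric and label names are taken from the hunk above, while the `record_step` helper is hypothetical:

```rust
use lazy_static::lazy_static;
use prometheus::{opts, register_gauge_vec, GaugeVec};

lazy_static! {
    // Same shape as the gauge removed above: one time series per (batch_uid, step_name).
    pub static ref BATCH_RUNNING_PROGRESS_TRACE: GaugeVec = register_gauge_vec!(
        opts!("meilisearch_batch_running_progress_trace", "The currently running progress trace"),
        &["batch_uid", "step_name"]
    )
    .expect("Can't create a metric");
}

// Hypothetical helper: expose a step's completion as a ratio in [0, 1].
fn record_step(batch_uid: &str, step_name: &str, finished: u32, total: u32) {
    BATCH_RUNNING_PROGRESS_TRACE
        .with_label_values(&[batch_uid, step_name])
        .set(finished as f64 / total as f64);
}
```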
src/routes/metrics.rs

```diff
@@ -4,7 +4,6 @@ use index_scheduler::{IndexScheduler, Query};
 use meilisearch_auth::AuthController;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::keys::actions;
-use meilisearch_types::milli::progress::ProgressStepView;
 use meilisearch_types::tasks::Status;
 use prometheus::{Encoder, TextEncoder};
 use time::OffsetDateTime;
```
```diff
@@ -39,12 +38,6 @@ pub fn configure(config: &mut web::ServiceConfig) {
 # HELP meilisearch_db_size_bytes Meilisearch DB Size In Bytes
 # TYPE meilisearch_db_size_bytes gauge
 meilisearch_db_size_bytes 1130496
-# HELP meilisearch_batch_running_progress_trace The currently running progress trace
-# TYPE meilisearch_batch_running_progress_trace gauge
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="document"} 0.710618582519409
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="extracting word proximity"} 0.2222222222222222
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="indexing"} 0.6666666666666666
-meilisearch_batch_running_progress_trace{batch_uid="0",step_name="processing tasks"} 0
 # HELP meilisearch_http_requests_total Meilisearch HTTP requests total
 # TYPE meilisearch_http_requests_total counter
 meilisearch_http_requests_total{method="GET",path="/metrics",status="400"} 1
@@ -68,13 +61,6 @@ meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="1
 meilisearch_http_response_time_seconds_bucket{method="GET",path="/metrics",le="+Inf"} 0
 meilisearch_http_response_time_seconds_sum{method="GET",path="/metrics"} 0
 meilisearch_http_response_time_seconds_count{method="GET",path="/metrics"} 0
-# HELP meilisearch_last_finished_batches_progress_trace_ms The last few batches progress trace in milliseconds
-# TYPE meilisearch_last_finished_batches_progress_trace_ms gauge
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks"} 19360
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes"} 368
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads"} 367
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > computing document changes > preparing payloads > payload"} 367
-meilisearch_last_finished_batches_progress_trace_ms{batch_uid="0",step_name="processing tasks > indexing"} 18970
 # HELP meilisearch_index_count Meilisearch Index Count
 # TYPE meilisearch_index_count gauge
 meilisearch_index_count 1
```
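The sample output above is the Prometheus text exposition format. A minimal sketch of how a handler produces such a body with the `prometheus` crate's `TextEncoder` (the actix route wiring is omitted):

```rust
use prometheus::{Encoder, TextEncoder};

// Gather every metric registered in the default registry and render it in
// the text exposition format shown above.
fn render_metrics() -> String {
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buffer)
        .expect("encoding into an in-memory buffer should not fail");
    String::from_utf8(buffer).expect("the text format is valid UTF-8")
}
```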
```diff
@@ -162,46 +148,6 @@ pub async fn get_metrics(
         }
     }
 
-    // Fetch and expose the current progressing step
-    crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE.reset();
-    let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
-        &Query { statuses: Some(vec![Status::Processing]), ..Query::default() },
-        auth_filters,
-    )?;
-    if let Some(batch) = batches.into_iter().next() {
-        let batch_uid = batch.uid.to_string();
-        if let Some(progress) = batch.progress {
-            for ProgressStepView { current_step, finished, total } in progress.steps {
-                crate::metrics::MEILISEARCH_BATCH_RUNNING_PROGRESS_TRACE
-                    .with_label_values(&[batch_uid.as_str(), current_step.as_ref()])
-                    // We return the completion ratio of the current step
-                    .set(finished as f64 / total as f64);
-            }
-        }
-    }
-
-    crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS.reset();
-    let (batches, _total) = index_scheduler.get_batches_from_authorized_indexes(
-        // Fetch the finished batches...
-        &Query { statuses: Some(vec![Status::Succeeded, Status::Failed]), ..Query::default() },
-        auth_filters,
-    )?;
-    // ...and get the last batch only.
-    if let Some(batch) = batches.into_iter().next() {
-        let batch_uid = batch.uid.to_string();
-        for (step_name, duration_str) in batch.stats.progress_trace {
-            let Some(duration_str) = duration_str.as_str() else { continue };
-            match humantime::parse_duration(duration_str) {
-                Ok(duration) => {
-                    crate::metrics::MEILISEARCH_LAST_FINISHED_BATCHES_PROGRESS_TRACE_MS
-                        .with_label_values(&[&batch_uid, &step_name])
-                        .set(duration.as_millis() as i64);
-                }
-                Err(e) => tracing::error!("Failed to parse duration: {e}"),
-            }
-        }
-    }
-
     if let Some(last_update) = response.last_update {
         crate::metrics::MEILISEARCH_LAST_UPDATE.set(last_update.unix_timestamp());
     }
```
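The removed block relied on `humantime::parse_duration` to turn the human-readable durations stored in `progress_trace` back into milliseconds. A standalone sketch of that conversion; the sample strings are assumptions:

```rust
fn main() {
    // humantime parses strings like "367ms" or "19s 360ms" into a std::time::Duration.
    for s in ["367ms", "19s 360ms", "not a duration"] {
        match humantime::parse_duration(s) {
            Ok(duration) => println!("{s:?} -> {} ms", duration.as_millis()),
            Err(e) => eprintln!("Failed to parse duration: {e}"),
        }
    }
}
```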
```diff
@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
                 Metadata { docid, external_docid: "", extractor_id },
                 value,
                 unused_vectors_distribution,
-            )?;
+            );
         }
     }
 
     // send last chunk
-    let on_embed = session.drain(unused_vectors_distribution)?;
+    let on_embed = session.drain(unused_vectors_distribution);
     on_embed.finish()
 }
```
```diff
@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
                     metadata,
                     input,
                     unused_vectors_distribution,
-                )?;
+                );
             }
             ExtractorDiff::Unchanged => { /* nothing to do */ }
         }
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
                 if old_is_user_provided || full_reindex {
                     session.on_embed_mut().clear_vectors(docid);
                 }
-                session.request_embedding(metadata, input, unused_vectors_distribution)?;
+                session.request_embedding(metadata, input, unused_vectors_distribution);
             }
             ExtractorDiff::Unchanged => { /* do nothing */ }
         }
```
```diff
@@ -764,7 +764,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
             document_template,
             doc_alloc,
             new_fields_ids_map,
-        );
+        )
+        .ignore_errors();
 
         update_autogenerated(
             docid,
@@ -850,7 +851,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
             document_template,
             doc_alloc,
             new_fields_ids_map,
-        );
+        )
+        .ignore_errors();
 
         insert_autogenerated(
             docid,
```
```diff
@@ -885,10 +887,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
     pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
         match self.kind {
             ChunkType::DocumentTemplate { document_template: _, session } => {
-                session.drain(unused_vectors_distribution)?;
+                session.drain(unused_vectors_distribution);
             }
             ChunkType::Fragments { fragments: _, session } => {
-                session.drain(unused_vectors_distribution)?;
+                session.drain(unused_vectors_distribution);
             }
         }
         Ok(())
```
```diff
@@ -1036,7 +1038,7 @@ where
         Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
 
     if let Some(new_rendered) = new_rendered {
-        session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
+        session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
     } else {
         // remove any existing embedding
         OnEmbed::process_embedding_response(
@@ -1072,7 +1074,7 @@ where
             Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
             new_rendered,
             unused_vectors_distribution,
-        )?;
+        );
     }
 }
```
milli vector/rest.rs

```diff
@@ -91,6 +91,7 @@ struct EmbedderData {
     request: RequestData,
     response: Response,
     configuration_source: ConfigurationSource,
+    max_retry_duration: std::time::Duration,
 }
 
 #[derive(Debug)]
```
```diff
@@ -182,10 +183,15 @@ impl Embedder {
     ) -> Result<Self, NewEmbedderError> {
         let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
 
+        let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
+            .ok()
+            .map(|p| p.parse().unwrap())
+            .unwrap_or(30);
+
         let client = ureq::AgentBuilder::new()
             .max_idle_connections(REQUEST_PARALLELISM * 2)
             .max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
-            .timeout(std::time::Duration::from_secs(30))
+            .timeout(std::time::Duration::from_secs(timeout))
             .build();
 
         let request = RequestData::new(
```
```diff
@@ -196,6 +202,14 @@ impl Embedder {
 
         let response = Response::new(options.response, &request)?;
 
+        let max_retry_duration =
+            std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
+                .ok()
+                .map(|p| p.parse().unwrap())
+                .unwrap_or(60);
+
+        let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
+
         let data = EmbedderData {
             client,
             bearer,
@@ -204,6 +218,7 @@ impl Embedder {
             response,
             configuration_source,
             headers: options.headers,
+            max_retry_duration,
         };
 
         let dimensions = if let Some(dimensions) = options.dimensions {
```
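Both environment lookups above call `.parse().unwrap()`, so a malformed value panics at embedder construction. A hedged variant that falls back to the default instead; the helper name is hypothetical:

```rust
use std::time::Duration;

// Hypothetical helper: like the lookups above, but a value that fails to
// parse falls back to the default instead of panicking.
fn duration_from_env(var: &str, default_secs: u64) -> Duration {
    let secs = std::env::var(var)
        .ok()
        .and_then(|p| p.parse::<u64>().ok())
        .unwrap_or(default_secs);
    Duration::from_secs(secs)
}

// Usage with the variables introduced in this diff:
// let timeout = duration_from_env("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS", 30);
// let max_retry = duration_from_env("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS", 60);
```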
```diff
@@ -457,7 +472,7 @@ where
         }
     }?;
 
-    let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
+    let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
 
     // randomly up to double the retry duration
     let retry_duration = retry_duration
```
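The hunk replaces the hard-coded one-minute cap with the configurable `data.max_retry_duration`; the comment that follows notes that the delay is then randomly stretched by up to 2x. A sketch of that cap-then-jitter step, assuming the `rand` crate; the exact jitter formula is an assumption:

```rust
use std::time::Duration;

// Clamp the backoff delay to the configured maximum, then stretch it by a
// random factor in [1.0, 2.0) so concurrent clients do not retry in lockstep.
fn cap_and_jitter(retry_duration: Duration, max_retry_duration: Duration) -> Duration {
    let capped = retry_duration.min(max_retry_duration);
    capped.mul_f64(1.0 + rand::random::<f64>())
}
```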
milli vector/session.rs

```diff
@@ -5,7 +5,7 @@ use serde_json::Value;
 use super::error::EmbedError;
 use super::{Embedder, Embedding};
 use crate::progress::EmbedderStats;
-use crate::{DocumentId, Result, ThreadPoolNoAbort};
+use crate::{DocumentId, ThreadPoolNoAbort};
 type ExtractorId = u8;
 
 #[derive(Clone, Copy)]
```
```diff
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
         metadata: Metadata<'doc>,
         rendered: I,
         unused_vectors_distribution: &C::ErrorMetadata,
-    ) -> Result<()> {
+    ) {
         if self.inputs.len() < self.inputs.capacity() {
             self.inputs.push(rendered);
             self.metadata.push(metadata);
-            return Ok(());
+            return;
         }
 
         self.embed_chunks(unused_vectors_distribution)
     }
 
-    pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
-        self.embed_chunks(unused_vectors_distribution)?;
-        Ok(self.on_embed)
+    pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
+        self.embed_chunks(unused_vectors_distribution);
+        self.on_embed
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
+    fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
         if self.inputs.is_empty() {
-            return Ok(());
+            return;
         }
-        let res = match I::embed_ref(
-            self.inputs.as_slice(),
-            self.embedder,
-            self.threads,
-            self.embedder_stats,
-        ) {
+        match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
+        {
             Ok(embeddings) => {
                 for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
                     self.on_embed.process_embedding_response(EmbeddingResponse {
```
```diff
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
                         embedding: Some(embedding),
                     });
                 }
-                Ok(())
             }
             Err(error) => {
-                // reset metadata and inputs, and send metadata to the error processing.
-                let doc_alloc = self.metadata.bump();
-                let metadata = std::mem::replace(
-                    &mut self.metadata,
-                    BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
-                );
-                self.inputs.clear();
-                return Err(self.on_embed.process_embedding_error(
-                    error,
-                    self.embedder_name,
-                    unused_vectors_distribution,
-                    metadata,
-                ));
+                tracing::warn!(
+                    %error,
+                    "error embedding batch of documents, retrying one by one"
+                );
+                // retry with one call per input
+                for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
+                    match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
+                        Ok(mut embeddings) => {
+                            let Some(embedding) = embeddings.pop() else {
+                                continue;
+                            };
+                            self.on_embed.process_embedding_response(EmbeddingResponse {
+                                metadata,
+                                embedding: Some(embedding),
+                            })
+                        }
+                        Err(err) => {
+                            tracing::warn!(
+                                docid = metadata.external_docid,
+                                %err,
+                                "error embedding document"
+                            );
+                        }
+                    }
+                }
             }
         };
         self.inputs.clear();
         self.metadata.clear();
-        res
     }
 
     pub(crate) fn embedder_name(&self) -> &'doc str {
```
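The rewritten `Err` branch trades fail-fast propagation for graceful degradation: when a batched request fails, every input is retried individually and per-document failures are only logged. A self-contained sketch of the same pattern; `embed_batch` and `record` are hypothetical stand-ins for `I::embed_ref` and `OnEmbed::process_embedding_response`:

```rust
// Sketch of the batch-then-fallback retry pattern used above.
fn embed_all(
    inputs: &[String],
    embed_batch: impl Fn(&[String]) -> Result<Vec<Vec<f32>>, String>,
) {
    match embed_batch(inputs) {
        Ok(embeddings) => {
            for (input, embedding) in inputs.iter().zip(embeddings) {
                record(input, embedding);
            }
        }
        Err(error) => {
            eprintln!("error embedding batch of documents, retrying one by one: {error}");
            // One request per input: a single bad document no longer fails the whole batch.
            for input in inputs.chunks(1) {
                match embed_batch(input) {
                    Ok(mut embeddings) => {
                        if let Some(embedding) = embeddings.pop() {
                            record(&input[0], embedding);
                        }
                    }
                    Err(err) => eprintln!("error embedding document {:?}: {err}", input[0]),
                }
            }
        }
    }
}

fn record(input: &str, embedding: Vec<f32>) {
    println!("{input}: {} dims", embedding.len());
}
```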