mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Compare commits
6 Commits
c19fa9f1e1
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d7cb4c3d1 | ||
|
|
a1d35d9e52 | ||
|
|
9f17ab51d4 | ||
|
|
5ecb4eb79e | ||
|
|
0a91c091c6 | ||
|
|
47a15fbe24 |
@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
|
||||
Metadata { docid, external_docid: "", extractor_id },
|
||||
value,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// send last chunk
|
||||
let on_embed = session.drain(unused_vectors_distribution)?;
|
||||
let on_embed = session.drain(unused_vectors_distribution);
|
||||
on_embed.finish()
|
||||
}
|
||||
|
||||
|
||||
@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
metadata,
|
||||
input,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
ExtractorDiff::Unchanged => { /* nothing to do */ }
|
||||
}
|
||||
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
if old_is_user_provided || full_reindex {
|
||||
session.on_embed_mut().clear_vectors(docid);
|
||||
}
|
||||
session.request_embedding(metadata, input, unused_vectors_distribution)?;
|
||||
session.request_embedding(metadata, input, unused_vectors_distribution);
|
||||
}
|
||||
ExtractorDiff::Unchanged => { /* do nothing */ }
|
||||
}
|
||||
@@ -764,7 +764,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
document_template,
|
||||
doc_alloc,
|
||||
new_fields_ids_map,
|
||||
);
|
||||
)
|
||||
.ignore_errors();
|
||||
|
||||
update_autogenerated(
|
||||
docid,
|
||||
@@ -850,7 +851,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
document_template,
|
||||
doc_alloc,
|
||||
new_fields_ids_map,
|
||||
);
|
||||
)
|
||||
.ignore_errors();
|
||||
|
||||
insert_autogenerated(
|
||||
docid,
|
||||
@@ -885,10 +887,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
|
||||
match self.kind {
|
||||
ChunkType::DocumentTemplate { document_template: _, session } => {
|
||||
session.drain(unused_vectors_distribution)?;
|
||||
session.drain(unused_vectors_distribution);
|
||||
}
|
||||
ChunkType::Fragments { fragments: _, session } => {
|
||||
session.drain(unused_vectors_distribution)?;
|
||||
session.drain(unused_vectors_distribution);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1036,7 +1038,7 @@ where
|
||||
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
|
||||
|
||||
if let Some(new_rendered) = new_rendered {
|
||||
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
|
||||
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
|
||||
} else {
|
||||
// remove any existing embedding
|
||||
OnEmbed::process_embedding_response(
|
||||
@@ -1072,7 +1074,7 @@ where
|
||||
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
|
||||
new_rendered,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -91,6 +91,7 @@ struct EmbedderData {
|
||||
request: RequestData,
|
||||
response: Response,
|
||||
configuration_source: ConfigurationSource,
|
||||
max_retry_duration: std::time::Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -182,10 +183,15 @@ impl Embedder {
|
||||
) -> Result<Self, NewEmbedderError> {
|
||||
let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
|
||||
|
||||
let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
|
||||
.ok()
|
||||
.map(|p| p.parse().unwrap())
|
||||
.unwrap_or(30);
|
||||
|
||||
let client = ureq::AgentBuilder::new()
|
||||
.max_idle_connections(REQUEST_PARALLELISM * 2)
|
||||
.max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.timeout(std::time::Duration::from_secs(timeout))
|
||||
.build();
|
||||
|
||||
let request = RequestData::new(
|
||||
@@ -196,6 +202,14 @@ impl Embedder {
|
||||
|
||||
let response = Response::new(options.response, &request)?;
|
||||
|
||||
let max_retry_duration =
|
||||
std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
|
||||
.ok()
|
||||
.map(|p| p.parse().unwrap())
|
||||
.unwrap_or(60);
|
||||
|
||||
let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
|
||||
|
||||
let data = EmbedderData {
|
||||
client,
|
||||
bearer,
|
||||
@@ -204,6 +218,7 @@ impl Embedder {
|
||||
response,
|
||||
configuration_source,
|
||||
headers: options.headers,
|
||||
max_retry_duration,
|
||||
};
|
||||
|
||||
let dimensions = if let Some(dimensions) = options.dimensions {
|
||||
@@ -457,7 +472,7 @@ where
|
||||
}
|
||||
}?;
|
||||
|
||||
let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
|
||||
let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
|
||||
|
||||
// randomly up to double the retry duration
|
||||
let retry_duration = retry_duration
|
||||
|
||||
@@ -5,7 +5,7 @@ use serde_json::Value;
|
||||
use super::error::EmbedError;
|
||||
use super::{Embedder, Embedding};
|
||||
use crate::progress::EmbedderStats;
|
||||
use crate::{DocumentId, Result, ThreadPoolNoAbort};
|
||||
use crate::{DocumentId, ThreadPoolNoAbort};
|
||||
type ExtractorId = u8;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
|
||||
metadata: Metadata<'doc>,
|
||||
rendered: I,
|
||||
unused_vectors_distribution: &C::ErrorMetadata,
|
||||
) -> Result<()> {
|
||||
) {
|
||||
if self.inputs.len() < self.inputs.capacity() {
|
||||
self.inputs.push(rendered);
|
||||
self.metadata.push(metadata);
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
|
||||
self.embed_chunks(unused_vectors_distribution)
|
||||
}
|
||||
|
||||
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
|
||||
self.embed_chunks(unused_vectors_distribution)?;
|
||||
Ok(self.on_embed)
|
||||
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
|
||||
self.embed_chunks(unused_vectors_distribution);
|
||||
self.on_embed
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
|
||||
fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
|
||||
if self.inputs.is_empty() {
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
let res = match I::embed_ref(
|
||||
self.inputs.as_slice(),
|
||||
self.embedder,
|
||||
self.threads,
|
||||
self.embedder_stats,
|
||||
) {
|
||||
match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
|
||||
{
|
||||
Ok(embeddings) => {
|
||||
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
|
||||
self.on_embed.process_embedding_response(EmbeddingResponse {
|
||||
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
|
||||
embedding: Some(embedding),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
// reset metadata and inputs, and send metadata to the error processing.
|
||||
let doc_alloc = self.metadata.bump();
|
||||
let metadata = std::mem::replace(
|
||||
&mut self.metadata,
|
||||
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
|
||||
tracing::warn!(
|
||||
%error,
|
||||
"error embedding batch of documents, retrying one by one"
|
||||
);
|
||||
self.inputs.clear();
|
||||
return Err(self.on_embed.process_embedding_error(
|
||||
error,
|
||||
self.embedder_name,
|
||||
unused_vectors_distribution,
|
||||
metadata,
|
||||
));
|
||||
// retry with one call per input
|
||||
for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
|
||||
match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
|
||||
Ok(mut embeddings) => {
|
||||
let Some(embedding) = embeddings.pop() else {
|
||||
continue;
|
||||
};
|
||||
self.on_embed.process_embedding_response(EmbeddingResponse {
|
||||
metadata,
|
||||
embedding: Some(embedding),
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
docid = metadata.external_docid,
|
||||
%err,
|
||||
"error embedding document"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
self.inputs.clear();
|
||||
self.metadata.clear();
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) fn embedder_name(&self) -> &'doc str {
|
||||
|
||||
Reference in New Issue
Block a user