mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-23 05:06:11 +00:00
Compare commits
4 Commits
prototype-
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f17ab51d4 | ||
|
|
5ecb4eb79e | ||
|
|
0a91c091c6 | ||
|
|
47a15fbe24 |
@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
|
||||
Metadata { docid, external_docid: "", extractor_id },
|
||||
value,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// send last chunk
|
||||
let on_embed = session.drain(unused_vectors_distribution)?;
|
||||
let on_embed = session.drain(unused_vectors_distribution);
|
||||
on_embed.finish()
|
||||
}
|
||||
|
||||
|
||||
@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
metadata,
|
||||
input,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
ExtractorDiff::Unchanged => { /* nothing to do */ }
|
||||
}
|
||||
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
if old_is_user_provided || full_reindex {
|
||||
session.on_embed_mut().clear_vectors(docid);
|
||||
}
|
||||
session.request_embedding(metadata, input, unused_vectors_distribution)?;
|
||||
session.request_embedding(metadata, input, unused_vectors_distribution);
|
||||
}
|
||||
ExtractorDiff::Unchanged => { /* do nothing */ }
|
||||
}
|
||||
@@ -764,7 +764,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
document_template,
|
||||
doc_alloc,
|
||||
new_fields_ids_map,
|
||||
);
|
||||
)
|
||||
.ignore_errors();
|
||||
|
||||
update_autogenerated(
|
||||
docid,
|
||||
@@ -850,7 +851,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
document_template,
|
||||
doc_alloc,
|
||||
new_fields_ids_map,
|
||||
);
|
||||
)
|
||||
.ignore_errors();
|
||||
|
||||
insert_autogenerated(
|
||||
docid,
|
||||
@@ -885,10 +887,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
|
||||
match self.kind {
|
||||
ChunkType::DocumentTemplate { document_template: _, session } => {
|
||||
session.drain(unused_vectors_distribution)?;
|
||||
session.drain(unused_vectors_distribution);
|
||||
}
|
||||
ChunkType::Fragments { fragments: _, session } => {
|
||||
session.drain(unused_vectors_distribution)?;
|
||||
session.drain(unused_vectors_distribution);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1036,7 +1038,7 @@ where
|
||||
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
|
||||
|
||||
if let Some(new_rendered) = new_rendered {
|
||||
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
|
||||
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
|
||||
} else {
|
||||
// remove any existing embedding
|
||||
OnEmbed::process_embedding_response(
|
||||
@@ -1072,7 +1074,7 @@ where
|
||||
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
|
||||
new_rendered,
|
||||
unused_vectors_distribution,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use serde_json::Value;
|
||||
use super::error::EmbedError;
|
||||
use super::{Embedder, Embedding};
|
||||
use crate::progress::EmbedderStats;
|
||||
use crate::{DocumentId, Result, ThreadPoolNoAbort};
|
||||
use crate::{DocumentId, ThreadPoolNoAbort};
|
||||
type ExtractorId = u8;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
|
||||
metadata: Metadata<'doc>,
|
||||
rendered: I,
|
||||
unused_vectors_distribution: &C::ErrorMetadata,
|
||||
) -> Result<()> {
|
||||
) {
|
||||
if self.inputs.len() < self.inputs.capacity() {
|
||||
self.inputs.push(rendered);
|
||||
self.metadata.push(metadata);
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
|
||||
self.embed_chunks(unused_vectors_distribution)
|
||||
}
|
||||
|
||||
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
|
||||
self.embed_chunks(unused_vectors_distribution)?;
|
||||
Ok(self.on_embed)
|
||||
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
|
||||
self.embed_chunks(unused_vectors_distribution);
|
||||
self.on_embed
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
|
||||
fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
|
||||
if self.inputs.is_empty() {
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
let res = match I::embed_ref(
|
||||
self.inputs.as_slice(),
|
||||
self.embedder,
|
||||
self.threads,
|
||||
self.embedder_stats,
|
||||
) {
|
||||
match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
|
||||
{
|
||||
Ok(embeddings) => {
|
||||
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
|
||||
self.on_embed.process_embedding_response(EmbeddingResponse {
|
||||
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
|
||||
embedding: Some(embedding),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
// reset metadata and inputs, and send metadata to the error processing.
|
||||
let doc_alloc = self.metadata.bump();
|
||||
let metadata = std::mem::replace(
|
||||
&mut self.metadata,
|
||||
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
|
||||
tracing::warn!(
|
||||
%error,
|
||||
"error embedding batch of documents, retrying one by one"
|
||||
);
|
||||
self.inputs.clear();
|
||||
return Err(self.on_embed.process_embedding_error(
|
||||
error,
|
||||
self.embedder_name,
|
||||
unused_vectors_distribution,
|
||||
metadata,
|
||||
));
|
||||
// retry with one call per input
|
||||
for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
|
||||
match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
|
||||
Ok(mut embeddings) => {
|
||||
let Some(embedding) = embeddings.pop() else {
|
||||
continue;
|
||||
};
|
||||
self.on_embed.process_embedding_response(EmbeddingResponse {
|
||||
metadata,
|
||||
embedding: Some(embedding),
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
docid = metadata.external_docid,
|
||||
%err,
|
||||
"error embedding document"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
self.inputs.clear();
|
||||
self.metadata.clear();
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) fn embedder_name(&self) -> &'doc str {
|
||||
|
||||
Reference in New Issue
Block a user