Compare commits

...

15 Commits

Author SHA1 Message Date
94b43001db Merge pull request #5492 from meilisearch/accept-cancelation-tasks-when-disk-full
make meilisearch accept cancelation tasks even when the disk is full
2025-04-03 15:46:46 +00:00
796a325972 Fix typos
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-04-03 15:53:42 +02:00
1db550ec7f make meilisearch accept cancelation tasks even when the disk is full 2025-04-03 15:47:56 +02:00
418fa47963 Merge pull request #5313 from barloes/fixRankingScoreThresholdRankingIssue
fix for rankingScoreThreshold changes the results' ranking
2025-04-01 13:10:55 +00:00
0656a0d515 Optimize roaring operation
Co-authored-by: Many the fish <many@meilisearch.com>
2025-04-01 14:25:27 +02:00
e36a8c50b9 Merge pull request #5478 from meilisearch/enforce-embedding-dimensions
Enforce embedding dimensions
2025-03-31 15:31:29 +00:00
08ff135ad6 Fix test 2025-03-31 15:27:49 +02:00
f729864466 Check dimension mismatch at insertion time 2025-03-31 15:27:49 +02:00
94ea263bef Add new error for dimensions mismatch during indexing 2025-03-31 15:27:49 +02:00
0e475cb5e6 fix warn and show what meilisearch understood of the vectors in the cursed test 2025-03-31 13:49:22 +02:00
62de70b73c Document problematic case in test and acknowledge PR comment 2025-03-31 13:49:22 +02:00
7707fb18dd add embedding with dimension mismatch test case 2025-03-31 13:49:22 +02:00
f9807ba32e Fix logic when results are below the threshold 2025-03-19 11:34:53 +01:00
8c8cc59a6c remove new line added by accident 2025-03-19 11:34:53 +01:00
f540a69ac3 add 1 to index so it points to correct position 2025-03-19 11:34:52 +01:00
8 changed files with 173 additions and 28 deletions

View File

@ -625,8 +625,8 @@ impl IndexScheduler {
task_id: Option<TaskId>,
dry_run: bool,
) -> Result<Task> {
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40
{
return Err(Error::NoSpaceLeftInTaskQueue);

View File

@ -292,8 +292,6 @@ impl Queue {
return Ok(task);
}
// Get rid of the mutability.
let task = task;
self.tasks.register(wtxn, &task)?;
Ok(task)

View File

@ -364,7 +364,7 @@ fn test_task_queue_is_full() {
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// Even the task deletion that doesn't delete anything shouldn't be accepted
// Even the task deletion and cancelation that don't delete anything should be refused
let result = index_scheduler
.register(
KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
@ -373,10 +373,39 @@ fn test_task_queue_is_full() {
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
let result = index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: RoaringBitmap::new() },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// But a task deletion that delete something should works
// But a task cancelation that cancel something should work
index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: (0..100).collect() },
None,
false,
)
.unwrap();
handle.advance_one_successful_batch();
// But we should still be forbidden from enqueuing new tasks
let result = index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// And a task deletion that delete something should works
index_scheduler
.register(
KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },

View File

@ -454,7 +454,10 @@ impl ErrorCode for milli::Error {
}
UserError::CriterionError(_) => Code::InvalidSettingsRankingRules,
UserError::InvalidGeoField { .. } => Code::InvalidDocumentGeoField,
UserError::InvalidVectorDimensions { .. } => Code::InvalidVectorDimensions,
UserError::InvalidVectorDimensions { .. }
| UserError::InvalidIndexingVectorDimensions { .. } => {
Code::InvalidVectorDimensions
}
UserError::InvalidVectorsMapType { .. }
| UserError::InvalidVectorsEmbedderConf { .. } => Code::InvalidVectorsType,
UserError::TooManyVectors(_, _) => Code::TooManyVectors,

View File

@ -164,6 +164,87 @@ async fn add_remove_user_provided() {
"###);
}
#[actix_rt::test]
async fn user_provide_mismatched_embedding_dimension() {
let server = Server::new().await;
let index = server.index("doggo");
let (response, code) = index
.update_settings(json!({
"embedders": {
"manual": {
"source": "userProvided",
"dimensions": 3,
}
},
}))
.await;
snapshot!(code, @"202 Accepted");
server.wait_task(response.uid()).await.succeeded();
let documents = json!([
{"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0] }},
]);
let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let task = index.wait_task(value.uid()).await;
snapshot!(task, @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "doggo",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 0
},
"error": {
"message": "Index `doggo`: Invalid vector dimensions in document with id `0` in `._vectors.manual`.\n - note: embedding #0 has dimensions 2\n - note: embedder `manual` requires 3",
"code": "invalid_vector_dimensions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vector_dimensions"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
let new_document = json!([
{"id": 0, "name": "kefir", "_vectors": { "manual": [[0, 0], [1, 1], [2, 2]] }},
]);
let (response, code) = index.add_documents(new_document, None).await;
snapshot!(code, @"202 Accepted");
let task = index.wait_task(response.uid()).await;
snapshot!(task, @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "doggo",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 0
},
"error": {
"message": "Index `doggo`: Invalid vector dimensions in document with id `0` in `._vectors.manual`.\n - note: embedding #0 has dimensions 2\n - note: embedder `manual` requires 3",
"code": "invalid_vector_dimensions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vector_dimensions"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
}
async fn generate_default_user_provided_documents(server: &Server) -> Index {
let index = server.index("doggo");

View File

@ -129,6 +129,14 @@ and can not be more than 511 bytes.", .document_id.to_string()
InvalidGeoField(#[from] GeoError),
#[error("Invalid vector dimensions: expected: `{}`, found: `{}`.", .expected, .found)]
InvalidVectorDimensions { expected: usize, found: usize },
#[error("Invalid vector dimensions in document with id `{document_id}` in `._vectors.{embedder_name}`.\n - note: embedding #{embedding_index} has dimensions {found}\n - note: embedder `{embedder_name}` requires {expected}")]
InvalidIndexingVectorDimensions {
embedder_name: String,
document_id: String,
embedding_index: usize,
expected: usize,
found: usize,
},
#[error("The `_vectors` field in the document with id: `{document_id}` is not an object. Was expecting an object with a key for each embedder with manually provided vectors, but instead got `{value}`")]
InvalidVectorsMapType { document_id: String, value: Value },
#[error("Bad embedder configuration in the document with id: `{document_id}`. {error}")]

View File

@ -173,16 +173,18 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold
if let Some(ranking_score_threshold) = ranking_score_threshold {
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
if current_score < ranking_score_threshold {
all_candidates -= bucket | &ranking_rule_universes[cur_ranking_rule_index];
back!();
continue;
}
}
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
@ -237,23 +239,24 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
);
// remove candidates from the universe without adding them to result if their score is below the threshold
if let Some(ranking_score_threshold) = ranking_score_threshold {
let is_below_threshold = ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
if current_score < ranking_score_threshold {
all_candidates -=
next_bucket.candidates | &ranking_rule_universes[cur_ranking_rule_index];
back!();
continue;
}
}
current_score < ranking_score_threshold
});
ranking_rule_universes[cur_ranking_rule_index] -= &next_bucket.candidates;
if cur_ranking_rule_index == ranking_rules_len - 1
|| (scoring_strategy == ScoringStrategy::Skip && next_bucket.candidates.len() <= 1)
|| cur_offset + (next_bucket.candidates.len() as usize) < from
|| is_below_threshold
{
if is_below_threshold {
all_candidates -= &next_bucket.candidates;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(next_bucket.candidates);
}
ranking_rule_scores.pop();
continue;
}

View File

@ -121,6 +121,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
// do we have set embeddings?
if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors(
update.external_document_id(),
update.docid(),
embeddings
.into_vec(&context.doc_alloc, embedder_name)
@ -128,7 +129,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
document_id: update.external_document_id().to_string(),
error: error.to_string(),
})?,
);
)?;
} else if new_vectors.regenerate {
let new_rendered = prompt.render_document(
update.external_document_id(),
@ -209,6 +210,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
chunks.set_regenerate(insertion.docid(), new_vectors.regenerate);
if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors(
insertion.external_document_id(),
insertion.docid(),
embeddings
.into_vec(&context.doc_alloc, embedder_name)
@ -218,7 +220,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
.to_string(),
error: error.to_string(),
})?,
);
)?;
} else if new_vectors.regenerate {
let rendered = prompt.render_document(
insertion.external_document_id(),
@ -273,6 +275,7 @@ struct Chunks<'a, 'b, 'extractor> {
embedder: &'a Embedder,
embedder_id: u8,
embedder_name: &'a str,
dimensions: usize,
prompt: &'a Prompt,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
@ -297,6 +300,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
let texts = BVec::with_capacity_in(capacity, doc_alloc);
let ids = BVec::with_capacity_in(capacity, doc_alloc);
let dimensions = embedder.dimensions();
Self {
texts,
ids,
@ -309,6 +313,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
embedder_name,
user_provided,
has_manual_generation: None,
dimensions,
}
}
@ -490,7 +495,25 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
}
}
fn set_vectors(&self, docid: DocumentId, embeddings: Vec<Embedding>) {
fn set_vectors(
&self,
external_docid: &'a str,
docid: DocumentId,
embeddings: Vec<Embedding>,
) -> Result<()> {
for (embedding_index, embedding) in embeddings.iter().enumerate() {
if embedding.len() != self.dimensions {
return Err(UserError::InvalidIndexingVectorDimensions {
expected: self.dimensions,
found: embedding.len(),
embedder_name: self.embedder_name.to_string(),
document_id: external_docid.to_string(),
embedding_index,
}
.into());
}
}
self.sender.set_vectors(docid, self.embedder_id, embeddings).unwrap();
Ok(())
}
}