mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-30 23:46:28 +00:00 
			
		
		
		
	Merge #5123
5123: Fix batch details r=dureuill a=irevoire # Pull Request ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/5079 Fixes https://github.com/meilisearch/meilisearch/issues/5112 ## What does this PR do? - Make the processing tasks actually processing in the stats of the batch instead of enqueued - Stop counting one extra task for all non-prioritized batches in the stats - Add a test Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
		| @@ -497,7 +497,6 @@ impl IndexScheduler { | ||||
|         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. | ||||
|         let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; | ||||
|         let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; | ||||
|         current_batch.processing(Some(&mut task)); | ||||
|  | ||||
|         // If the task is not associated with any index, verify that it is an index swap and | ||||
|         // create the batch directly. Otherwise, get the index name associated with the task | ||||
| @@ -507,6 +506,7 @@ impl IndexScheduler { | ||||
|             index_name | ||||
|         } else { | ||||
|             assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty())); | ||||
|             current_batch.processing(Some(&mut task)); | ||||
|             return Ok(Some((Batch::IndexSwap { task }, current_batch))); | ||||
|         }; | ||||
|  | ||||
|   | ||||
| @@ -4319,10 +4319,35 @@ mod tests { | ||||
|         let proc = index_scheduler.processing_tasks.read().unwrap().clone(); | ||||
|  | ||||
|         let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() }; | ||||
|         let (batches, _) = index_scheduler | ||||
|             .get_batch_ids_from_authorized_indexes(&rtxn, &proc, &query, &AuthFilter::default()) | ||||
|         let (mut batches, _) = index_scheduler | ||||
|             .get_batches_from_authorized_indexes(query.clone(), &AuthFilter::default()) | ||||
|             .unwrap(); | ||||
|         snapshot!(snapshot_bitmap(&batches), @"[0,]"); // only the processing batch in the first tick | ||||
|         assert_eq!(batches.len(), 1); | ||||
|         batches[0].started_at = OffsetDateTime::UNIX_EPOCH; | ||||
|         // Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689 | ||||
|         let batch = serde_json::to_string_pretty(&batches[0]).unwrap(); | ||||
|         snapshot!(batch, @r#" | ||||
|         { | ||||
|           "uid": 0, | ||||
|           "details": { | ||||
|             "primaryKey": "mouse" | ||||
|           }, | ||||
|           "stats": { | ||||
|             "totalNbTasks": 1, | ||||
|             "status": { | ||||
|               "processing": 1 | ||||
|             }, | ||||
|             "types": { | ||||
|               "indexCreation": 1 | ||||
|             }, | ||||
|             "indexUids": { | ||||
|               "catto": 1 | ||||
|             } | ||||
|           }, | ||||
|           "startedAt": "1970-01-01T00:00:00Z", | ||||
|           "finishedAt": null | ||||
|         } | ||||
|         "#); | ||||
|  | ||||
|         let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() }; | ||||
|         let (batches, _) = index_scheduler | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(1): | ||||
| [1,] | ||||
| {uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, } | ||||
| {uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(1): | ||||
| [1,] | ||||
| {uid: 1, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"beavero":2}}, } | ||||
| {uid: 1, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"beavero":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, batch_uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"dumpCreation":1},"indexUids":{}}, } | ||||
| {uid: 0, details: {"dumpUid":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"dumpCreation":1},"indexUids":{}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"catto":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"catto":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"receivedDocuments":2,"indexedDocuments":null}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"documentAdditionOrUpdate":2},"indexUids":{"doggos":2}}, } | ||||
| {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":null}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"documentAdditionOrUpdate":1},"indexUids":{"doggos":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, } | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, } | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [0,] | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"index_a":2}}, } | ||||
| {uid: 0, details: {"primaryKey":"id"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"index_a":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "index_a", primary_key: Some("id") }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(1): | ||||
| [1,] | ||||
| {uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":2,"status":{"enqueued":2},"types":{"indexCreation":2},"indexUids":{"doggo":2}}, } | ||||
| {uid: 1, details: {"primaryKey":"sheep"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ snapshot_kind: text | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing batch Some(0): | ||||
| [3,] | ||||
| {uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"enqueued":1},"types":{"taskDeletion":1},"indexUids":{}}, } | ||||
| {uid: 0, details: {"matchedTasks":2,"deletedTasks":null,"originalFilter":"test_query"}, stats: {"totalNbTasks":1,"status":{"processing":1},"types":{"taskDeletion":1},"indexUids":{}}, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} | ||||
|   | ||||
| @@ -67,7 +67,7 @@ impl ProcessingBatch { | ||||
|             task.batch_uid = Some(self.uid); | ||||
|             // We don't store the statuses in the map since they're all enqueued but we must | ||||
|             // still store them in the stats since that can be displayed. | ||||
|             *self.stats.status.entry(task.status).or_default() += 1; | ||||
|             *self.stats.status.entry(Status::Processing).or_default() += 1; | ||||
|  | ||||
|             self.kinds.insert(task.kind.as_kind()); | ||||
|             *self.stats.types.entry(task.kind.as_kind()).or_default() += 1; | ||||
|   | ||||
| @@ -52,6 +52,25 @@ impl Value { | ||||
|         } | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     /// Return `true` if the `status` field is set to `failed`. | ||||
|     /// Panic if the `status` field doesn't exists. | ||||
|     #[track_caller] | ||||
|     pub fn is_fail(&self) -> bool { | ||||
|         if !self["status"].is_string() { | ||||
|             panic!("Called `is_fail` on {}", serde_json::to_string_pretty(&self.0).unwrap()); | ||||
|         } | ||||
|         self["status"] == serde_json::Value::String(String::from("failed")) | ||||
|     } | ||||
|  | ||||
|     // Panic if the json doesn't contain the `status` field set to "succeeded" | ||||
|     #[track_caller] | ||||
|     pub fn failed(&self) -> &Self { | ||||
|         if !self.is_fail() { | ||||
|             panic!("Called failed on {}", serde_json::to_string_pretty(&self.0).unwrap()); | ||||
|         } | ||||
|         self | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<serde_json::Value> for Value { | ||||
|   | ||||
| @@ -129,11 +129,11 @@ async fn perform_on_demand_snapshot() { | ||||
|  | ||||
|     index.load_test_set().await; | ||||
|  | ||||
|     server.index("doggo").create(Some("bone")).await; | ||||
|     index.wait_task(2).await; | ||||
|     let (task, _) = server.index("doggo").create(Some("bone")).await; | ||||
|     index.wait_task(task.uid()).await.succeeded(); | ||||
|  | ||||
|     server.index("doggo").create(Some("bone")).await; | ||||
|     index.wait_task(2).await; | ||||
|     let (task, _) = server.index("doggo").create(Some("bone")).await; | ||||
|     index.wait_task(task.uid()).await.failed(); | ||||
|  | ||||
|     let (task, code) = server.create_snapshot().await; | ||||
|     snapshot!(code, @"202 Accepted"); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user