mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	implements the automatic batch deletion
This commit is contained in:
		| @@ -17,13 +17,14 @@ tasks individually, but should be much faster since we are only performing | ||||
| one indexing operation. | ||||
| */ | ||||
|  | ||||
| use std::collections::{BTreeSet, HashSet}; | ||||
| use std::collections::{BTreeSet, HashMap, HashSet}; | ||||
| use std::ffi::OsStr; | ||||
| use std::fmt; | ||||
| use std::fs::{self, File}; | ||||
| use std::io::BufWriter; | ||||
|  | ||||
| use dump::IndexMetadata; | ||||
| use meilisearch_types::batches::BatchId; | ||||
| use meilisearch_types::error::Code; | ||||
| use meilisearch_types::heed::{RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| @@ -1675,6 +1676,8 @@ impl IndexScheduler { | ||||
|         let mut affected_statuses = HashSet::new(); | ||||
|         let mut affected_kinds = HashSet::new(); | ||||
|         let mut affected_canceled_by = RoaringBitmap::new(); | ||||
|         // The tasks that have been removed *per batches*. | ||||
|         let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); | ||||
|  | ||||
|         for task_id in to_delete_tasks.iter() { | ||||
|             let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; | ||||
| @@ -1696,18 +1699,21 @@ impl IndexScheduler { | ||||
|             if let Some(canceled_by) = task.canceled_by { | ||||
|                 affected_canceled_by.insert(canceled_by); | ||||
|             } | ||||
|             if let Some(batch_uid) = task.batch_uid { | ||||
|                 affected_batches.entry(batch_uid).or_default().insert(task_id); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         for index in affected_indexes { | ||||
|             self.update_index(wtxn, &index, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         for index in affected_indexes.iter() { | ||||
|             self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         } | ||||
|  | ||||
|         for status in affected_statuses { | ||||
|             self.update_status(wtxn, status, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         for status in affected_statuses.iter() { | ||||
|             self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         } | ||||
|  | ||||
|         for kind in affected_kinds { | ||||
|             self.update_kind(wtxn, kind, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         for kind in affected_kinds.iter() { | ||||
|             self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|         } | ||||
|  | ||||
|         for task in to_delete_tasks.iter() { | ||||
| @@ -1723,6 +1729,48 @@ impl IndexScheduler { | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         for (batch_id, to_delete_tasks) in affected_batches { | ||||
|             if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? { | ||||
|                 tasks -= &to_delete_tasks; | ||||
|                 // We must remove the batch entirely | ||||
|                 if tasks.is_empty() { | ||||
|                     self.all_batches.delete(wtxn, &batch_id)?; | ||||
|                     self.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; | ||||
|                 } | ||||
|                 // Anyway, we must remove the batch from all its reverse indexes. | ||||
|                 // The only way to do that is to check | ||||
|  | ||||
|                 for index in affected_indexes.iter() { | ||||
|                     let index_tasks = self.index_tasks(wtxn, index)?; | ||||
|                     let remaining_index_tasks = index_tasks & &tasks; | ||||
|                     if remaining_index_tasks.is_empty() { | ||||
|                         self.update_batch_index(wtxn, index, |bitmap| { | ||||
|                             bitmap.remove(batch_id); | ||||
|                         })?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 for status in affected_statuses.iter() { | ||||
|                     let status_tasks = self.get_status(wtxn, *status)?; | ||||
|                     let remaining_status_tasks = status_tasks & &tasks; | ||||
|                     if remaining_status_tasks.is_empty() { | ||||
|                         self.update_batch_status(wtxn, *status, |bitmap| { | ||||
|                             bitmap.remove(batch_id); | ||||
|                         })?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 for kind in affected_kinds.iter() { | ||||
|                     let kind_tasks = self.get_kind(wtxn, *kind)?; | ||||
|                     let remaining_kind_tasks = kind_tasks & &tasks; | ||||
|                     if remaining_kind_tasks.is_empty() { | ||||
|                         self.update_batch_kind(wtxn, *kind, |bitmap| { | ||||
|                             bitmap.remove(batch_id); | ||||
|                         })?; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(to_delete_tasks) | ||||
|     } | ||||
|   | ||||
| @@ -41,22 +41,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} } | ||||
| [timestamp] [2,3,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Batches: | ||||
| 0 {uid: 0, } | ||||
| 1 {uid: 1, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batch to tasks mapping: | ||||
| 0 [0,] | ||||
| 1 [2,3,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Status: | ||||
| succeeded [0,1,] | ||||
| succeeded [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Kind: | ||||
| "documentAdditionOrUpdate" [0,] | ||||
| "documentAdditionOrUpdate" [] | ||||
| "taskDeletion" [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Index Tasks: | ||||
| catto [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Enqueued At: | ||||
| [timestamp] [0,] | ||||
|   | ||||
| @@ -39,22 +39,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} } | ||||
| [timestamp] [2,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Batches: | ||||
| 0 {uid: 0, } | ||||
| 1 {uid: 1, } | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batch to tasks mapping: | ||||
| 0 [0,] | ||||
| 1 [2,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Status: | ||||
| succeeded [0,1,] | ||||
| succeeded [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Kind: | ||||
| "documentAdditionOrUpdate" [0,] | ||||
| "documentAdditionOrUpdate" [] | ||||
| "taskDeletion" [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Index Tasks: | ||||
| catto [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Batches Enqueued At: | ||||
| [timestamp] [0,] | ||||
|   | ||||
		Reference in New Issue
	
	Block a user