mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-30 23:46:28 +00:00 
			
		
		
		
	Implement the IndexDeletion batch operation
This commit is contained in:
		
				
					committed by
					
						 Clément Renault
						Clément Renault
					
				
			
			
				
	
			
			
			
						parent
						
							da363a92ac
						
					
				
				
					commit
					2fbdd104b8
				
			| @@ -440,7 +440,13 @@ impl IndexScheduler { | ||||
|  | ||||
|                 Ok(vec![task]) | ||||
|             } | ||||
|             Batch::IndexDeletion { index_uid, tasks } => todo!(), | ||||
|             Batch::IndexDeletion { index_uid, tasks } => { | ||||
|                 let wtxn = self.env.write_txn()?; | ||||
|                 // The write transaction is directly owned and commited here. | ||||
|                 let index = self.index_mapper.delete_index(wtxn, &index_uid)?; | ||||
|  | ||||
|                 todo!("update the tasks and mark them as succeeded"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -1,9 +1,10 @@ | ||||
| use std::collections::hash_map::Entry; | ||||
| use std::collections::HashMap; | ||||
| use std::fs; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::sync::{Arc, RwLock, RwLockWriteGuard}; | ||||
| use std::{fs, thread}; | ||||
|  | ||||
| use log::error; | ||||
| use milli::Index; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| @@ -11,6 +12,7 @@ use milli::heed::types::{SerdeBincode, Str}; | ||||
| use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; | ||||
| use milli::update::IndexerConfig; | ||||
|  | ||||
| use self::IndexStatus::{Available, BeingDeleted}; | ||||
| use crate::{Error, Result}; | ||||
|  | ||||
| const INDEX_MAPPING: &str = "index-mapping"; | ||||
| @@ -19,9 +21,10 @@ const INDEX_MAPPING: &str = "index-mapping"; | ||||
| pub struct IndexMapper { | ||||
|     // Keep track of the opened indexes and is used | ||||
|     // mainly by the index resolver. | ||||
|     index_map: Arc<RwLock<HashMap<Uuid, Index>>>, | ||||
|     index_map: Arc<RwLock<HashMap<Uuid, IndexStatus>>>, | ||||
|  | ||||
|     // Map an index name with an index uuid currentl available on disk. | ||||
|     // TODO create a UUID Codec that uses the 16 bytes representation | ||||
|     // Map an index name with an index uuid currently available on disk. | ||||
|     index_mapping: Database<Str, SerdeBincode<Uuid>>, | ||||
|  | ||||
|     base_path: PathBuf, | ||||
| @@ -29,6 +32,16 @@ pub struct IndexMapper { | ||||
|     indexer_config: Arc<IndexerConfig>, | ||||
| } | ||||
|  | ||||
| /// Weither the index must not be inserted back | ||||
| /// or it is available for use. | ||||
| #[derive(Clone)] | ||||
| pub enum IndexStatus { | ||||
|     /// Do not insert it back in the index map as it is currently being deleted. | ||||
|     BeingDeleted, | ||||
|     /// You can use the index without worrying about anything. | ||||
|     Available(Index), | ||||
| } | ||||
|  | ||||
| impl IndexMapper { | ||||
|     pub fn new( | ||||
|         env: &Env, | ||||
| @@ -47,8 +60,8 @@ impl IndexMapper { | ||||
|  | ||||
|     /// Get or create the index. | ||||
|     pub fn create_index(&self, wtxn: &mut RwTxn, name: &str) -> Result<Index> { | ||||
|         let index = match self.index(wtxn, name) { | ||||
|             Ok(index) => index, | ||||
|         match self.index(wtxn, name) { | ||||
|             Ok(index) => Ok(index), | ||||
|             Err(Error::IndexNotFound(_)) => { | ||||
|                 let uuid = Uuid::new_v4(); | ||||
|                 self.index_mapping.put(wtxn, name, &uuid)?; | ||||
| @@ -57,12 +70,60 @@ impl IndexMapper { | ||||
|                 fs::create_dir_all(&index_path)?; | ||||
|                 let mut options = EnvOpenOptions::new(); | ||||
|                 options.map_size(self.index_size); | ||||
|                 milli::Index::new(options, &index_path)? | ||||
|                 Ok(milli::Index::new(options, &index_path)?) | ||||
|             } | ||||
|             error => return error, | ||||
|             error => error, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Removes the index from the mapping table and the in-memory index map | ||||
|     /// but keeps the associated tasks. | ||||
|     pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> { | ||||
|         let uuid = self | ||||
|             .index_mapping | ||||
|             .get(&wtxn, name)? | ||||
|             .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; | ||||
|  | ||||
|         // Once we retrieved the UUID of the index we remove it from the mapping table. | ||||
|         assert!(self.index_mapping.delete(&mut wtxn, name)?); | ||||
|  | ||||
|         wtxn.commit()?; | ||||
|  | ||||
|         // We remove the index from the in-memory index map. | ||||
|         let mut lock = self.index_map.write().unwrap(); | ||||
|         let closing_event = match lock.insert(uuid, BeingDeleted) { | ||||
|             Some(Available(index)) => Some(index.prepare_for_closing()), | ||||
|             _ => None, | ||||
|         }; | ||||
|  | ||||
|         Ok(index) | ||||
|         drop(lock); | ||||
|  | ||||
|         let index_map = self.index_map.clone(); | ||||
|         let index_path = self.base_path.join(uuid.to_string()); | ||||
|         let index_name = name.to_string(); | ||||
|         thread::spawn(move || { | ||||
|             // We first wait to be sure that the previously opened index is effectively closed. | ||||
|             // This can take a lot of time, this is why we do that in a seperate thread. | ||||
|             if let Some(closing_event) = closing_event { | ||||
|                 closing_event.wait(); | ||||
|             } | ||||
|  | ||||
|             // Then we remove the content from disk. | ||||
|             if let Err(e) = fs::remove_dir_all(&index_path) { | ||||
|                 error!( | ||||
|                     "An error happened when deleting the index {} ({}): {}", | ||||
|                     index_name, uuid, e | ||||
|                 ); | ||||
|             } | ||||
|  | ||||
|             // Finally we remove the entry from the index map. | ||||
|             assert!(matches!( | ||||
|                 index_map.write().unwrap().remove(&uuid), | ||||
|                 Some(BeingDeleted) | ||||
|             )); | ||||
|         }); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Return an index, may open it if it wasn't already opened. | ||||
| @@ -75,7 +136,8 @@ impl IndexMapper { | ||||
|         // we clone here to drop the lock before entering the match | ||||
|         let index = self.index_map.read().unwrap().get(&uuid).cloned(); | ||||
|         let index = match index { | ||||
|             Some(index) => index, | ||||
|             Some(Available(index)) => index, | ||||
|             Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), | ||||
|             // since we're lazy, it's possible that the index has not been opened yet. | ||||
|             None => { | ||||
|                 let mut index_map = self.index_map.write().unwrap(); | ||||
| @@ -92,11 +154,13 @@ impl IndexMapper { | ||||
|                         let mut options = EnvOpenOptions::new(); | ||||
|                         options.map_size(self.index_size); | ||||
|                         let index = milli::Index::new(options, &index_path)?; | ||||
|  | ||||
|                         entry.insert(index.clone()); | ||||
|                         entry.insert(Available(index.clone())); | ||||
|                         index | ||||
|                     } | ||||
|                     Entry::Occupied(entry) => entry.get().clone(), | ||||
|                     Entry::Occupied(entry) => match entry.get() { | ||||
|                         Available(index) => index.clone(), | ||||
|                         BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user