From de771a8bd726cf713bb1b153eab7a1abe71223a5 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 11 Jan 2023 17:34:46 +0100 Subject: [PATCH] Use LRU cache --- index-scheduler/src/index_mapper.rs | 243 ++++++++++++++++++++-------- 1 file changed, 177 insertions(+), 66 deletions(-) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index d1fe7c57d..13ffb0dc3 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,5 +1,3 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::{fs, thread}; @@ -14,6 +12,7 @@ use time::OffsetDateTime; use uuid::Uuid; use self::IndexStatus::{Available, BeingDeleted, BeingResized}; +use crate::lru::{InsertionOutcome, LruMap}; use crate::uuid_codec::UuidCodec; use crate::{clamp_to_page_size, Error, Result}; @@ -29,7 +28,7 @@ const INDEX_MAPPING: &str = "index-mapping"; #[derive(Clone)] pub struct IndexMapper { /// Keep track of the opened indexes. Used mainly by the index resolver. - index_map: Arc>>, + index_map: Arc>, /// Map an index name with an index uuid currently available on disk. pub(crate) index_mapping: Database, @@ -40,6 +39,122 @@ pub struct IndexMapper { pub indexer_config: Arc, } +struct IndexMap { + unavailable: Vec<(Uuid, Option>)>, + available: LruMap, +} + +impl IndexMap { + pub fn new(cap: usize) -> IndexMap { + Self { unavailable: Vec::new(), available: LruMap::new(cap) } + } + + pub fn get(&self, uuid: &Uuid) -> Option { + self.get_if_unavailable(uuid) + .map(|signal| { + if let Some(signal) = signal { + IndexStatus::BeingResized(signal) + } else { + IndexStatus::BeingDeleted + } + }) + .or_else(|| self.available.get(uuid).map(|index| IndexStatus::Available(index.clone()))) + } + + /// Inserts a new index as available + /// + /// # Panics + /// + /// - If the index is already present, but currently unavailable. 
+ pub fn insert(&mut self, uuid: &Uuid, index: Index) -> InsertionOutcome { + assert!( + matches!(self.get_if_unavailable(uuid), None), + "Attempted to insert an index that was not available" + ); + + self.available.insert(*uuid, index) + } + + /// Begins a resize operation. + /// + /// Returns `None` if the index is already unavailable, or not present at all. + pub fn start_resize(&mut self, uuid: &Uuid, signal: Arc) -> Option { + if self.get_if_unavailable(uuid).is_some() { + return None; + } + + let index = self.available.remove(uuid)?; + self.unavailable.push((*uuid, Some(signal))); + Some(index) + } + + /// Ends a resize operation that completed successfully. + /// + /// As the index becomes available again, it might evict another index from the cache. In that case, it is returned. + /// + /// # Panics + /// + /// - if the target index was not being resized. + /// - the index was also in the list of available indexes. + pub fn end_resize( + &mut self, + uuid: &Uuid, + index: Index, + ) -> (Arc, Option<(Uuid, Index)>) { + let signal = + self.pop_if_unavailable(uuid).flatten().expect("The index was not being resized"); + let evicted = match self.available.insert(*uuid, index) { + InsertionOutcome::InsertedNew => None, + InsertionOutcome::Evicted(uuid, index) => Some((uuid, index)), + InsertionOutcome::Replaced(_) => panic!("Inconsistent map state"), + }; + (signal, evicted) + } + + /// Ends a resize operation that failed for some reason. + /// + /// # Panics + /// + /// - if the target index was not being resized. + pub fn end_resize_failed(&mut self, uuid: &Uuid) -> Arc { + self.pop_if_unavailable(uuid).flatten().expect("The index was not being resized") + } + + /// Begins deleting an index. 
+ /// + /// # Panics + /// + /// - if the index was already unavailable + pub fn start_deletion(&mut self, uuid: &Uuid) -> Option { + assert!( + matches!(self.get_if_unavailable(uuid), None), + "Attempt to start deleting an index that was already unavailable" + ); + + let index = self.available.remove(uuid)?; + self.unavailable.push((*uuid, None)); + Some(index) + } + + pub fn end_deletion(&mut self, uuid: &Uuid) { + self.pop_if_unavailable(uuid) + .expect("Attempted to delete an index that was not being deleted"); + } + + fn get_if_unavailable(&self, uuid: &Uuid) -> Option>> { + self.unavailable + .iter() + .find_map(|(candidate_uuid, signal)| (uuid == candidate_uuid).then_some(signal.clone())) + } + + fn pop_if_unavailable(&mut self, uuid: &Uuid) -> Option>> { + self.unavailable + .iter() + .position(|(candidate_uuid, _)| candidate_uuid == uuid) + .map(|index| self.unavailable.swap_remove(index).1) + } +} + /// Whether the index is available for use or is forbidden to be inserted back in the index map #[allow(clippy::large_enum_variant)] #[derive(Clone)] @@ -60,7 +175,7 @@ impl IndexMapper { indexer_config: IndexerConfig, ) -> Result { Ok(Self { - index_map: Arc::default(), + index_map: Arc::new(RwLock::new(IndexMap::new(20))), index_mapping: env.create_database(Some(INDEX_MAPPING))?, base_path, index_size, @@ -112,9 +227,15 @@ impl IndexMapper { // Error if the UUIDv4 somehow already exists in the map, since it should be fresh. // This is very unlikely to happen in practice. // TODO: it would be better to lazily create the index. But we need an Index::open function for milli. 
- if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some() - { - panic!("Uuid v4 conflict: index with UUID {uuid} already exists."); + match self.index_map.write().unwrap().insert(&uuid, index.clone()) { + InsertionOutcome::Evicted(uuid, evicted_index) => { + log::info!("Closing index with UUID {uuid}"); + evicted_index.prepare_for_closing(); + } + InsertionOutcome::Replaced(_) => { + panic!("Uuid v4 conflict: index with UUID {uuid} already exists.") + } + _ => (), } Ok(index) @@ -135,21 +256,18 @@ impl IndexMapper { assert!(self.index_mapping.delete(&mut wtxn, name)?); wtxn.commit()?; + // We remove the index from the in-memory index map. let closing_event = loop { let mut lock = self.index_map.write().unwrap(); - let resize_operation = match lock.insert(uuid, BeingDeleted) { - Some(Available(index)) => break Some(index.prepare_for_closing()), - // The target index is in the middle of a resize operation. - // Wait for this operation to complete, then try again. + let resize_operation = match lock.get(&uuid) { + Some(Available(index)) => { + lock.start_deletion(&uuid); + break index.prepare_for_closing(); + } Some(BeingResized(resize_operation)) => resize_operation.clone(), - // The index is already being deleted or doesn't exist. - // It's OK to remove it from the map again. - _ => break None, + Some(BeingDeleted) | None => return Ok(()), }; - - // Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which - // will involve operations on the very map we're locking. drop(lock); resize_operation.wait(); }; @@ -162,9 +280,7 @@ impl IndexMapper { .spawn(move || { // We first wait to be sure that the previously opened index is effectively closed. // This can take a lot of time, this is why we do that in a seperate thread. - if let Some(closing_event) = closing_event { - closing_event.wait(); - } + closing_event.wait(); // Then we remove the content from disk. 
if let Err(e) = fs::remove_dir_all(&index_path) { @@ -175,7 +291,7 @@ impl IndexMapper { } // Finally we remove the entry from the index map. - assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted))); + index_map.write().unwrap().end_deletion(&uuid); }) .unwrap(); @@ -202,23 +318,9 @@ impl IndexMapper { .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // We remove the index from the in-memory index map. - let mut lock = self.index_map.write().unwrap(); // signal that will be sent when the resize operation completes let resize_operation = Arc::new(SignalEvent::manual(false)); - let index = match lock.insert(uuid, BeingResized(resize_operation)) { - Some(Available(index)) => index, - Some(previous_status) => { - lock.insert(uuid, previous_status); - panic!( - "Attempting to resize index {name} that is already being resized or deleted." - ) - } - None => { - panic!("Could not find the status of index {name} in the in-memory index mapper.") - } - }; - - drop(lock); + let Some(index) = self.index_map.write().unwrap().start_resize(&uuid, resize_operation) else { return Ok(()) }; let resize_succeeded = (move || { let current_size = index.map_size()?; @@ -242,21 +344,17 @@ impl IndexMapper { // Even if there was an error we don't want to leave the map in an inconsistent state as it would cause // deadlocks. let mut lock = self.index_map.write().unwrap(); - let (resize_operation, resize_succeeded) = match resize_succeeded { + let (resize_operation, resize_succeeded, evicted) = match resize_succeeded { Ok(index) => { // insert the resized index - let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else { - panic!("Index state for index {name} was modified while it was being resized") - }; + let (resize_operation, evicted) = lock.end_resize(&uuid, index); - (resize_operation, Ok(())) + (resize_operation, Ok(()), evicted) } Err(error) => { // there was an error, not much we can do... 
delete the index from the in-memory map to prevent future errors - let Some(BeingResized(resize_operation)) = lock.remove(&uuid) else { - panic!("Index state for index {name} was modified while it was being resized") - }; - (resize_operation, Err(error)) + let resize_operation = lock.end_resize_failed(&uuid); + (resize_operation, Err(error), None) } }; @@ -264,6 +362,11 @@ impl IndexMapper { drop(lock); resize_operation.signal(); + if let Some((uuid, evicted_index)) = evicted { + log::info!("Closing index with UUID {uuid}"); + evicted_index.prepare_for_closing(); + } + resize_succeeded } @@ -275,11 +378,11 @@ impl IndexMapper { .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // we clone here to drop the lock before entering the match - let index = loop { - let index = self.index_map.read().unwrap().get(&uuid).cloned(); + let (index, evicted_index) = loop { + let index = self.index_map.read().unwrap().get(&uuid); match index { - Some(Available(index)) => break index, + Some(Available(index)) => break (index, None), Some(BeingResized(ref resize_operation)) => { // Avoiding deadlocks: no lock taken while doing this operation. resize_operation.wait(); @@ -290,36 +393,44 @@ impl IndexMapper { None => { let mut index_map = self.index_map.write().unwrap(); // between the read lock and the write lock it's not impossible - // that someone already opened the index (eg if two search happens + // that someone already opened the index (eg if two searches happen // at the same time), thus before opening it we check a second time // if it's not already there. - // Since there is a good chance it's not already there we can use - // the entry method. 
- match index_map.entry(uuid) { - Entry::Vacant(entry) => { + match index_map.get(&uuid) { + None => { let index_path = self.base_path.join(uuid.to_string()); let index = self.create_or_open_index(&index_path, None, self.index_size)?; - entry.insert(Available(index.clone())); - break index; - } - Entry::Occupied(entry) => match entry.get() { - Available(index) => break index.clone(), - BeingResized(resize_operation) => { - // Avoiding the deadlock: we drop the lock before waiting - let resize_operation = resize_operation.clone(); - drop(index_map); - resize_operation.wait(); - continue; + match index_map.insert(&uuid, index.clone()) { + InsertionOutcome::InsertedNew => break (index, None), + InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { + break (index, Some((evicted_uuid, evicted_index))) + } + InsertionOutcome::Replaced(_) => { + panic!("Inconsistent map state") + } } - BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), - }, + } + Some(Available(index)) => break (index, None), + Some(BeingResized(resize_operation)) => { + // Avoiding the deadlock: we drop the lock before waiting + let resize_operation = resize_operation.clone(); + drop(index_map); + resize_operation.wait(); + continue; + } + Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), } } } }; + if let Some((evicted_uuid, evicted_index)) = evicted_index { + log::info!("Closing index with UUID {evicted_uuid}"); + evicted_index.prepare_for_closing(); + } + Ok(index) }