From 52bf260f73d89200557344e94908f30eed7cb627 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 18 Jan 2023 16:55:43 +0100 Subject: [PATCH] Rewrite where evicted indexes are added to the set --- index-scheduler/src/index_mapper.rs | 549 ++++++++++++++++------------ 1 file changed, 321 insertions(+), 228 deletions(-) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 8feaeabb5..a92b795ea 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,21 +1,20 @@ -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::time::Duration; use std::{fs, thread}; use log::error; use meilisearch_types::heed::types::Str; -use meilisearch_types::heed::{Database, Env, EnvClosingEvent, EnvOpenOptions, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::Index; -use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; -use self::IndexStatus::{Available, DefinitelyUnavailable, TemporarilyUnavailable}; -use crate::lru::{InsertionOutcome, LruMap}; +use self::index_map::IndexMap; +use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; use crate::uuid_codec::UuidCodec; -use crate::{clamp_to_page_size, Error, Result}; +use crate::{Error, Result}; const INDEX_MAPPING: &str = "index-mapping"; @@ -26,6 +25,24 @@ const INDEX_MAPPING: &str = "index-mapping"; /// 2. Opening indexes and storing references to these opened indexes /// 3. Accessing indexes through their uuid /// 4. Mapping a user-defined name to each index uuid. +/// +/// # Implementation notes +/// +/// An index exists as 3 bits of data: +/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted. +/// 2. The persistent database containing the association between the index' name and its UUID, +/// that can exist in 2 states: Missing or Present. +/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states: +/// - Missing +/// - Available +/// - Closing (because an index needs resizing or was evicted from the cache) +/// - BeingDeleted +/// +/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper` +/// with the use of the following primitives: +/// - A RwLock on the `IndexMap`. +/// - Transactions on the association database. +/// - ClosingEvent signals emitted when closing an environment. #[derive(Clone)] pub struct IndexMapper { /// Keep track of the opened indexes. Used mainly by the index resolver. @@ -43,137 +60,276 @@ pub struct IndexMapper { pub indexer_config: Arc, } -struct IndexMap { - unavailable: Vec<(Uuid, Option)>, - available: LruMap, -} +mod index_map { + use std::collections::BTreeMap; + use std::path::Path; + use std::time::Duration; -enum Unavailable { - Temporarily, - Definitely, -} + use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions}; + use meilisearch_types::milli::Index; + use time::OffsetDateTime; + use uuid::Uuid; -#[derive(Clone)] -pub enum ClosingSignal { - Resized(Arc), - Closed(EnvClosingEvent), -} + use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing}; + use crate::lru::{InsertionOutcome, LruMap}; + use crate::{clamp_to_page_size, Result}; -impl ClosingSignal { - pub fn wait_timeout(&self, timeout: Duration) -> bool { - match self { - ClosingSignal::Resized(signal) => signal.wait_timeout(timeout), - ClosingSignal::Closed(signal) => signal.wait_timeout(timeout), + /// Keep an internally consistent view of the open indexes in memory. + /// + /// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened. + /// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and + /// are stored separately. + /// + /// This view provides operations to change the state of the index as it is known in memory: + /// open an index (making it available for queries), close an index (specifying the new size it should be opened with), + /// delete an index. + /// + /// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure. + pub struct IndexMap { + /// A LRU map of indexes that are in the open state and available for queries. + available: LruMap, + /// A map of indexes that are not available for queries, either because they are being deleted + /// or because they are being closed. + /// + /// If they are being deleted, the UUID point to `None`. + unavailable: BTreeMap>, + + generation: usize, + } + + #[derive(Clone)] + pub struct ClosingIndex { + uuid: Uuid, + closing_event: EnvClosingEvent, + map_size: usize, + generation: usize, + } + + impl ClosingIndex { + /// Waits for the index to be definitely closed. + /// + /// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function. + /// + /// After the index is physically closed, the in memory map must still be updated to take this into account. + /// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open + /// the index without waiting anymore. + pub fn wait_timeout(self, timeout: Duration) -> ReopenableIndex { + self.closing_event.wait_timeout(timeout); + ReopenableIndex { + uuid: self.uuid, + map_size: self.map_size, + generation: self.generation, + } } } -} -impl IndexMap { - pub fn new(cap: usize) -> IndexMap { - Self { unavailable: Vec::new(), available: LruMap::new(cap) } + pub struct ReopenableIndex { + uuid: Uuid, + map_size: usize, + generation: usize, } - pub fn get(&self, uuid: &Uuid) -> Option { - self.get_if_unavailable(uuid) - .map(|signal| { - if let Some(signal) = signal { - IndexStatus::TemporarilyUnavailable(signal) - } else { - IndexStatus::DefinitelyUnavailable + impl ReopenableIndex { + /// Attempts to reopen the index, which can result in the index being reopened again or not + /// (e.g. if another thread already opened and closed the index again). + /// + /// Use get again on the IndexMap to get the updated status. + /// + /// Fails if the underlying index creation fails. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Available or Closing depending on generation | + /// | Available | Available | + /// + pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> { + if let Closing(reopen) = map.get(&self.uuid) { + if reopen.generation != self.generation { + return Ok(()); } - }) - .or_else(|| self.available.get(uuid).map(|index| IndexStatus::Available(index.clone()))) - } - - /// Inserts a new index as available - /// - /// # Panics - /// - /// - If the index is already present, but currently unavailable. - pub fn insert(&mut self, uuid: &Uuid, index: Index) -> Option { - assert!( - matches!(self.get_if_unavailable(uuid), None), - "Attempted to insert an index that was not available" - ); - - match self.available.insert(*uuid, index) { - InsertionOutcome::InsertedNew => None, - InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { - self.evict(evicted_uuid, evicted_index); - None + map.unavailable.remove(&self.uuid); + map.create(&self.uuid, path, None, self.map_size)?; + } + Ok(()) + } + + /// Attempts to close the index, which may or may not result in the index being closed + /// (e.g. if another thread already reopened the index again). + /// + /// Use get again on the IndexMap to get the updated status. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Missing or Closing depending on generation | + /// | Available | Available | + pub fn close(self, map: &mut IndexMap) { + if let Closing(reopen) = map.get(&self.uuid) { + if reopen.generation != self.generation { + return; + } + map.unavailable.remove(&self.uuid); } - InsertionOutcome::Replaced(replaced_index) => Some(replaced_index), } } - fn evict(&mut self, uuid: Uuid, index: Index) { - let closed = index.prepare_for_closing(); - self.unavailable.push((uuid, Some(ClosingSignal::Closed(closed)))); - } - - /// Makes an index temporarily or permanently unavailable. - /// - /// Does nothing if the target index is already unavailable. - pub fn make_unavailable(&mut self, uuid: &Uuid, unavailability: Unavailable) -> Option { - if self.get_if_unavailable(uuid).is_some() { - return None; + impl IndexMap { + pub fn new(cap: usize) -> IndexMap { + Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 } } - let available_when = match unavailability { - Unavailable::Temporarily => { - Some(ClosingSignal::Resized(Arc::new(SignalEvent::manual(false)))) - } - Unavailable::Definitely => None, - }; - - let index = self.available.remove(uuid)?; - self.unavailable.push((*uuid, available_when)); - Some(index) - } - - /// Makes an index available again. - /// - /// As the index becomes available again, it might evict another index from the cache. In that case, it is returned. - /// - /// # Panics - /// - /// - if the target index was not being temporarily resized. - /// - the index was also in the list of available indexes. - pub fn restore(&mut self, uuid: &Uuid, index: Index) -> ClosingSignal { - let signal = self - .pop_if_unavailable(uuid) - .flatten() - .expect("The index was not being temporarily resized"); - match self.available.insert(*uuid, index) { - InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { - self.evict(evicted_uuid, evicted_index) - } - InsertionOutcome::InsertedNew => (), - InsertionOutcome::Replaced(_) => panic!("Inconsistent map state"), + /// Gets the current status of an index in the map. + /// + /// If the index is available it can be accessed from the returned status. + pub fn get(&self, uuid: &Uuid) -> IndexStatus { + self.available + .get(uuid) + .map(|index| Available(index.clone())) + .unwrap_or_else(|| self.get_unavailable(uuid)) + } + + fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus { + match self.unavailable.get(uuid) { + Some(Some(reopen)) => Closing(reopen.clone()), + Some(None) => BeingDeleted, + None => Missing, + } + } + + /// Attempts to create a new index that wasn't existing before. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Available | + /// | BeingDeleted | panics | + /// | Closing | panics | + /// | Available | panics | + /// + pub fn create( + &mut self, + uuid: &Uuid, + path: &Path, + date: Option<(OffsetDateTime, OffsetDateTime)>, + map_size: usize, + ) -> Result { + if !matches!(self.get_unavailable(uuid), Missing) { + panic!("Attempt to open an index that was unavailable"); + } + let index = create_or_open_index(path, date, map_size)?; + match self.available.insert(*uuid, index.clone()) { + InsertionOutcome::InsertedNew => (), + InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { + self.close(evicted_uuid, evicted_index, 0); + } + InsertionOutcome::Replaced(_) => { + panic!("Attempt to open an index that was already opened") + } + } + Ok(index) + } + + fn next_generation(&mut self) -> usize { + self.generation = self.generation.checked_add(1).unwrap(); + self.generation + } + + /// Attempts to close an index. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Closing | + /// | Available | Closing | + /// + pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) { + let Some(index) = self.available.remove(uuid) else { return; }; + self.close(*uuid, index, map_size_growth); + } + + fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) { + /// TODO: default map_size + let map_size = index.map_size().unwrap_or_default() + map_size_growth; + let closing_event = index.prepare_for_closing(); + let generation = self.next_generation(); + self.unavailable + .insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation })); + } + + /// Attempts to delete and index. + /// + /// # Status table + /// + /// | Previous Status | New Status | Return value | + /// |-----------------|------------|--------------| + /// | Missing | BeingDeleted | Ok(None) | + /// | BeingDeleted | BeingDeleted | Err(None) | + /// | Closing | Closing | Err(Some(reopen)) | + /// | Available | BeingDeleted | Ok(Some(env_closing_event)) | + pub fn start_deletion( + &mut self, + uuid: &Uuid, + ) -> std::result::Result, Option> { + if let Some(index) = self.available.remove(uuid) { + return Ok(Some(index.prepare_for_closing())); + } + match self.unavailable.remove(uuid) { + Some(Some(reopen)) => Err(Some(reopen)), + Some(None) => Err(None), + None => Ok(None), + } + } + + /// Marks that an index finished deletion. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Missing | + /// | BeingDeleted | Missing | + /// | Closing | panics | + /// | Available | panics | + pub fn end_deletion(&mut self, uuid: &Uuid) { + assert!( + self.available.get(uuid).is_none(), + "Attempt to finish deletion of an index that was not being deleted" + ); + // Do not panic if the index was Missing or BeingDeleted + assert!( + !matches!(self.unavailable.remove(uuid), Some(Some(_))), + "Attempt to finish deletion of an index that was being closed" + ); } - signal } - /// Removes an unavailable index from the map. - /// - /// # Panics - /// - /// - if the target index was not unavailable. - pub fn remove(&mut self, uuid: &Uuid) -> Option { - self.pop_if_unavailable(uuid).expect("The index could not be found") - } + /// Create or open an index in the specified path. + /// The path *must* exist or an error will be thrown. + fn create_or_open_index( + path: &Path, + date: Option<(OffsetDateTime, OffsetDateTime)>, + map_size: usize, + ) -> Result { + let mut options = EnvOpenOptions::new(); + options.map_size(clamp_to_page_size(map_size)); + options.max_readers(1024); - fn get_if_unavailable(&self, uuid: &Uuid) -> Option> { - self.unavailable - .iter() - .find_map(|(candidate_uuid, signal)| (uuid == candidate_uuid).then_some(signal.clone())) - } - - fn pop_if_unavailable(&mut self, uuid: &Uuid) -> Option> { - self.unavailable - .iter() - .position(|(candidate_uuid, _)| candidate_uuid == uuid) - .map(|index| self.unavailable.swap_remove(index).1) + if let Some((created, updated)) = date { + Ok(Index::new_with_creation_dates(options, path, created, updated)?) + } else { + Ok(Index::new(options, path)?) + } } } @@ -181,10 +337,12 @@ impl IndexMap { #[allow(clippy::large_enum_variant)] #[derive(Clone)] pub enum IndexStatus { + /// Not currently in the index map. + Missing, /// Do not insert it back in the index map as it is currently being deleted. - DefinitelyUnavailable, + BeingDeleted, /// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map. - TemporarilyUnavailable(ClosingSignal), + Closing(index_map::ClosingIndex), /// You can use the index without worrying about anything. Available(Index), } @@ -208,25 +366,6 @@ impl IndexMapper { }) } - /// Create or open an index in the specified path. - /// The path *must* exists or an error will be thrown. - fn create_or_open_index( - &self, - path: &Path, - date: Option<(OffsetDateTime, OffsetDateTime)>, - map_size: usize, - ) -> Result { - let mut options = EnvOpenOptions::new(); - options.map_size(clamp_to_page_size(map_size)); - options.max_readers(1024); - - if let Some((created, updated)) = date { - Ok(Index::new_with_creation_dates(options, path, created, updated)?) - } else { - Ok(Index::new(options, path)?) - } - } - /// Get or create the index. pub fn create_index( &self, @@ -246,16 +385,17 @@ impl IndexMapper { let index_path = self.base_path.join(uuid.to_string()); fs::create_dir_all(&index_path)?; - let index = - self.create_or_open_index(&index_path, date, self.index_base_map_size)?; - - wtxn.commit()?; // Error if the UUIDv4 somehow already exists in the map, since it should be fresh. // This is very unlikely to happen in practice. // TODO: it would be better to lazily create the index. But we need an Index::open function for milli. - if self.index_map.write().unwrap().insert(&uuid, index.clone()).is_some() { - panic!("Uuid v4 conflict: index with UUID {uuid} already exists.") - } + let index = self.index_map.write().unwrap().create( + &uuid, + &index_path, + date, + self.index_base_map_size, + )?; + + wtxn.commit()?; Ok(index) } @@ -280,20 +420,20 @@ impl IndexMapper { // We remove the index from the in-memory index map. let closing_event = loop { let mut lock = self.index_map.write().unwrap(); - let resize_operation = match lock.get(&uuid) { - Some(Available(index)) => { - lock.make_unavailable(&uuid, Unavailable::Definitely); - break index.prepare_for_closing(); + match lock.start_deletion(&uuid) { + Ok(env_closing) => break env_closing, + Err(Some(reopen)) => { + // drop the lock here so that we don't synchronously wait for the index to close. + drop(lock); + tries += 1; + if tries >= 100 { + panic!("Too many attempts to close index {name} prior to deletion.") + } + let reopen = reopen.wait_timeout(Duration::from_secs(6)); + reopen.close(&mut self.index_map.write().unwrap()); + continue; } - Some(TemporarilyUnavailable(resize_operation)) => resize_operation.clone(), - Some(DefinitelyUnavailable) | None => return Ok(()), - }; - // Avoiding deadlock: we drop the lock before waiting on the resize operation. - drop(lock); - resize_operation.wait_timeout(Duration::from_secs(6)); - tries += 1; - if tries > 100 { - panic!("Too many spurious wakeups while waiting on a resize operation.") + Err(None) => return Ok(()), } }; @@ -304,8 +444,10 @@ impl IndexMapper { .name(String::from("index_deleter")) .spawn(move || { // We first wait to be sure that the previously opened index is effectively closed. - // This can take a lot of time, this is why we do that in a seperate thread. - closing_event.wait(); + // This can take a lot of time, this is why we do that in a separate thread. + if let Some(closing_event) = closing_event { + closing_event.wait(); + } // Then we remove the content from disk. if let Err(e) = fs::remove_dir_all(&index_path) { @@ -316,7 +458,7 @@ impl IndexMapper { } // Finally we remove the entry from the index map. - index_map.write().unwrap().remove(&uuid); + index_map.write().unwrap().end_deletion(&uuid); }) .unwrap(); @@ -336,58 +478,15 @@ impl IndexMapper { /// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the /// in memory hash map. pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> { - // fixme: factor to a function? let uuid = self .index_mapping .get(rtxn, name)? .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // We remove the index from the in-memory index map. - let Some(index) = self.index_map.write().unwrap().make_unavailable(&uuid, Unavailable::Temporarily) else { return Ok(()) }; + self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount); - let resize_succeeded = (move || { - let current_size = index.map_size()?; - let new_size = current_size + self.index_growth_amount; - let closing_event = index.prepare_for_closing(); - - log::debug!("Waiting for index {name} to close"); - - if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) { - // fail after 10 minutes waiting - panic!("Could not resize index {name} (unable to close it)"); - } - - log::info!("Resized index {name} from {current_size} to {new_size} bytes"); - let index_path = self.base_path.join(uuid.to_string()); - let index = self.create_or_open_index(&index_path, None, new_size)?; - Ok(index) - })(); - - // Put the map back to a consistent state. - // Even if there was an error we don't want to leave the map in an inconsistent state as it would cause - // deadlocks. - let mut lock = self.index_map.write().unwrap(); - let (resize_operation, resize_succeeded) = match resize_succeeded { - Ok(index) => { - // insert the resized index - let resize_operation = lock.restore(&uuid, index); - (resize_operation, Ok(())) - } - Err(error) => { - // there was an error, not much we can do... delete the index from the in-memory map to prevent future errors - let resize_operation = lock.remove(&uuid).expect("The index was not being resized"); - (resize_operation, Err(error)) - } - }; - - // drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up. - drop(lock); - let ClosingSignal::Resized(resize_operation) = resize_operation else { - panic!("Index was closed while being resized") }; - - resize_operation.signal(); - - resize_succeeded + Ok(()) } /// Return an index, may open it if it wasn't already opened. @@ -407,46 +506,40 @@ impl IndexMapper { let index = self.index_map.read().unwrap().get(&uuid); match index { - Some(Available(index)) => break index, - Some(TemporarilyUnavailable(ref closing_signal)) => { + Available(index) => break index, + Closing(reopen) => { // Avoiding deadlocks: no lock taken while doing this operation. - closing_signal.wait_timeout(Duration::from_secs(6)); + let reopen = reopen.wait_timeout(Duration::from_secs(6)); + let index_path = self.base_path.join(uuid.to_string()); + // take the lock to reopen the environment. + reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?; continue; } - Some(DefinitelyUnavailable) => return Err(Error::IndexNotFound(name.to_string())), + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), // since we're lazy, it's possible that the index has not been opened yet. - None => { + Missing => { let mut index_map = self.index_map.write().unwrap(); // between the read lock and the write lock it's not impossible // that someone already opened the index (eg if two searches happen // at the same time), thus before opening it we check a second time // if it's not already there. match index_map.get(&uuid) { - None => { + Missing => { let index_path = self.base_path.join(uuid.to_string()); - let index = self.create_or_open_index( + break index_map.create( + &uuid, &index_path, None, self.index_base_map_size, )?; - assert!( - index_map.insert(&uuid, index.clone()).is_none(), - "Inconsistent map state" - ); - break index; } - Some(Available(index)) => break index, - Some(TemporarilyUnavailable(resize_operation)) => { - // Avoiding the deadlock: we drop the lock before waiting - let resize_operation = resize_operation.clone(); - drop(index_map); - resize_operation.wait_timeout(Duration::from_secs(6)); + Available(index) => break index, + Closing(_) => { + // the reopening will be handled in the next loop operation continue; } - Some(DefinitelyUnavailable) => { - return Err(Error::IndexNotFound(name.to_string())) - } + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), } } }