From 2b9a56e192aea0dbdc3e4e25e888957a54515a3f Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 17 Jan 2023 16:52:55 +0100 Subject: [PATCH] WIP: evict indexes in unavailable --- index-scheduler/src/index_mapper.rs | 223 +++++++++++++++------------- 1 file changed, 117 insertions(+), 106 deletions(-) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 7dcdd68c9..8feaeabb5 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,17 +1,18 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; +use std::time::Duration; use std::{fs, thread}; use log::error; use meilisearch_types::heed::types::Str; -use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, EnvClosingEvent, EnvOpenOptions, RoTxn, RwTxn}; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::Index; use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; -use self::IndexStatus::{Available, BeingDeleted, BeingResized}; +use self::IndexStatus::{Available, DefinitelyUnavailable, TemporarilyUnavailable}; use crate::lru::{InsertionOutcome, LruMap}; use crate::uuid_codec::UuidCodec; use crate::{clamp_to_page_size, Error, Result}; @@ -43,10 +44,30 @@ pub struct IndexMapper { } struct IndexMap { - unavailable: Vec<(Uuid, Option>)>, + unavailable: Vec<(Uuid, Option)>, available: LruMap, } +enum Unavailable { + Temporarily, + Definitely, +} + +#[derive(Clone)] +pub enum ClosingSignal { + Resized(Arc), + Closed(EnvClosingEvent), +} + +impl ClosingSignal { + pub fn wait_timeout(&self, timeout: Duration) -> bool { + match self { + ClosingSignal::Resized(signal) => signal.wait_timeout(timeout), + ClosingSignal::Closed(signal) => signal.wait_timeout(timeout), + } + } +} + impl IndexMap { pub fn new(cap: usize) -> IndexMap { Self { unavailable: Vec::new(), available: LruMap::new(cap) } @@ -56,9 +77,9 @@ impl IndexMap { self.get_if_unavailable(uuid) .map(|signal| { if let Some(signal) = signal { - IndexStatus::BeingResized(signal) + IndexStatus::TemporarilyUnavailable(signal) } else { - IndexStatus::BeingDeleted + IndexStatus::DefinitelyUnavailable } }) .or_else(|| self.available.get(uuid).map(|index| IndexStatus::Available(index.clone()))) @@ -69,88 +90,86 @@ impl IndexMap { /// # Panics /// /// - If the index is already present, but currently unavailable. - pub fn insert(&mut self, uuid: &Uuid, index: Index) -> InsertionOutcome { + pub fn insert(&mut self, uuid: &Uuid, index: Index) -> Option { assert!( matches!(self.get_if_unavailable(uuid), None), "Attempted to insert an index that was not available" ); - self.available.insert(*uuid, index) + match self.available.insert(*uuid, index) { + InsertionOutcome::InsertedNew => None, + InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { + self.evict(evicted_uuid, evicted_index); + None + } + InsertionOutcome::Replaced(replaced_index) => Some(replaced_index), + } } - /// Begins a resize operation. + fn evict(&mut self, uuid: Uuid, index: Index) { + let closed = index.prepare_for_closing(); + self.unavailable.push((uuid, Some(ClosingSignal::Closed(closed)))); + } + + /// Makes an index temporarily or permanently unavailable. /// - /// Returns `None` if the index is already unavailable, or not present at all. - pub fn start_resize(&mut self, uuid: &Uuid, signal: Arc) -> Option { + /// Does nothing if the target index is already unavailable. + pub fn make_unavailable(&mut self, uuid: &Uuid, unavailability: Unavailable) -> Option { if self.get_if_unavailable(uuid).is_some() { return None; } + let available_when = match unavailability { + Unavailable::Temporarily => { + Some(ClosingSignal::Resized(Arc::new(SignalEvent::manual(false)))) + } + Unavailable::Definitely => None, + }; + let index = self.available.remove(uuid)?; - self.unavailable.push((*uuid, Some(signal))); + self.unavailable.push((*uuid, available_when)); Some(index) } - /// Ends a resize operation that completed successfully. + /// Makes an index available again. /// /// As the index becomes available again, it might evict another index from the cache. In that case, it is returned. /// /// # Panics /// - /// - if the target index was not being resized. + /// - if the target index was not being temporarily resized. /// - the index was also in the list of available indexes. - pub fn end_resize( - &mut self, - uuid: &Uuid, - index: Index, - ) -> (Arc, Option<(Uuid, Index)>) { - let signal = - self.pop_if_unavailable(uuid).flatten().expect("The index was not being resized"); - let evicted = match self.available.insert(*uuid, index) { - InsertionOutcome::InsertedNew => None, - InsertionOutcome::Evicted(uuid, index) => Some((uuid, index)), + pub fn restore(&mut self, uuid: &Uuid, index: Index) -> ClosingSignal { + let signal = self + .pop_if_unavailable(uuid) + .flatten() + .expect("The index was not being temporarily resized"); + match self.available.insert(*uuid, index) { + InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { + self.evict(evicted_uuid, evicted_index) + } + InsertionOutcome::InsertedNew => (), InsertionOutcome::Replaced(_) => panic!("Inconsistent map state"), - }; - (signal, evicted) + } + signal } - /// Ends a resize operation that failed for some reason. + /// Removes an unavailable index from the map. /// /// # Panics /// - /// - if the target index was not being resized. - pub fn end_resize_failed(&mut self, uuid: &Uuid) -> Arc { - self.pop_if_unavailable(uuid).flatten().expect("The index was not being resized") + /// - if the target index was not unavailable. + pub fn remove(&mut self, uuid: &Uuid) -> Option { + self.pop_if_unavailable(uuid).expect("The index could not be found") } - /// Beings deleting an index. - /// - /// # Panics - /// - /// - if the index was already unavailable - pub fn start_deletion(&mut self, uuid: &Uuid) -> Option { - assert!( - matches!(self.get_if_unavailable(uuid), None), - "Attempt to start deleting an index that was already unavailable" - ); - - let index = self.available.remove(uuid)?; - self.unavailable.push((*uuid, None)); - Some(index) - } - - pub fn end_deletion(&mut self, uuid: &Uuid) { - self.pop_if_unavailable(uuid) - .expect("Attempted to delete an index that was not being deleted"); - } - - fn get_if_unavailable(&self, uuid: &Uuid) -> Option>> { + fn get_if_unavailable(&self, uuid: &Uuid) -> Option> { self.unavailable .iter() .find_map(|(candidate_uuid, signal)| (uuid == candidate_uuid).then_some(signal.clone())) } - fn pop_if_unavailable(&mut self, uuid: &Uuid) -> Option>> { + fn pop_if_unavailable(&mut self, uuid: &Uuid) -> Option> { self.unavailable .iter() .position(|(candidate_uuid, _)| candidate_uuid == uuid) @@ -163,9 +182,9 @@ impl IndexMap { #[derive(Clone)] pub enum IndexStatus { /// Do not insert it back in the index map as it is currently being deleted. - BeingDeleted, - /// Temporarily do not insert the index in the index map as it is currently being resized. - BeingResized(Arc), + DefinitelyUnavailable, + /// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map. + TemporarilyUnavailable(ClosingSignal), /// You can use the index without worrying about anything. Available(Index), } @@ -234,15 +253,8 @@ impl IndexMapper { // Error if the UUIDv4 somehow already exists in the map, since it should be fresh. // This is very unlikely to happen in practice. // TODO: it would be better to lazily create the index. But we need an Index::open function for milli. - match self.index_map.write().unwrap().insert(&uuid, index.clone()) { - InsertionOutcome::Evicted(uuid, evicted_index) => { - log::info!("Closing index with UUID {uuid}"); - evicted_index.prepare_for_closing(); - } - InsertionOutcome::Replaced(_) => { - panic!("Uuid v4 conflict: index with UUID {uuid} already exists.") - } - _ => (), + if self.index_map.write().unwrap().insert(&uuid, index.clone()).is_some() { + panic!("Uuid v4 conflict: index with UUID {uuid} already exists.") } Ok(index) @@ -264,19 +276,25 @@ impl IndexMapper { wtxn.commit()?; + let mut tries = 0; // We remove the index from the in-memory index map. let closing_event = loop { let mut lock = self.index_map.write().unwrap(); let resize_operation = match lock.get(&uuid) { Some(Available(index)) => { - lock.start_deletion(&uuid); + lock.make_unavailable(&uuid, Unavailable::Definitely); break index.prepare_for_closing(); } - Some(BeingResized(resize_operation)) => resize_operation.clone(), - Some(BeingDeleted) | None => return Ok(()), + Some(TemporarilyUnavailable(resize_operation)) => resize_operation.clone(), + Some(DefinitelyUnavailable) | None => return Ok(()), }; + // Avoiding deadlock: we drop the lock before waiting on the resize operation. drop(lock); - resize_operation.wait(); + resize_operation.wait_timeout(Duration::from_secs(6)); + tries += 1; + if tries > 100 { + panic!("Too many spurious wakeups while waiting on a resize operation.") + } }; let index_map = self.index_map.clone(); @@ -298,7 +316,7 @@ impl IndexMapper { } // Finally we remove the entry from the index map. - index_map.write().unwrap().end_deletion(&uuid); + index_map.write().unwrap().remove(&uuid); }) .unwrap(); @@ -325,9 +343,7 @@ impl IndexMapper { .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // We remove the index from the in-memory index map. - // signal that will be sent when the resize operation completes - let resize_operation = Arc::new(SignalEvent::manual(false)); - let Some(index) = self.index_map.write().unwrap().start_resize(&uuid, resize_operation) else { return Ok(()) }; + let Some(index) = self.index_map.write().unwrap().make_unavailable(&uuid, Unavailable::Temporarily) else { return Ok(()) }; let resize_succeeded = (move || { let current_size = index.map_size()?; @@ -351,28 +367,25 @@ impl IndexMapper { // Even if there was an error we don't want to leave the map in an inconsistent state as it would cause // deadlocks. let mut lock = self.index_map.write().unwrap(); - let (resize_operation, resize_succeeded, evicted) = match resize_succeeded { + let (resize_operation, resize_succeeded) = match resize_succeeded { Ok(index) => { // insert the resized index - let (resize_operation, evicted) = lock.end_resize(&uuid, index); - - (resize_operation, Ok(()), evicted) + let resize_operation = lock.restore(&uuid, index); + (resize_operation, Ok(())) } Err(error) => { // there was an error, not much we can do... delete the index from the in-memory map to prevent future errors - let resize_operation = lock.end_resize_failed(&uuid); - (resize_operation, Err(error), None) + let resize_operation = lock.remove(&uuid).expect("The index was not being resized"); + (resize_operation, Err(error)) } }; // drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up. drop(lock); - resize_operation.signal(); + let ClosingSignal::Resized(resize_operation) = resize_operation else { + panic!("Index was closed while being resized") }; - if let Some((uuid, evicted_index)) = evicted { - log::info!("Closing index with UUID {uuid}"); - evicted_index.prepare_for_closing(); - } + resize_operation.signal(); resize_succeeded } @@ -385,17 +398,22 @@ impl IndexMapper { .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // we clone here to drop the lock before entering the match - let (index, evicted_index) = loop { + let mut tries = 0; + let index = loop { + tries += 1; + if tries > 100 { + panic!("Too many spurious wake ups while the index is being resized"); + } let index = self.index_map.read().unwrap().get(&uuid); match index { - Some(Available(index)) => break (index, None), - Some(BeingResized(ref resize_operation)) => { + Some(Available(index)) => break index, + Some(TemporarilyUnavailable(ref closing_signal)) => { // Avoiding deadlocks: no lock taken while doing this operation. - resize_operation.wait(); + closing_signal.wait_timeout(Duration::from_secs(6)); continue; } - Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), + Some(DefinitelyUnavailable) => return Err(Error::IndexNotFound(name.to_string())), // since we're lazy, it's possible that the index has not been opened yet. None => { let mut index_map = self.index_map.write().unwrap(); @@ -412,35 +430,28 @@ impl IndexMapper { None, self.index_base_map_size, )?; - match index_map.insert(&uuid, index.clone()) { - InsertionOutcome::InsertedNew => break (index, None), - InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { - break (index, Some((evicted_uuid, evicted_index))) - } - InsertionOutcome::Replaced(_) => { - panic!("Inconsistent map state") - } - } + assert!( + index_map.insert(&uuid, index.clone()).is_none(), + "Inconsistent map state" + ); + break index; } - Some(Available(index)) => break (index, None), - Some(BeingResized(resize_operation)) => { + Some(Available(index)) => break index, + Some(TemporarilyUnavailable(resize_operation)) => { // Avoiding the deadlock: we drop the lock before waiting let resize_operation = resize_operation.clone(); drop(index_map); - resize_operation.wait(); + resize_operation.wait_timeout(Duration::from_secs(6)); continue; } - Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), + Some(DefinitelyUnavailable) => { + return Err(Error::IndexNotFound(name.to_string())) + } } } } }; - if let Some((evicted_uuid, evicted_index)) = evicted_index { - log::info!("Closing index with UUID {evicted_uuid}"); - evicted_index.prepare_for_closing(); - } - Ok(index) }