mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-27 00:31:02 +00:00
Rewrite where evicted indexes are added to the set
This commit is contained in:
@ -1,21 +1,20 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
|
||||
use log::error;
|
||||
use meilisearch_types::heed::types::Str;
|
||||
use meilisearch_types::heed::{Database, Env, EnvClosingEvent, EnvOpenOptions, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::Index;
|
||||
use synchronoise::SignalEvent;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use self::IndexStatus::{Available, DefinitelyUnavailable, TemporarilyUnavailable};
|
||||
use crate::lru::{InsertionOutcome, LruMap};
|
||||
use self::index_map::IndexMap;
|
||||
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
|
||||
use crate::uuid_codec::UuidCodec;
|
||||
use crate::{clamp_to_page_size, Error, Result};
|
||||
use crate::{Error, Result};
|
||||
|
||||
const INDEX_MAPPING: &str = "index-mapping";
|
||||
|
||||
@ -26,6 +25,24 @@ const INDEX_MAPPING: &str = "index-mapping";
|
||||
/// 2. Opening indexes and storing references to these opened indexes
|
||||
/// 3. Accessing indexes through their uuid
|
||||
/// 4. Mapping a user-defined name to each index uuid.
|
||||
///
|
||||
/// # Implementation notes
|
||||
///
|
||||
/// An index exists as 3 bits of data:
|
||||
/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted.
|
||||
/// 2. The persistent database containing the association between the index's name and its UUID,
|
||||
/// that can exist in 2 states: Missing or Present.
|
||||
/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states:
|
||||
/// - Missing
|
||||
/// - Available
|
||||
/// - Closing (because an index needs resizing or was evicted from the cache)
|
||||
/// - BeingDeleted
|
||||
///
|
||||
/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper`
|
||||
/// with the use of the following primitives:
|
||||
/// - A RwLock on the `IndexMap`.
|
||||
/// - Transactions on the association database.
|
||||
/// - ClosingEvent signals emitted when closing an environment.
|
||||
#[derive(Clone)]
|
||||
pub struct IndexMapper {
|
||||
/// Keep track of the opened indexes. Used mainly by the index resolver.
|
||||
@ -43,137 +60,276 @@ pub struct IndexMapper {
|
||||
pub indexer_config: Arc<IndexerConfig>,
|
||||
}
|
||||
|
||||
struct IndexMap {
|
||||
unavailable: Vec<(Uuid, Option<ClosingSignal>)>,
|
||||
available: LruMap<Uuid, Index>,
|
||||
}
|
||||
mod index_map {
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
enum Unavailable {
|
||||
Temporarily,
|
||||
Definitely,
|
||||
}
|
||||
use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
|
||||
use meilisearch_types::milli::Index;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum ClosingSignal {
|
||||
Resized(Arc<SignalEvent>),
|
||||
Closed(EnvClosingEvent),
|
||||
}
|
||||
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
|
||||
use crate::lru::{InsertionOutcome, LruMap};
|
||||
use crate::{clamp_to_page_size, Result};
|
||||
|
||||
impl ClosingSignal {
|
||||
pub fn wait_timeout(&self, timeout: Duration) -> bool {
|
||||
match self {
|
||||
ClosingSignal::Resized(signal) => signal.wait_timeout(timeout),
|
||||
ClosingSignal::Closed(signal) => signal.wait_timeout(timeout),
|
||||
/// Keep an internally consistent view of the open indexes in memory.
|
||||
///
|
||||
/// This view is made of an LRU cache that will evict the least recently used indexes when new indexes are opened.
|
||||
/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and
|
||||
/// are stored separately.
|
||||
///
|
||||
/// This view provides operations to change the state of the index as it is known in memory:
|
||||
/// open an index (making it available for queries), close an index (specifying the new size it should be opened with),
|
||||
/// delete an index.
|
||||
///
|
||||
/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure.
|
||||
pub struct IndexMap {
|
||||
/// An LRU map of indexes that are in the open state and available for queries.
|
||||
available: LruMap<Uuid, Index>,
|
||||
/// A map of indexes that are not available for queries, either because they are being deleted
|
||||
/// or because they are being closed.
|
||||
///
|
||||
/// If they are being deleted, the UUID points to `None`.
|
||||
unavailable: BTreeMap<Uuid, Option<ClosingIndex>>,
|
||||
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClosingIndex {
|
||||
uuid: Uuid,
|
||||
closing_event: EnvClosingEvent,
|
||||
map_size: usize,
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
impl ClosingIndex {
|
||||
/// Waits for the index to be definitely closed.
|
||||
///
|
||||
/// To avoid blocking, users should relinquish their locks on the IndexMap before calling this function.
|
||||
///
|
||||
/// After the index is physically closed, the in memory map must still be updated to take this into account.
|
||||
/// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open
|
||||
/// the index without waiting anymore.
|
||||
pub fn wait_timeout(self, timeout: Duration) -> ReopenableIndex {
|
||||
self.closing_event.wait_timeout(timeout);
|
||||
ReopenableIndex {
|
||||
uuid: self.uuid,
|
||||
map_size: self.map_size,
|
||||
generation: self.generation,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexMap {
|
||||
pub fn new(cap: usize) -> IndexMap {
|
||||
Self { unavailable: Vec::new(), available: LruMap::new(cap) }
|
||||
pub struct ReopenableIndex {
|
||||
uuid: Uuid,
|
||||
map_size: usize,
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
pub fn get(&self, uuid: &Uuid) -> Option<IndexStatus> {
|
||||
self.get_if_unavailable(uuid)
|
||||
.map(|signal| {
|
||||
if let Some(signal) = signal {
|
||||
IndexStatus::TemporarilyUnavailable(signal)
|
||||
} else {
|
||||
IndexStatus::DefinitelyUnavailable
|
||||
impl ReopenableIndex {
|
||||
/// Attempts to reopen the index, which can result in the index being reopened again or not
|
||||
/// (e.g. if another thread already opened and closed the index again).
|
||||
///
|
||||
/// Use get again on the IndexMap to get the updated status.
|
||||
///
|
||||
/// Fails if the underlying index creation fails.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Available or Closing depending on generation |
|
||||
/// | Available | Available |
|
||||
///
|
||||
pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> {
|
||||
if let Closing(reopen) = map.get(&self.uuid) {
|
||||
if reopen.generation != self.generation {
|
||||
return Ok(());
|
||||
}
|
||||
})
|
||||
.or_else(|| self.available.get(uuid).map(|index| IndexStatus::Available(index.clone())))
|
||||
}
|
||||
|
||||
/// Inserts a new index as available
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the index is already present, but currently unavailable.
|
||||
pub fn insert(&mut self, uuid: &Uuid, index: Index) -> Option<Index> {
|
||||
assert!(
|
||||
matches!(self.get_if_unavailable(uuid), None),
|
||||
"Attempted to insert an index that was not available"
|
||||
);
|
||||
|
||||
match self.available.insert(*uuid, index) {
|
||||
InsertionOutcome::InsertedNew => None,
|
||||
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
|
||||
self.evict(evicted_uuid, evicted_index);
|
||||
None
|
||||
map.unavailable.remove(&self.uuid);
|
||||
map.create(&self.uuid, path, None, self.map_size)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempts to close the index, which may or may not result in the index being closed
|
||||
/// (e.g. if another thread already reopened the index again).
|
||||
///
|
||||
/// Use get again on the IndexMap to get the updated status.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Missing or Closing depending on generation |
|
||||
/// | Available | Available |
|
||||
pub fn close(self, map: &mut IndexMap) {
|
||||
if let Closing(reopen) = map.get(&self.uuid) {
|
||||
if reopen.generation != self.generation {
|
||||
return;
|
||||
}
|
||||
map.unavailable.remove(&self.uuid);
|
||||
}
|
||||
InsertionOutcome::Replaced(replaced_index) => Some(replaced_index),
|
||||
}
|
||||
}
|
||||
|
||||
fn evict(&mut self, uuid: Uuid, index: Index) {
|
||||
let closed = index.prepare_for_closing();
|
||||
self.unavailable.push((uuid, Some(ClosingSignal::Closed(closed))));
|
||||
}
|
||||
|
||||
/// Makes an index temporarily or permanently unavailable.
|
||||
///
|
||||
/// Does nothing if the target index is already unavailable.
|
||||
pub fn make_unavailable(&mut self, uuid: &Uuid, unavailability: Unavailable) -> Option<Index> {
|
||||
if self.get_if_unavailable(uuid).is_some() {
|
||||
return None;
|
||||
impl IndexMap {
|
||||
pub fn new(cap: usize) -> IndexMap {
|
||||
Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 }
|
||||
}
|
||||
|
||||
let available_when = match unavailability {
|
||||
Unavailable::Temporarily => {
|
||||
Some(ClosingSignal::Resized(Arc::new(SignalEvent::manual(false))))
|
||||
}
|
||||
Unavailable::Definitely => None,
|
||||
};
|
||||
|
||||
let index = self.available.remove(uuid)?;
|
||||
self.unavailable.push((*uuid, available_when));
|
||||
Some(index)
|
||||
}
|
||||
|
||||
/// Makes an index available again.
|
||||
///
|
||||
/// As the index becomes available again, it might evict another index from the cache. In that case, it is returned.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - if the target index was not being temporarily resized.
|
||||
/// - the index was also in the list of available indexes.
|
||||
pub fn restore(&mut self, uuid: &Uuid, index: Index) -> ClosingSignal {
|
||||
let signal = self
|
||||
.pop_if_unavailable(uuid)
|
||||
.flatten()
|
||||
.expect("The index was not being temporarily resized");
|
||||
match self.available.insert(*uuid, index) {
|
||||
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
|
||||
self.evict(evicted_uuid, evicted_index)
|
||||
}
|
||||
InsertionOutcome::InsertedNew => (),
|
||||
InsertionOutcome::Replaced(_) => panic!("Inconsistent map state"),
|
||||
/// Gets the current status of an index in the map.
|
||||
///
|
||||
/// If the index is available it can be accessed from the returned status.
|
||||
pub fn get(&self, uuid: &Uuid) -> IndexStatus {
|
||||
self.available
|
||||
.get(uuid)
|
||||
.map(|index| Available(index.clone()))
|
||||
.unwrap_or_else(|| self.get_unavailable(uuid))
|
||||
}
|
||||
|
||||
fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus {
|
||||
match self.unavailable.get(uuid) {
|
||||
Some(Some(reopen)) => Closing(reopen.clone()),
|
||||
Some(None) => BeingDeleted,
|
||||
None => Missing,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to create a new index that did not exist before.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Available |
|
||||
/// | BeingDeleted | panics |
|
||||
/// | Closing | panics |
|
||||
/// | Available | panics |
|
||||
///
|
||||
pub fn create(
|
||||
&mut self,
|
||||
uuid: &Uuid,
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
if !matches!(self.get_unavailable(uuid), Missing) {
|
||||
panic!("Attempt to open an index that was unavailable");
|
||||
}
|
||||
let index = create_or_open_index(path, date, map_size)?;
|
||||
match self.available.insert(*uuid, index.clone()) {
|
||||
InsertionOutcome::InsertedNew => (),
|
||||
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
|
||||
self.close(evicted_uuid, evicted_index, 0);
|
||||
}
|
||||
InsertionOutcome::Replaced(_) => {
|
||||
panic!("Attempt to open an index that was already opened")
|
||||
}
|
||||
}
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
fn next_generation(&mut self) -> usize {
|
||||
self.generation = self.generation.checked_add(1).unwrap();
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Attempts to close an index.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Closing |
|
||||
/// | Available | Closing |
|
||||
///
|
||||
pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) {
|
||||
let Some(index) = self.available.remove(uuid) else { return; };
|
||||
self.close(*uuid, index, map_size_growth);
|
||||
}
|
||||
|
||||
fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) {
|
||||
// TODO: pick a sensible default map_size; `unwrap_or_default()` currently falls back to 0 when `map_size()` yields no value.
|
||||
let map_size = index.map_size().unwrap_or_default() + map_size_growth;
|
||||
let closing_event = index.prepare_for_closing();
|
||||
let generation = self.next_generation();
|
||||
self.unavailable
|
||||
.insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation }));
|
||||
}
|
||||
|
||||
/// Attempts to delete an index.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status | Return value |
|
||||
/// |-----------------|------------|--------------|
|
||||
/// | Missing | BeingDeleted | Ok(None) |
|
||||
/// | BeingDeleted | BeingDeleted | Err(None) |
|
||||
/// | Closing | Closing | Err(Some(reopen)) |
|
||||
/// | Available | BeingDeleted | Ok(Some(env_closing_event)) |
|
||||
pub fn start_deletion(
|
||||
&mut self,
|
||||
uuid: &Uuid,
|
||||
) -> std::result::Result<Option<EnvClosingEvent>, Option<ClosingIndex>> {
|
||||
if let Some(index) = self.available.remove(uuid) {
|
||||
return Ok(Some(index.prepare_for_closing()));
|
||||
}
|
||||
match self.unavailable.remove(uuid) {
|
||||
Some(Some(reopen)) => Err(Some(reopen)),
|
||||
Some(None) => Err(None),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks that an index finished deletion.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | Missing |
|
||||
/// | Closing | panics |
|
||||
/// | Available | panics |
|
||||
pub fn end_deletion(&mut self, uuid: &Uuid) {
|
||||
assert!(
|
||||
self.available.get(uuid).is_none(),
|
||||
"Attempt to finish deletion of an index that was not being deleted"
|
||||
);
|
||||
// Do not panic if the index was Missing or BeingDeleted
|
||||
assert!(
|
||||
!matches!(self.unavailable.remove(uuid), Some(Some(_))),
|
||||
"Attempt to finish deletion of an index that was being closed"
|
||||
);
|
||||
}
|
||||
signal
|
||||
}
|
||||
|
||||
/// Removes an unavailable index from the map.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - if the target index was not unavailable.
|
||||
pub fn remove(&mut self, uuid: &Uuid) -> Option<ClosingSignal> {
|
||||
self.pop_if_unavailable(uuid).expect("The index could not be found")
|
||||
}
|
||||
/// Create or open an index in the specified path.
|
||||
/// The path *must* exist or an error will be thrown.
|
||||
fn create_or_open_index(
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
|
||||
fn get_if_unavailable(&self, uuid: &Uuid) -> Option<Option<ClosingSignal>> {
|
||||
self.unavailable
|
||||
.iter()
|
||||
.find_map(|(candidate_uuid, signal)| (uuid == candidate_uuid).then_some(signal.clone()))
|
||||
}
|
||||
|
||||
fn pop_if_unavailable(&mut self, uuid: &Uuid) -> Option<Option<ClosingSignal>> {
|
||||
self.unavailable
|
||||
.iter()
|
||||
.position(|(candidate_uuid, _)| candidate_uuid == uuid)
|
||||
.map(|index| self.unavailable.swap_remove(index).1)
|
||||
if let Some((created, updated)) = date {
|
||||
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
|
||||
} else {
|
||||
Ok(Index::new(options, path)?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,10 +337,12 @@ impl IndexMap {
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone)]
|
||||
pub enum IndexStatus {
|
||||
/// Not currently in the index map.
|
||||
Missing,
|
||||
/// Do not insert it back in the index map as it is currently being deleted.
|
||||
DefinitelyUnavailable,
|
||||
BeingDeleted,
|
||||
/// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map.
|
||||
TemporarilyUnavailable(ClosingSignal),
|
||||
Closing(index_map::ClosingIndex),
|
||||
/// You can use the index without worrying about anything.
|
||||
Available(Index),
|
||||
}
|
||||
@ -208,25 +366,6 @@ impl IndexMapper {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create or open an index in the specified path.
|
||||
/// The path *must* exists or an error will be thrown.
|
||||
fn create_or_open_index(
|
||||
&self,
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
|
||||
if let Some((created, updated)) = date {
|
||||
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
|
||||
} else {
|
||||
Ok(Index::new(options, path)?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create the index.
|
||||
pub fn create_index(
|
||||
&self,
|
||||
@ -246,16 +385,17 @@ impl IndexMapper {
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
fs::create_dir_all(&index_path)?;
|
||||
|
||||
let index =
|
||||
self.create_or_open_index(&index_path, date, self.index_base_map_size)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
||||
// This is very unlikely to happen in practice.
|
||||
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
||||
if self.index_map.write().unwrap().insert(&uuid, index.clone()).is_some() {
|
||||
panic!("Uuid v4 conflict: index with UUID {uuid} already exists.")
|
||||
}
|
||||
let index = self.index_map.write().unwrap().create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
date,
|
||||
self.index_base_map_size,
|
||||
)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
@ -280,20 +420,20 @@ impl IndexMapper {
|
||||
// We remove the index from the in-memory index map.
|
||||
let closing_event = loop {
|
||||
let mut lock = self.index_map.write().unwrap();
|
||||
let resize_operation = match lock.get(&uuid) {
|
||||
Some(Available(index)) => {
|
||||
lock.make_unavailable(&uuid, Unavailable::Definitely);
|
||||
break index.prepare_for_closing();
|
||||
match lock.start_deletion(&uuid) {
|
||||
Ok(env_closing) => break env_closing,
|
||||
Err(Some(reopen)) => {
|
||||
// drop the lock here so that we don't synchronously wait for the index to close.
|
||||
drop(lock);
|
||||
tries += 1;
|
||||
if tries >= 100 {
|
||||
panic!("Too many attempts to close index {name} prior to deletion.")
|
||||
}
|
||||
let reopen = reopen.wait_timeout(Duration::from_secs(6));
|
||||
reopen.close(&mut self.index_map.write().unwrap());
|
||||
continue;
|
||||
}
|
||||
Some(TemporarilyUnavailable(resize_operation)) => resize_operation.clone(),
|
||||
Some(DefinitelyUnavailable) | None => return Ok(()),
|
||||
};
|
||||
// Avoiding deadlock: we drop the lock before waiting on the resize operation.
|
||||
drop(lock);
|
||||
resize_operation.wait_timeout(Duration::from_secs(6));
|
||||
tries += 1;
|
||||
if tries > 100 {
|
||||
panic!("Too many spurious wakeups while waiting on a resize operation.")
|
||||
Err(None) => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
@ -304,8 +444,10 @@ impl IndexMapper {
|
||||
.name(String::from("index_deleter"))
|
||||
.spawn(move || {
|
||||
// We first wait to be sure that the previously opened index is effectively closed.
|
||||
// This can take a lot of time, this is why we do that in a seperate thread.
|
||||
closing_event.wait();
|
||||
// This can take a lot of time, this is why we do that in a separate thread.
|
||||
if let Some(closing_event) = closing_event {
|
||||
closing_event.wait();
|
||||
}
|
||||
|
||||
// Then we remove the content from disk.
|
||||
if let Err(e) = fs::remove_dir_all(&index_path) {
|
||||
@ -316,7 +458,7 @@ impl IndexMapper {
|
||||
}
|
||||
|
||||
// Finally we remove the entry from the index map.
|
||||
index_map.write().unwrap().remove(&uuid);
|
||||
index_map.write().unwrap().end_deletion(&uuid);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@ -336,58 +478,15 @@ impl IndexMapper {
|
||||
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
|
||||
/// in memory hash map.
|
||||
pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
|
||||
// fixme: factor to a function?
|
||||
let uuid = self
|
||||
.index_mapping
|
||||
.get(rtxn, name)?
|
||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||
|
||||
// We remove the index from the in-memory index map.
|
||||
let Some(index) = self.index_map.write().unwrap().make_unavailable(&uuid, Unavailable::Temporarily) else { return Ok(()) };
|
||||
self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount);
|
||||
|
||||
let resize_succeeded = (move || {
|
||||
let current_size = index.map_size()?;
|
||||
let new_size = current_size + self.index_growth_amount;
|
||||
let closing_event = index.prepare_for_closing();
|
||||
|
||||
log::debug!("Waiting for index {name} to close");
|
||||
|
||||
if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) {
|
||||
// fail after 10 minutes waiting
|
||||
panic!("Could not resize index {name} (unable to close it)");
|
||||
}
|
||||
|
||||
log::info!("Resized index {name} from {current_size} to {new_size} bytes");
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
let index = self.create_or_open_index(&index_path, None, new_size)?;
|
||||
Ok(index)
|
||||
})();
|
||||
|
||||
// Put the map back to a consistent state.
|
||||
// Even if there was an error we don't want to leave the map in an inconsistent state as it would cause
|
||||
// deadlocks.
|
||||
let mut lock = self.index_map.write().unwrap();
|
||||
let (resize_operation, resize_succeeded) = match resize_succeeded {
|
||||
Ok(index) => {
|
||||
// insert the resized index
|
||||
let resize_operation = lock.restore(&uuid, index);
|
||||
(resize_operation, Ok(()))
|
||||
}
|
||||
Err(error) => {
|
||||
// there was an error, not much we can do... delete the index from the in-memory map to prevent future errors
|
||||
let resize_operation = lock.remove(&uuid).expect("The index was not being resized");
|
||||
(resize_operation, Err(error))
|
||||
}
|
||||
};
|
||||
|
||||
// drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up.
|
||||
drop(lock);
|
||||
let ClosingSignal::Resized(resize_operation) = resize_operation else {
|
||||
panic!("Index was closed while being resized") };
|
||||
|
||||
resize_operation.signal();
|
||||
|
||||
resize_succeeded
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return an index, may open it if it wasn't already opened.
|
||||
@ -407,46 +506,40 @@ impl IndexMapper {
|
||||
let index = self.index_map.read().unwrap().get(&uuid);
|
||||
|
||||
match index {
|
||||
Some(Available(index)) => break index,
|
||||
Some(TemporarilyUnavailable(ref closing_signal)) => {
|
||||
Available(index) => break index,
|
||||
Closing(reopen) => {
|
||||
// Avoiding deadlocks: no lock taken while doing this operation.
|
||||
closing_signal.wait_timeout(Duration::from_secs(6));
|
||||
let reopen = reopen.wait_timeout(Duration::from_secs(6));
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
// take the lock to reopen the environment.
|
||||
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?;
|
||||
continue;
|
||||
}
|
||||
Some(DefinitelyUnavailable) => return Err(Error::IndexNotFound(name.to_string())),
|
||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||
// since we're lazy, it's possible that the index has not been opened yet.
|
||||
None => {
|
||||
Missing => {
|
||||
let mut index_map = self.index_map.write().unwrap();
|
||||
// between the read lock and the write lock it's not impossible
|
||||
// that someone already opened the index (eg if two searches happen
|
||||
// at the same time), thus before opening it we check a second time
|
||||
// if it's not already there.
|
||||
match index_map.get(&uuid) {
|
||||
None => {
|
||||
Missing => {
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
|
||||
let index = self.create_or_open_index(
|
||||
break index_map.create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
None,
|
||||
self.index_base_map_size,
|
||||
)?;
|
||||
assert!(
|
||||
index_map.insert(&uuid, index.clone()).is_none(),
|
||||
"Inconsistent map state"
|
||||
);
|
||||
break index;
|
||||
}
|
||||
Some(Available(index)) => break index,
|
||||
Some(TemporarilyUnavailable(resize_operation)) => {
|
||||
// Avoiding the deadlock: we drop the lock before waiting
|
||||
let resize_operation = resize_operation.clone();
|
||||
drop(index_map);
|
||||
resize_operation.wait_timeout(Duration::from_secs(6));
|
||||
Available(index) => break index,
|
||||
Closing(_) => {
|
||||
// the reopening will be handled in the next loop iteration
|
||||
continue;
|
||||
}
|
||||
Some(DefinitelyUnavailable) => {
|
||||
return Err(Error::IndexNotFound(name.to_string()))
|
||||
}
|
||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user