move index_map to file
commit 71e7900c67
parent 431782f3ee
index-scheduler/src/index_mapper/index_map.rs (new file, 370 lines)

@@ -0,0 +1,370 @@
/// the map size to use when we don't succeed in reading it in indexes.
const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB

use std::collections::BTreeMap;
use std::path::Path;
use std::time::Duration;

use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
use meilisearch_types::milli::Index;
use time::OffsetDateTime;
use uuid::Uuid;

use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
use crate::lru::{InsertionOutcome, LruMap};
use crate::{clamp_to_page_size, Result};
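// Illustrative sketch, not part of the original commit: `clamp_to_page_size` is imported from
// the crate root and is assumed here to round a requested map size down to a whole number of
// OS pages, since LMDB map sizes must be page-aligned. A stand-alone version of that idea,
// with a hard-coded 4 KiB page size purely for the example:
#[allow(dead_code)]
fn example_clamp_to_page_size(size: usize) -> usize {
    // Assumption for the sketch: the real helper would query the OS page size instead.
    const ASSUMED_PAGE_SIZE: usize = 4096;
    (size / ASSUMED_PAGE_SIZE) * ASSUMED_PAGE_SIZE
}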
/// Keep an internally consistent view of the open indexes in memory.
///
/// This view is made of an LRU cache that will evict the least recently used indexes when new indexes are opened.
/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and
/// are stored separately.
///
/// This view provides operations to change the state of the index as it is known in memory:
/// open an index (making it available for queries), close an index (specifying the new size it should be opened with),
/// delete an index.
///
/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure.
pub struct IndexMap {
    /// An LRU map of indexes that are in the open state and available for queries.
    available: LruMap<Uuid, Index>,
    /// A map of indexes that are not available for queries, either because they are being deleted
    /// or because they are being closed.
    ///
    /// If they are being deleted, the UUID points to `None`.
    unavailable: BTreeMap<Uuid, Option<ClosingIndex>>,

    /// A monotonically increasing generation number, used to differentiate between multiple successive index closing requests.
    ///
    /// Because multiple readers could be waiting on an index to close, the following could theoretically happen:
    ///
    /// 1. Multiple readers wait for the index closing to occur.
    /// 2. One of them "wins the race", takes the lock and then removes the index that finished closing from the map.
    /// 3. The index is reopened, but must be closed again (such as being resized again).
    /// 4. One reader that "lost the race" in (2) wakes up and tries to take the lock and remove the index from the map.
    ///
    /// In that situation, the index may or may not have finished closing. The `generation` field lets us remember which
    /// closing request was made, so the reader that "lost the race" has an old generation and will need to wait again for the
    /// index to close.
    generation: usize,
}
#[derive(Clone)]
pub struct ClosingIndex {
    uuid: Uuid,
    closing_event: EnvClosingEvent,
    map_size: usize,
    generation: usize,
}

impl ClosingIndex {
    /// Waits for the index to be definitely closed.
    ///
    /// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function.
    ///
    /// After the index is physically closed, the in memory map must still be updated to take this into account.
    /// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open
    /// the index without waiting anymore.
    pub fn wait_timeout(self, timeout: Duration) -> Option<ReopenableIndex> {
        self.closing_event.wait_timeout(timeout).then_some(ReopenableIndex {
            uuid: self.uuid,
            map_size: self.map_size,
            generation: self.generation,
        })
    }
}
pub struct ReopenableIndex {
    uuid: Uuid,
    map_size: usize,
    generation: usize,
}

impl ReopenableIndex {
    /// Attempts to reopen the index, which can result in the index being reopened again or not
    /// (e.g. if another thread already opened and closed the index again).
    ///
    /// Use `get` again on the `IndexMap` to get the updated status.
    ///
    /// Fails if the underlying index creation fails.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status                                    |
    /// |-----------------|-----------------------------------------------|
    /// | Missing         | Missing                                       |
    /// | BeingDeleted    | BeingDeleted                                  |
    /// | Closing         | Available or Closing depending on generation  |
    /// | Available       | Available                                     |
    ///
    pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> {
        if let Closing(reopen) = map.get(&self.uuid) {
            if reopen.generation != self.generation {
                return Ok(());
            }
            map.unavailable.remove(&self.uuid);
            map.create(&self.uuid, path, None, self.map_size)?;
        }
        Ok(())
    }

    /// Attempts to close the index, which may or may not result in the index being closed
    /// (e.g. if another thread already reopened the index again).
    ///
    /// Use `get` again on the `IndexMap` to get the updated status.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status                                  |
    /// |-----------------|---------------------------------------------|
    /// | Missing         | Missing                                     |
    /// | BeingDeleted    | BeingDeleted                                |
    /// | Closing         | Missing or Closing depending on generation  |
    /// | Available       | Available                                   |
    pub fn close(self, map: &mut IndexMap) {
        if let Closing(reopen) = map.get(&self.uuid) {
            if reopen.generation != self.generation {
                return;
            }
            map.unavailable.remove(&self.uuid);
        }
    }
}
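// Illustrative sketch, not part of the original commit: how a caller holding an
// `RwLock<IndexMap>` (the parent `IndexMapper` appears to keep the map behind such a lock, as
// the tests below suggest) might drive the close/wait/reopen dance for a resize. The timeout
// value and the error handling are assumptions made only for this example.
#[allow(dead_code)]
fn example_resize(
    lock: &std::sync::RwLock<IndexMap>,
    uuid: Uuid,
    path: &Path,
    map_size_growth: usize,
) -> Result<()> {
    // Ask for the index to be closed with a larger map size, then release the lock.
    let closing = {
        let mut map = lock.write().unwrap();
        map.close_for_resize(&uuid, map_size_growth);
        match map.get(&uuid) {
            Closing(closing) => closing,
            // Missing, being deleted, or already reopened by someone else: nothing to do.
            _ => return Ok(()),
        }
    };
    // Wait *outside* the lock, as recommended by the `ClosingIndex::wait_timeout` docs above.
    if let Some(reopen) = closing.wait_timeout(Duration::from_secs(10)) {
        // Reopen with the recorded map size; a stale generation turns this into a no-op.
        reopen.reopen(&mut lock.write().unwrap(), path)?;
    }
    Ok(())
}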
impl IndexMap {
    pub fn new(cap: usize) -> IndexMap {
        Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 }
    }

    /// Gets the current status of an index in the map.
    ///
    /// If the index is available it can be accessed from the returned status.
    pub fn get(&self, uuid: &Uuid) -> IndexStatus {
        self.available
            .get(uuid)
            .map(|index| Available(index.clone()))
            .unwrap_or_else(|| self.get_unavailable(uuid))
    }

    fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus {
        match self.unavailable.get(uuid) {
            Some(Some(reopen)) => Closing(reopen.clone()),
            Some(None) => BeingDeleted,
            None => Missing,
        }
    }

    /// Attempts to create a new index that didn't exist before.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status |
    /// |-----------------|------------|
    /// | Missing         | Available  |
    /// | BeingDeleted    | panics     |
    /// | Closing         | panics     |
    /// | Available       | panics     |
    ///
    pub fn create(
        &mut self,
        uuid: &Uuid,
        path: &Path,
        date: Option<(OffsetDateTime, OffsetDateTime)>,
        map_size: usize,
    ) -> Result<Index> {
        if !matches!(self.get_unavailable(uuid), Missing) {
            panic!("Attempt to open an index that was unavailable");
        }
        let index = create_or_open_index(path, date, map_size)?;
        match self.available.insert(*uuid, index.clone()) {
            InsertionOutcome::InsertedNew => (),
            InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
                self.close(evicted_uuid, evicted_index, 0);
            }
            InsertionOutcome::Replaced(_) => {
                panic!("Attempt to open an index that was already opened")
            }
        }
        Ok(index)
    }

    /// Increases the current generation. See the documentation for this field.
    ///
    /// In the unlikely event that 2^64 generations have been exhausted, we simply wrap around.
    ///
    /// For this to cause an issue, one should be able to stop a reader in time after it got a `ReopenableIndex` and before it takes the lock
    /// to remove it from the unavailable map, and keep the reader in this frozen state for 2^64 closings of other indexes.
    ///
    /// This seems overwhelmingly impossible to achieve in practice.
    fn next_generation(&mut self) -> usize {
        self.generation = self.generation.wrapping_add(1);
        self.generation
    }

    /// Attempts to close an index.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status   |
    /// |-----------------|--------------|
    /// | Missing         | Missing      |
    /// | BeingDeleted    | BeingDeleted |
    /// | Closing         | Closing      |
    /// | Available       | Closing      |
    ///
    pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) {
        let Some(index) = self.available.remove(uuid) else { return; };
        self.close(*uuid, index, map_size_growth);
    }

    fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) {
        let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth;
        let closing_event = index.prepare_for_closing();
        let generation = self.next_generation();
        self.unavailable
            .insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation }));
    }

    /// Attempts to delete an index.
    ///
    /// `end_deletion` must be called just after.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status   | Return value                |
    /// |-----------------|--------------|-----------------------------|
    /// | Missing         | BeingDeleted | Ok(None)                    |
    /// | BeingDeleted    | BeingDeleted | Err(None)                   |
    /// | Closing         | Closing      | Err(Some(reopen))           |
    /// | Available       | BeingDeleted | Ok(Some(env_closing_event)) |
    pub fn start_deletion(
        &mut self,
        uuid: &Uuid,
    ) -> std::result::Result<Option<EnvClosingEvent>, Option<ClosingIndex>> {
        if let Some(index) = self.available.remove(uuid) {
            self.unavailable.insert(*uuid, None);
            return Ok(Some(index.prepare_for_closing()));
        }
        match self.unavailable.remove(uuid) {
            Some(Some(reopen)) => Err(Some(reopen)),
            Some(None) => Err(None),
            None => Ok(None),
        }
    }

    /// Marks that an index deletion finished.
    ///
    /// Must be used after calling `start_deletion`.
    ///
    /// # Status table
    ///
    /// | Previous Status | New Status |
    /// |-----------------|------------|
    /// | Missing         | Missing    |
    /// | BeingDeleted    | Missing    |
    /// | Closing         | panics     |
    /// | Available       | panics     |
    pub fn end_deletion(&mut self, uuid: &Uuid) {
        assert!(
            self.available.get(uuid).is_none(),
            "Attempt to finish deletion of an index that was not being deleted"
        );
        // Do not panic if the index was Missing or BeingDeleted
        assert!(
            !matches!(self.unavailable.remove(uuid), Some(Some(_))),
            "Attempt to finish deletion of an index that was being closed"
        );
    }
}
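// Illustrative sketch, not part of the original commit: the two-phase deletion protocol.
// Deleting the index files on disk between `start_deletion` and `end_deletion` is the
// caller's job; the closure below stands in for that work and is an assumption, as are the
// `RwLock` wrapper and the 10-second timeout.
#[allow(dead_code)]
fn example_delete(
    lock: &std::sync::RwLock<IndexMap>,
    uuid: Uuid,
    delete_files_on_disk: impl FnOnce(),
) {
    let outcome = lock.write().unwrap().start_deletion(&uuid);
    match outcome {
        // The index was available: wait (outside the lock) for its environment to close
        // before removing the files from disk.
        Ok(Some(closing_event)) => {
            let _closed_in_time = closing_event.wait_timeout(Duration::from_secs(10));
            delete_files_on_disk();
        }
        // The index was missing from the map: there is nothing to close.
        Ok(None) => delete_files_on_disk(),
        // A deletion or a close is already in progress: the caller has to retry later.
        Err(_pending) => return,
    }
    // In both `Ok` cases the deletion must be acknowledged with `end_deletion`.
    lock.write().unwrap().end_deletion(&uuid);
}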
/// Create or open an index in the specified path.
/// The path *must* exist or an error will be returned.
fn create_or_open_index(
    path: &Path,
    date: Option<(OffsetDateTime, OffsetDateTime)>,
    map_size: usize,
) -> Result<Index> {
    let mut options = EnvOpenOptions::new();
    options.map_size(clamp_to_page_size(map_size));
    options.max_readers(1024);

    if let Some((created, updated)) = date {
        Ok(Index::new_with_creation_dates(options, path, created, updated)?)
    } else {
        Ok(Index::new(options, path)?)
    }
}
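// Illustrative sketch, not part of the original commit: opening a brand new index through a
// fresh `IndexMap`. The capacity and the 100 MiB map size are arbitrary values for the
// example, and `path` must already exist on disk, as the doc comment above requires.
#[allow(dead_code)]
fn example_create(path: &Path) -> Result<Index> {
    let mut map = IndexMap::new(20);
    // Assumes the `v4` feature of the `uuid` crate; the requested size is clamped to a whole
    // number of pages by `create` through `clamp_to_page_size`.
    let uuid = Uuid::new_v4();
    map.create(&uuid, path, None, 100 * 1024 * 1024)
}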
/// Putting the tests of the LRU down there so we have access to the cache's private members
#[cfg(test)]
mod tests {

    use meilisearch_types::heed::Env;
    use meilisearch_types::Index;
    use uuid::Uuid;

    use super::super::IndexMapper;
    use crate::tests::IndexSchedulerHandle;
    use crate::utils::clamp_to_page_size;
    use crate::IndexScheduler;

    impl IndexMapper {
        fn test() -> (Self, Env, IndexSchedulerHandle) {
            let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
            (index_scheduler.index_mapper, index_scheduler.env, handle)
        }
    }

    fn check_first_unavailable(mapper: &IndexMapper, expected_uuid: Uuid, is_closing: bool) {
        let index_map = mapper.index_map.read().unwrap();
        let (uuid, state) = index_map.unavailable.first_key_value().unwrap();
        assert_eq!(uuid, &expected_uuid);
        assert_eq!(state.is_some(), is_closing);
    }

    #[test]
    fn evict_indexes() {
        let (mapper, env, _handle) = IndexMapper::test();
        let mut uuids = vec![];
        // LRU cap + 1
        for i in 0..(5 + 1) {
            let index_name = format!("index-{i}");
            let wtxn = env.write_txn().unwrap();
            mapper.create_index(wtxn, &index_name, None).unwrap();
            let txn = env.read_txn().unwrap();
            uuids.push(mapper.index_mapping.get(&txn, &index_name).unwrap().unwrap());
        }
        // index-0 was evicted
        check_first_unavailable(&mapper, uuids[0], true);

        // get back the evicted index
        let wtxn = env.write_txn().unwrap();
        mapper.create_index(wtxn, "index-0", None).unwrap();

        // Least recently used is now index-1
        check_first_unavailable(&mapper, uuids[1], true);
    }

    #[test]
    fn resize_index() {
        let (mapper, env, _handle) = IndexMapper::test();
        let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
        assert_index_size(index, mapper.index_base_map_size);

        mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();

        let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
        assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount);

        mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();

        let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
        assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount * 2);
    }

    fn assert_index_size(index: Index, expected: usize) {
        let expected = clamp_to_page_size(expected);
        let index_map_size = index.map_size().unwrap();
        assert_eq!(index_map_size, expected);
    }
}
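// Illustrative sketch, not part of the original commit: eviction at the `IndexMap` level,
// without going through `IndexMapper`. With a capacity-1 LRU, opening a second index closes
// the first one, which then shows up as `Closing`. The paths and map sizes are assumptions
// for the example, and both directories must already exist on disk.
#[allow(dead_code)]
fn example_eviction(path_a: &Path, path_b: &Path) -> Result<()> {
    let mut map = IndexMap::new(1);
    let first = Uuid::new_v4();
    let second = Uuid::new_v4();
    map.create(&first, path_a, None, 10 * 1024 * 1024)?;
    map.create(&second, path_b, None, 10 * 1024 * 1024)?;
    // The first index was evicted: it is now closing, while the second one is available.
    assert!(matches!(map.get(&first), Closing(_)));
    assert!(matches!(map.get(&second), Available(_)));
    Ok(())
}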
@@ -16,6 +16,8 @@ use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
 use crate::uuid_codec::UuidCodec;
 use crate::{Error, Result};
 
+mod index_map;
+
 const INDEX_MAPPING: &str = "index-mapping";
 
 /// Structure managing meilisearch's indexes.
@@ -60,379 +62,6 @@ pub struct IndexMapper {
     pub indexer_config: Arc<IndexerConfig>,
 }
 
-mod index_map {
-    // ... the inline module body is elided here: it was moved verbatim to
-    //     index-scheduler/src/index_mapper/index_map.rs and is shown in full above ...
-}
-
 /// Whether the index is available for use or is forbidden to be inserted back in the index map
 #[allow(clippy::large_enum_variant)]
 #[derive(Clone)]
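// Sketch, not part of this diff: the `IndexStatus` enum that the trailing context above refers
// to is defined in the parent module. Its shape can be reconstructed from the imports and the
// match arms in index_map.rs; the variant doc comments here are paraphrases, not the original
// ones, and the variant order is an assumption.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum IndexStatus {
    /// The index is not in the map at all.
    Missing,
    /// The index is being deleted and must not be put back in the map.
    BeingDeleted,
    /// The index is being closed, for a resize or after an eviction.
    Closing(index_map::ClosingIndex),
    /// The index is open and can be used.
    Available(Index),
}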