Compare commits

...

21 Commits

Author SHA1 Message Date
768f4278fa Fix doc comment
Co-authored-by: Clément Renault <clement@meilisearch.com>
2023-02-09 12:17:07 +01:00
e4ba704c80 Document generation and let it wrap-around as is it safe to do. 2023-02-09 12:14:10 +01:00
a9cb386469 Retry in case of timeout while reopening 2023-02-09 11:54:35 +01:00
b5ed2e36e7 Default to 100 concurrent indexes in unixes, 10 in Windows 2023-02-09 11:39:23 +01:00
ebb1ef2cdd Change default index map size to 10GiB 2023-02-09 11:07:08 +01:00
98a0cc14a3 Add basic tests for index eviction and resize 2023-02-09 10:30:15 +01:00
476a5e2e8f Fix TODO 2023-02-09 10:30:15 +01:00
9328d5abbc Rewrite where evicted indexes are added to the set 2023-02-09 10:30:15 +01:00
2b9a56e192 WIP: evict indexes in unavailable 2023-02-09 10:30:15 +01:00
d4af44cdbf Parameterize growth factor and index count 2023-02-09 10:30:15 +01:00
6ffe71dc3a Use LRU cache 2023-02-09 10:30:15 +01:00
55b4117aa7 Add LruMap 2023-02-09 10:30:15 +01:00
a8fe5fa487 Make sure we don't leave the in memory hashmap in an inconsistent state 2023-02-09 10:30:01 +01:00
1fe5e91d49 Resize indexes when they're full 2023-02-09 10:30:00 +01:00
60f902be87 Add IndexMapper::resize_index fn 2023-02-09 10:30:00 +01:00
dad20fad03 Add IndexStatus::BeingResized 2023-02-09 10:30:00 +01:00
9a61b97b50 IndexScheduler::tick returns a TickOutcome 2023-02-09 10:30:00 +01:00
66fa66f96a create_or_open_index takes a map_size argument 2023-02-09 10:30:00 +01:00
db1408b00d Add Batch::index_uid 2023-02-09 10:30:00 +01:00
0bf35c7cc6 Add prototype to analytics if any 2023-02-08 18:13:41 +01:00
541d38d9d4 If using a prototype, display its name at Meilisearch startup 2023-02-08 18:08:08 +01:00
9 changed files with 866 additions and 93 deletions

View File

@ -169,6 +169,22 @@ impl Batch {
Batch::IndexSwap { task } => vec![task.uid],
}
}
/// Return the index UID associated with this batch
pub fn index_uid(&self) -> Option<&str> {
use Batch::*;
match self {
TaskCancelation { .. }
| TaskDeletion(_)
| SnapshotCreation(_)
| Dump(_)
| IndexSwap { .. } => None,
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. } => Some(index_uid),
}
}
}
impl IndexOperation {

View File

@ -1,20 +1,20 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{fs, thread};
use log::error;
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::Index;
use time::OffsetDateTime;
use uuid::Uuid;
use self::IndexStatus::{Available, BeingDeleted};
use self::index_map::IndexMap;
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
use crate::uuid_codec::UuidCodec;
use crate::{clamp_to_page_size, Error, Result};
use crate::{Error, Result};
const INDEX_MAPPING: &str = "index-mapping";
@ -25,26 +25,423 @@ const INDEX_MAPPING: &str = "index-mapping";
/// 2. Opening indexes and storing references to these opened indexes
/// 3. Accessing indexes through their uuid
/// 4. Mapping a user-defined name to each index uuid.
///
/// # Implementation notes
///
/// An index exists as 3 bits of data:
/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted.
/// 2. The persistent database containing the association between the index' name and its UUID,
/// that can exist in 2 states: Missing or Present.
/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states:
/// - Missing
/// - Available
/// - Closing (because an index needs resizing or was evicted from the cache)
/// - BeingDeleted
///
/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper`
/// with the use of the following primitives:
/// - A RwLock on the `IndexMap`.
/// - Transactions on the association database.
/// - ClosingEvent signals emitted when closing an environment.
#[derive(Clone)]
pub struct IndexMapper {
/// Keep track of the opened indexes. Used mainly by the index resolver.
index_map: Arc<RwLock<HashMap<Uuid, IndexStatus>>>,
index_map: Arc<RwLock<IndexMap>>,
/// Map an index name with an index uuid currently available on disk.
pub(crate) index_mapping: Database<Str, UuidCodec>,
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
index_size: usize,
/// The map size an index is opened with on the first time.
index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
index_growth_amount: usize,
pub indexer_config: Arc<IndexerConfig>,
}
mod index_map {
/// the map size to use when we don't succeed in reading it in indexes.
const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB
use std::collections::BTreeMap;
use std::path::Path;
use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
use meilisearch_types::milli::Index;
use time::OffsetDateTime;
use uuid::Uuid;
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
use crate::lru::{InsertionOutcome, LruMap};
use crate::{clamp_to_page_size, Result};
/// Keep an internally consistent view of the open indexes in memory.
///
/// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened.
/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and
/// are stored separately.
///
/// This view provides operations to change the state of the index as it is known in memory:
/// open an index (making it available for queries), close an index (specifying the new size it should be opened with),
/// delete an index.
///
/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure.
pub struct IndexMap {
/// A LRU map of indexes that are in the open state and available for queries.
available: LruMap<Uuid, Index>,
/// A map of indexes that are not available for queries, either because they are being deleted
/// or because they are being closed.
///
/// If they are being deleted, the UUID points to `None`.
unavailable: BTreeMap<Uuid, Option<ClosingIndex>>,
/// A monotonically increasing generation number, used to differentiate between multiple successive index closing requests.
///
/// Because multiple readers could be waiting on an index to close, the following could theoretically happen:
///
/// 1. Multiple readers wait for the index closing to occur.
/// 2. One of them "wins the race", takes the lock and then removes the index that finished closing from the map.
/// 3. The index is reopened, but must be closed again (such as being resized again).
/// 4. One reader that "lost the race" in (2) wakes up and tries to take the lock and remove the index from the map.
///
/// In that situation, the index may or may not have finished closing. The `generation` field allows to remember which
/// closing request was made, so the reader that "lost the race" has the old generation and will need to wait again for the index
/// to close.
generation: usize,
}
#[derive(Clone)]
pub struct ClosingIndex {
uuid: Uuid,
closing_event: EnvClosingEvent,
map_size: usize,
generation: usize,
}
impl ClosingIndex {
/// Waits for the index to be definitely closed.
///
/// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function.
///
/// After the index is physically closed, the in memory map must still be updated to take this into account.
/// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open
/// the index without waiting anymore.
pub fn wait_timeout(self, timeout: Duration) -> Option<ReopenableIndex> {
self.closing_event.wait_timeout(timeout).then_some(ReopenableIndex {
uuid: self.uuid,
map_size: self.map_size,
generation: self.generation,
})
}
}
pub struct ReopenableIndex {
uuid: Uuid,
map_size: usize,
generation: usize,
}
impl ReopenableIndex {
/// Attempts to reopen the index, which can result in the index being reopened again or not
/// (e.g. if another thread already opened and closed the index again).
///
/// Use get again on the IndexMap to get the updated status.
///
/// Fails if the underlying index creation fails.
///
/// # Status table
///
/// | Previous Status | New Status |
/// |-----------------|------------|
/// | Missing | Missing |
/// | BeingDeleted | BeingDeleted |
/// | Closing | Available or Closing depending on generation |
/// | Available | Available |
///
pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> {
if let Closing(reopen) = map.get(&self.uuid) {
if reopen.generation != self.generation {
return Ok(());
}
map.unavailable.remove(&self.uuid);
map.create(&self.uuid, path, None, self.map_size)?;
}
Ok(())
}
/// Attempts to close the index, which may or may not result in the index being closed
/// (e.g. if another thread already reopened the index again).
///
/// Use get again on the IndexMap to get the updated status.
///
/// # Status table
///
/// | Previous Status | New Status |
/// |-----------------|------------|
/// | Missing | Missing |
/// | BeingDeleted | BeingDeleted |
/// | Closing | Missing or Closing depending on generation |
/// | Available | Available |
pub fn close(self, map: &mut IndexMap) {
if let Closing(reopen) = map.get(&self.uuid) {
if reopen.generation != self.generation {
return;
}
map.unavailable.remove(&self.uuid);
}
}
}
impl IndexMap {
pub fn new(cap: usize) -> IndexMap {
Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 }
}
/// Gets the current status of an index in the map.
///
/// If the index is available it can be accessed from the returned status.
pub fn get(&self, uuid: &Uuid) -> IndexStatus {
self.available
.get(uuid)
.map(|index| Available(index.clone()))
.unwrap_or_else(|| self.get_unavailable(uuid))
}
fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus {
match self.unavailable.get(uuid) {
Some(Some(reopen)) => Closing(reopen.clone()),
Some(None) => BeingDeleted,
None => Missing,
}
}
/// Attempts to create a new index that wasn't existing before.
///
/// # Status table
///
/// | Previous Status | New Status |
/// |-----------------|------------|
/// | Missing | Available |
/// | BeingDeleted | panics |
/// | Closing | panics |
/// | Available | panics |
///
pub fn create(
&mut self,
uuid: &Uuid,
path: &Path,
date: Option<(OffsetDateTime, OffsetDateTime)>,
map_size: usize,
) -> Result<Index> {
if !matches!(self.get_unavailable(uuid), Missing) {
panic!("Attempt to open an index that was unavailable");
}
let index = create_or_open_index(path, date, map_size)?;
match self.available.insert(*uuid, index.clone()) {
InsertionOutcome::InsertedNew => (),
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
self.close(evicted_uuid, evicted_index, 0);
}
InsertionOutcome::Replaced(_) => {
panic!("Attempt to open an index that was already opened")
}
}
Ok(index)
}
/// Increases the current generation. See documentation for this field.
///
/// In the unlikely event that the 2^64 generations would have been exhausted, we simply wrap-around.
///
/// For this to cause an issue, one should be able to stop a reader in time after it got a `ReopenableIndex` and before it takes the lock
/// to remove it from the unavailable map, and keep the reader in this frozen state for 2^64 closing of other indexes.
///
/// This seems overwhelmingly impossible to achieve in practice.
fn next_generation(&mut self) -> usize {
self.generation = self.generation.wrapping_add(1);
self.generation
}
/// Attempts to close an index.
///
/// # Status table
///
/// | Previous Status | New Status |
/// |-----------------|------------|
/// | Missing | Missing |
/// | BeingDeleted | BeingDeleted |
/// | Closing | Closing |
/// | Available | Closing |
///
pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) {
let Some(index) = self.available.remove(uuid) else { return; };
self.close(*uuid, index, map_size_growth);
}
fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) {
let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth;
let closing_event = index.prepare_for_closing();
let generation = self.next_generation();
self.unavailable
.insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation }));
}
/// Attempts to delete and index.
///
/// `end_deletion` must be called just after.
///
/// # Status table
///
/// | Previous Status | New Status | Return value |
/// |-----------------|------------|--------------|
/// | Missing | BeingDeleted | Ok(None) |
/// | BeingDeleted | BeingDeleted | Err(None) |
/// | Closing | Closing | Err(Some(reopen)) |
/// | Available | BeingDeleted | Ok(Some(env_closing_event)) |
pub fn start_deletion(
&mut self,
uuid: &Uuid,
) -> std::result::Result<Option<EnvClosingEvent>, Option<ClosingIndex>> {
if let Some(index) = self.available.remove(uuid) {
return Ok(Some(index.prepare_for_closing()));
}
match self.unavailable.remove(uuid) {
Some(Some(reopen)) => Err(Some(reopen)),
Some(None) => Err(None),
None => Ok(None),
}
}
/// Marks that an index deletion finished.
///
/// Must be used after calling `start_deletion`.
///
/// # Status table
///
/// | Previous Status | New Status |
/// |-----------------|------------|
/// | Missing | Missing |
/// | BeingDeleted | Missing |
/// | Closing | panics |
/// | Available | panics |
pub fn end_deletion(&mut self, uuid: &Uuid) {
assert!(
self.available.get(uuid).is_none(),
"Attempt to finish deletion of an index that was not being deleted"
);
// Do not panic if the index was Missing or BeingDeleted
assert!(
!matches!(self.unavailable.remove(uuid), Some(Some(_))),
"Attempt to finish deletion of an index that was being closed"
);
}
}
/// Create or open an index in the specified path.
/// The path *must* exist or an error will be thrown.
fn create_or_open_index(
path: &Path,
date: Option<(OffsetDateTime, OffsetDateTime)>,
map_size: usize,
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(map_size));
options.max_readers(1024);
if let Some((created, updated)) = date {
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
} else {
Ok(Index::new(options, path)?)
}
}
/// Putting the tests of the LRU down there so we have access to the cache's private members
#[cfg(test)]
mod tests {
use meilisearch_types::heed::Env;
use meilisearch_types::Index;
use uuid::Uuid;
use super::super::IndexMapper;
use crate::tests::IndexSchedulerHandle;
use crate::utils::clamp_to_page_size;
use crate::IndexScheduler;
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
}
}
fn check_first_unavailable(mapper: &IndexMapper, expected_uuid: Uuid, is_closing: bool) {
let index_map = mapper.index_map.read().unwrap();
let (uuid, state) = index_map.unavailable.first_key_value().unwrap();
assert_eq!(uuid, &expected_uuid);
assert_eq!(state.is_some(), is_closing);
}
#[test]
fn evict_indexes() {
let (mapper, env, _handle) = IndexMapper::test();
let mut uuids = vec![];
// LRU cap + 1
for i in 0..(5 + 1) {
let index_name = format!("index-{i}");
let wtxn = env.write_txn().unwrap();
mapper.create_index(wtxn, &index_name, None).unwrap();
let txn = env.read_txn().unwrap();
uuids.push(mapper.index_mapping.get(&txn, &index_name).unwrap().unwrap());
}
// index-0 was evicted
check_first_unavailable(&mapper, uuids[0], true);
// get back the evicted index
let wtxn = env.write_txn().unwrap();
mapper.create_index(wtxn, "index-0", None).unwrap();
// Least recently used is now index-1
check_first_unavailable(&mapper, uuids[1], true);
}
#[test]
fn resize_index() {
let (mapper, env, _handle) = IndexMapper::test();
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
assert_index_size(index, mapper.index_base_map_size);
mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount);
mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount * 2);
}
fn assert_index_size(index: Index, expected: usize) {
let expected = clamp_to_page_size(expected);
let index_map_size = index.map_size().unwrap();
assert_eq!(index_map_size, expected);
}
}
}
/// Whether the index is available for use or is forbidden to be inserted back in the index map
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum IndexStatus {
/// Not currently in the index map.
Missing,
/// Do not insert it back in the index map as it is currently being deleted.
BeingDeleted,
/// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map.
Closing(index_map::ClosingIndex),
/// You can use the index without worrying about anything.
Available(Index),
}
@ -53,36 +450,21 @@ impl IndexMapper {
pub fn new(
env: &Env,
base_path: PathBuf,
index_size: usize,
index_base_map_size: usize,
index_growth_amount: usize,
index_count: usize,
indexer_config: IndexerConfig,
) -> Result<Self> {
Ok(Self {
index_map: Arc::default(),
index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
base_path,
index_size,
index_base_map_size,
index_growth_amount,
indexer_config: Arc::new(indexer_config),
})
}
/// Create or open an index in the specified path.
/// The path *must* exists or an error will be thrown.
fn create_or_open_index(
&self,
path: &Path,
date: Option<(OffsetDateTime, OffsetDateTime)>,
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(self.index_size));
options.max_readers(1024);
if let Some((created, updated)) = date {
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
} else {
Ok(Index::new(options, path)?)
}
}
/// Get or create the index.
pub fn create_index(
&self,
@ -102,15 +484,17 @@ impl IndexMapper {
let index_path = self.base_path.join(uuid.to_string());
fs::create_dir_all(&index_path)?;
let index = self.create_or_open_index(&index_path, date)?;
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
// This is very unlikely to happen in practice.
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
let index = self.index_map.write().unwrap().create(
&uuid,
&index_path,
date,
self.index_base_map_size,
)?;
wtxn.commit()?;
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
if let Some(BeingDeleted) =
self.index_map.write().unwrap().insert(uuid, Available(index.clone()))
{
panic!("Uuid v4 conflict.");
}
Ok(index)
}
@ -130,14 +514,31 @@ impl IndexMapper {
assert!(self.index_mapping.delete(&mut wtxn, name)?);
wtxn.commit()?;
// We remove the index from the in-memory index map.
let mut lock = self.index_map.write().unwrap();
let closing_event = match lock.insert(uuid, BeingDeleted) {
Some(Available(index)) => Some(index.prepare_for_closing()),
_ => None,
};
drop(lock);
let mut tries = 0;
// We remove the index from the in-memory index map.
let closing_event = loop {
let mut lock = self.index_map.write().unwrap();
match lock.start_deletion(&uuid) {
Ok(env_closing) => break env_closing,
Err(Some(reopen)) => {
// drop the lock here so that we don't synchronously wait for the index to close.
drop(lock);
tries += 1;
if tries >= 100 {
panic!("Too many attempts to close index {name} prior to deletion.")
}
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
reopen
} else {
continue;
};
reopen.close(&mut self.index_map.write().unwrap());
continue;
}
Err(None) => return Ok(()),
}
};
let index_map = self.index_map.clone();
let index_path = self.base_path.join(uuid.to_string());
@ -146,7 +547,7 @@ impl IndexMapper {
.name(String::from("index_deleter"))
.spawn(move || {
// We first wait to be sure that the previously opened index is effectively closed.
// This can take a lot of time, this is why we do that in a seperate thread.
// This can take a lot of time, this is why we do that in a separate thread.
if let Some(closing_event) = closing_event {
closing_event.wait();
}
@ -160,7 +561,7 @@ impl IndexMapper {
}
// Finally we remove the entry from the index map.
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
index_map.write().unwrap().end_deletion(&uuid);
})
.unwrap();
@ -171,6 +572,26 @@ impl IndexMapper {
Ok(self.index_mapping.get(rtxn, name)?.is_some())
}
/// Resizes the maximum size of the specified index to the double of its current maximum size.
///
/// This operation involves closing the underlying environment and so can take a long time to complete.
///
/// # Panics
///
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
/// in memory hash map.
pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
let uuid = self
.index_mapping
.get(rtxn, name)?
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// We remove the index from the in-memory index map.
self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount);
Ok(())
}
/// Return an index, may open it if it wasn't already opened.
pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> {
let uuid = self
@ -179,31 +600,54 @@ impl IndexMapper {
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// we clone here to drop the lock before entering the match
let index = self.index_map.read().unwrap().get(&uuid).cloned();
let index = match index {
Some(Available(index)) => index,
Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())),
// since we're lazy, it's possible that the index has not been opened yet.
None => {
let mut index_map = self.index_map.write().unwrap();
// between the read lock and the write lock it's not impossible
// that someone already opened the index (eg if two search happens
// at the same time), thus before opening it we check a second time
// if it's not already there.
// Since there is a good chance it's not already there we can use
// the entry method.
match index_map.entry(uuid) {
Entry::Vacant(entry) => {
let index_path = self.base_path.join(uuid.to_string());
let mut tries = 0;
let index = loop {
tries += 1;
if tries > 100 {
panic!("Too many spurious wake ups while the index is being resized");
}
let index = self.index_map.read().unwrap().get(&uuid);
let index = self.create_or_open_index(&index_path, None)?;
entry.insert(Available(index.clone()));
index
}
Entry::Occupied(entry) => match entry.get() {
Available(index) => index.clone(),
match index {
Available(index) => break index,
Closing(reopen) => {
// Avoiding deadlocks: no lock taken while doing this operation.
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
reopen
} else {
continue;
};
let index_path = self.base_path.join(uuid.to_string());
// take the lock to reopen the environment.
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?;
continue;
}
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
// since we're lazy, it's possible that the index has not been opened yet.
Missing => {
let mut index_map = self.index_map.write().unwrap();
// between the read lock and the write lock it's not impossible
// that someone already opened the index (eg if two searches happen
// at the same time), thus before opening it we check a second time
// if it's not already there.
match index_map.get(&uuid) {
Missing => {
let index_path = self.base_path.join(uuid.to_string());
break index_map.create(
&uuid,
&index_path,
None,
self.index_base_map_size,
)?;
}
Available(index) => break index,
Closing(_) => {
// the reopening will be handled in the next loop operation
continue;
}
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
},
}
}
}
};

View File

@ -24,6 +24,7 @@ pub mod error;
mod index_mapper;
#[cfg(test)]
mod insta_snapshot;
mod lru;
mod utils;
mod uuid_codec;
@ -229,8 +230,12 @@ pub struct IndexSchedulerOptions {
pub dumps_path: PathBuf,
/// The maximum size, in bytes, of the task index.
pub task_db_size: usize,
/// The maximum size, in bytes, of each meilisearch index.
pub index_size: usize,
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
pub index_base_map_size: usize,
/// The size, in bytes, by which the map size of an index is increased when it resized due to being full.
pub index_growth_amount: usize,
/// The number of indexes that can be concurrently opened in memory.
pub index_count: usize,
/// Configuration used during indexing for each meilisearch index.
pub indexer_config: IndexerConfig,
/// Set to `true` iff the index scheduler is allowed to automatically
@ -382,7 +387,9 @@ impl IndexScheduler {
index_mapper: IndexMapper::new(
&env,
options.indexes_path,
options.index_size,
options.index_base_map_size,
options.index_growth_amount,
options.index_count,
options.indexer_config,
)?,
env,
@ -422,12 +429,12 @@ impl IndexScheduler {
#[cfg(test)]
run.breakpoint(Breakpoint::Init);
loop {
run.wake_up.wait();
run.wake_up.wait();
loop {
match run.tick() {
Ok(0) => (),
Ok(_) => run.wake_up.signal(),
Ok(TickOutcome::TickAgain(_)) => (),
Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(),
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
@ -440,7 +447,6 @@ impl IndexScheduler {
) {
std::thread::sleep(Duration::from_secs(1));
}
run.wake_up.signal();
}
}
}
@ -764,8 +770,8 @@ impl IndexScheduler {
Ok(task)
}
/// Register a new task comming from a dump in the scheduler.
/// By takinig a mutable ref we're pretty sure no one will ever import a dump while actix is running.
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
&mut self,
task: TaskDump,
@ -926,7 +932,7 @@ impl IndexScheduler {
/// 5. Reset the in-memory list of processed tasks.
///
/// Returns the number of processed tasks.
fn tick(&self) -> Result<usize> {
fn tick(&self) -> Result<TickOutcome> {
#[cfg(test)]
{
*self.run_loop_iteration.write().unwrap() += 1;
@ -937,8 +943,9 @@ impl IndexScheduler {
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(0),
None => return Ok(TickOutcome::WaitForSignal),
};
let index_uid = batch.index_uid().map(ToOwned::to_owned);
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
@ -1009,7 +1016,23 @@ impl IndexScheduler {
// the `started_at` date times and `processings` of the current processing tasks.
// This date time is used by the task cancelation to store the right `started_at`
// date in the task on disk.
return Ok(0);
return Ok(TickOutcome::TickAgain(0));
}
// If an index said it was full, we need to:
// 1. identify which index is full
// 2. close the associated environment
// 3. resize it
// 4. re-schedule tasks
Err(Error::Milli(milli::Error::UserError(
milli::UserError::MaxDatabaseSizeReached,
))) if index_uid.is_some() => {
// fixme: add index_uid to match to avoid the unwrap
let index_uid = index_uid.unwrap();
// fixme: handle error more gracefully? not sure when this could happen
self.index_mapper.resize_index(&wtxn, &index_uid)?;
wtxn.abort().map_err(Error::HeedTransaction)?;
return Ok(TickOutcome::TickAgain(0));
}
// In case of a failure we must get back and patch all the tasks with the error.
Err(err) => {
@ -1049,7 +1072,7 @@ impl IndexScheduler {
#[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing);
Ok(processed_tasks)
Ok(TickOutcome::TickAgain(processed_tasks))
}
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
@ -1084,6 +1107,16 @@ impl IndexScheduler {
}
}
/// The outcome of calling the [`IndexScheduler::tick`] function.
pub enum TickOutcome {
/// The scheduler should immediately attempt another `tick`.
///
/// The `usize` field contains the number of processed tasks.
TickAgain(usize),
/// The scheduler should wait for an external signal before attempting another `tick`.
WaitForSignal,
}
#[cfg(test)]
mod tests {
use std::io::{BufWriter, Seek, Write};
@ -1136,7 +1169,9 @@ mod tests {
snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"),
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_growth_amount: 1000 * 1000, // 1 MB
index_count: 5,
indexer_config: IndexerConfig::default(),
autobatching_enabled,
};

202
index-scheduler/src/lru.rs Normal file
View File

@ -0,0 +1,202 @@
//! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization.
use std::sync::atomic::{AtomicU64, Ordering};
/// Thread-safe `Vec`-backend LRU cache
#[derive(Debug)]
pub struct Lru<T> {
data: Vec<(AtomicU64, T)>,
generation: AtomicU64,
cap: usize,
}
impl<T> Lru<T> {
/// Creates a new LRU cache with the specified capacity.
///
/// The capacity is allocated up-front, and will never change through a [`Self::put`] operation.
///
/// # Panics
///
/// - If the capacity is 0.
/// - If the capacity exceeds `isize::MAX` bytes.
pub fn new(cap: usize) -> Self {
assert_ne!(cap, 0, "The capacity of a cache cannot be 0");
Self {
// Note: since the element of the vector contains an AtomicU64, it is definitely not zero-sized so cap will never be usize::MAX.
data: Vec::with_capacity(cap),
generation: AtomicU64::new(0),
cap,
}
}
/// The capacity of this LRU cache, that is the maximum number of elements it can hold before evicting elements from the cache.
///
/// The cache will contain at most this number of elements at any given time.
pub fn capacity(&self) -> usize {
self.cap
}
fn next_generation(&self) -> u64 {
// Acquire so this "happens-before" any potential store to a data cell (with Release ordering)
let generation = self.generation.fetch_add(1, Ordering::Acquire);
generation + 1
}
fn next_generation_mut(&mut self) -> u64 {
let generation = self.generation.get_mut();
*generation += 1;
*generation
}
/// Add a value in the cache, evicting an older value if necessary.
///
/// If a value was evicted from the cache, it is returned.
///
/// # Complexity
///
/// - If the cache is full, then linear in the capacity.
/// - Otherwise constant.
pub fn put(&mut self, value: T) -> Option<T> {
// no need for a memory fence: we assume that whichever mechanism provides us synchronization
// (very probably, a RwLock) takes care of fencing for us.
let next_generation = self.next_generation_mut();
let evicted = if self.is_full() { self.pop() } else { None };
self.data.push((AtomicU64::new(next_generation), value));
evicted
}
/// Evict the oldest value from the cache.
///
/// If the cache is empty, `None` will be returned.
///
/// # Complexity
///
/// - Linear in the capacity of the cache.
pub fn pop(&mut self) -> Option<T> {
// Iterator::min_by_key provides shared references to its elements,
// but we need (and can afford!) an exclusive one, so let's make an explicit loop
let mut min_generation_index = None;
for (index, (generation, _)) in self.data.iter_mut().enumerate() {
let generation = *generation.get_mut();
if let Some((_, min_generation)) = min_generation_index {
if min_generation > generation {
min_generation_index = Some((index, generation));
}
} else {
min_generation_index = Some((index, generation))
}
}
min_generation_index.map(|(min_index, _)| self.data.swap_remove(min_index).1)
}
/// The current number of elements in the cache.
///
/// This value is guaranteed to be less than or equal to [`Self::capacity`].
pub fn len(&self) -> usize {
self.data.len()
}
/// Returns `true` if putting any additional element in the cache would cause the eviction of an element.
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
}
pub struct LruMap<K, V>(Lru<(K, V)>);
impl<K, V> LruMap<K, V>
where
K: Eq,
{
/// Creates a new LRU cache map with the specified capacity.
///
/// The capacity is allocated up-front, and will never change through a [`Self::insert`] operation.
///
/// # Panics
///
/// - If the capacity is 0.
/// - If the capacity exceeds `isize::MAX` bytes.
pub fn new(cap: usize) -> Self {
Self(Lru::new(cap))
}
/// Gets a value in the cache map by its key.
///
/// If no value matches, `None` will be returned.
///
/// # Complexity
///
/// - Linear in the capacity of the cache.
pub fn get(&self, key: &K) -> Option<&V> {
for (generation, (candidate, value)) in self.0.data.iter() {
if key == candidate {
generation.store(self.0.next_generation(), Ordering::Release);
return Some(value);
}
}
None
}
/// Gets a value in the cache map by its key.
///
/// If no value matches, `None` will be returned.
///
/// # Complexity
///
/// - Linear in the capacity of the cache.
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
let next_generation = self.0.next_generation_mut();
for (generation, (candidate, value)) in self.0.data.iter_mut() {
if key == candidate {
*generation.get_mut() = next_generation;
return Some(value);
}
}
None
}
/// Inserts a value in the cache map by its key, replacing any existing value and returning any evicted value.
///
/// # Complexity
///
/// - Linear in the capacity of the cache.
pub fn insert(&mut self, key: K, mut value: V) -> InsertionOutcome<K, V> {
match self.get_mut(&key) {
Some(old_value) => {
std::mem::swap(old_value, &mut value);
InsertionOutcome::Replaced(value)
}
None => match self.0.put((key, value)) {
Some((key, value)) => InsertionOutcome::Evicted(key, value),
None => InsertionOutcome::InsertedNew,
},
}
}
/// Removes an element from the cache map by its key, returning its value.
///
/// Returns `None` if there was no element with this key in the cache.
///
/// # Complexity
///
/// - Linear in the capacity of the cache.
pub fn remove(&mut self, key: &K) -> Option<V> {
for (index, (_, (candidate, _))) in self.0.data.iter_mut().enumerate() {
if key == candidate {
return Some(self.0.data.swap_remove(index).1 .1);
}
}
None
}
}
/// The result of an insertion in a LRU map.
pub enum InsertionOutcome<K, V> {
/// The key was not in the cache, the key-value pair has been inserted.
InsertedNew,
/// The key was not in the cache and an old key-value pair was evicted from the cache to make room for its insertions.
Evicted(K, V),
/// The key was already in the cache map, its value has been updated.
Replaced(V),
}

View File

@ -1,7 +1,13 @@
use vergen::{vergen, Config};
use vergen::{vergen, Config, SemverKind};
fn main() {
if let Err(e) = vergen(Config::default()) {
let mut config = Config::default();
// allow using non-annotated tags
*config.git_mut().semver_kind_mut() = SemverKind::Lightweight;
// add -dirty suffix when we're not right on the tag
*config.git_mut().semver_dirty_mut() = Some("-dirty");
if let Err(e) = vergen(config) {
println!("cargo:warning=vergen: {}", e);
}

View File

@ -401,12 +401,19 @@ impl Segment {
if let Ok(stats) =
create_all_stats(index_scheduler.into(), auth_controller, &SearchRules::default())
{
// Replace the version number with the prototype name if any.
let version = if let Some(prototype) = crate::prototype_name() {
prototype
} else {
env!("CARGO_PKG_VERSION")
};
let _ = self
.batcher
.push(Identify {
context: Some(json!({
"app": {
"version": env!("CARGO_PKG_VERSION").to_string(),
"version": version.to_string(),
},
})),
user: self.user.clone(),

View File

@ -45,6 +45,33 @@ use option::ScheduleSnapshot;
use crate::error::MeilisearchHttpError;
/// Default number of simultaneously opened indexes,
/// lower for Windows that dedicates a smaller virtual address space to processes.
///
/// The value was chosen this way:
///
/// - Windows provides a small virtual address space of about 10TiB to processes.
/// - The chosen value allows for indexes to reach a safe size of 1TiB.
/// - This can accomodate an unlimited number of indexes as long as they stay below 1TiB size.
#[cfg(windows)]
const DEFAULT_INDEX_COUNT: usize = 10;
/// Default number of simultaneously opened indexes.
///
/// The higher, the better for avoiding reopening indexes.
///
/// The value was chosen this way:
///
/// - Opening an index consumes a file descriptor.
/// - The default on many unices is about 256 file descriptors for a process.
/// - 100 is a little bit less than half this value.
///
/// In the future, this value could be computed from the dynamic number of allowed file descriptors for the current process.
///
/// On Unices, this value is largely irrelevant to virtual address space, because due to index resizing the indexes should take virtual memory in the same ballpark
/// as their disk size and it is unlikely for a user to have a sum of index weighing 128TB on a single Meilisearch node.
#[cfg(not(windows))]
const DEFAULT_INDEX_COUNT: usize = 100;
/// Check if a db is empty. It does not provide any information on the
/// validity of the data in it.
/// We consider a database as non empty when it's a non empty directory.
@ -205,9 +232,11 @@ fn open_or_create_database_unchecked(
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_size: opt.max_index_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
})?)
};
@ -427,3 +456,35 @@ pub fn configure_metrics_route(config: &mut web::ServiceConfig, enable_metrics_r
);
}
}
/// Parses the output of
/// [`VERGEN_GIT_SEMVER_LIGHTWEIGHT`](https://docs.rs/vergen/latest/vergen/struct.Git.html#instructions)
/// as a prototype name.
///
/// Returns `Some(prototype_name)` if the following conditions are met on this value:
///
/// 1. starts with `prototype-`,
/// 2. does not end with `dirty-`,
/// 3. ends with `-<some_number>`,
/// 4. does not end with `<some_number>-<some_number>`.
///
/// Otherwise, returns `None`.
pub fn prototype_name() -> Option<&'static str> {
let prototype: &'static str = option_env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")?;
if prototype.ends_with("-dirty") {
return None;
}
if !prototype.starts_with("prototype-") {
return None;
}
let mut rsplit_prototype = prototype.rsplit('-');
// last component MUST be a number
rsplit_prototype.next()?.parse::<u64>().ok()?;
// before than last component SHALL NOT be a number
rsplit_prototype.next()?.parse::<u64>().err()?;
Some(prototype)
}

View File

@ -8,7 +8,7 @@ use actix_web::web::Data;
use actix_web::HttpServer;
use index_scheduler::IndexScheduler;
use meilisearch::analytics::Analytics;
use meilisearch::{analytics, create_app, setup_meilisearch, Opt};
use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt};
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
@ -137,6 +137,9 @@ pub fn print_launch_resume(
eprintln!("Commit SHA:\t\t{:?}", commit_sha.to_string());
eprintln!("Commit date:\t\t{:?}", commit_date.to_string());
eprintln!("Package version:\t{:?}", env!("CARGO_PKG_VERSION").to_string());
if let Some(prototype) = prototype_name() {
eprintln!("Prototype:\t\t{:?}", prototype);
}
#[cfg(all(not(debug_assertions), feature = "analytics"))]
{

View File

@ -65,11 +65,10 @@ const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
const DEFAULT_LOG_EVERY_N: usize = 100_000;
// Each environment (index and task-db) is taking space in the virtual address space.
//
// The size of the virtual address space is limited by the OS. About 100TB for Linux and about 10TB for Windows.
// This means that the number of indexes is limited to about 200 for Linux and about 20 for Windows.
pub const INDEX_SIZE: u64 = 536_870_912_000; // 500 GiB
pub const TASK_DB_SIZE: u64 = 10_737_418_240; // 10 GiB
// When creating a new environment, it starts its life with 10GiB of virtual address space.
// It is then later resized if needs be.
pub const INDEX_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]