diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs
index 9b12d61cf..093c6209d 100644
--- a/crates/index-scheduler/src/scheduler/process_index_operation.rs
+++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs
@@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey;
 use meilisearch_types::milli::progress::Progress;
 use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
 use meilisearch_types::milli::update::DocumentAdditionResult;
-use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder};
+use meilisearch_types::milli::{self, ChannelCongestion, Filter};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
 use meilisearch_types::Index;
@@ -113,18 +113,8 @@ impl IndexScheduler {
                     }
                 }

-                let local_pool;
                 let indexer_config = self.index_mapper.indexer_config();
-                let pool = match &indexer_config.thread_pool {
-                    Some(pool) => pool,
-                    None => {
-                        local_pool = ThreadPoolNoAbortBuilder::new()
-                            .thread_name(|i| format!("indexing-thread-{i}"))
-                            .build()
-                            .unwrap();
-                        &local_pool
-                    }
-                };
+                let pool = &indexer_config.thread_pool;

                 progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
                 let (document_changes, operation_stats, primary_key) = indexer
@@ -266,18 +256,8 @@ impl IndexScheduler {

                 let mut congestion = None;
                 if task.error.is_none() {
-                    let local_pool;
                     let indexer_config = self.index_mapper.indexer_config();
-                    let pool = match &indexer_config.thread_pool {
-                        Some(pool) => pool,
-                        None => {
-                            local_pool = ThreadPoolNoAbortBuilder::new()
-                                .thread_name(|i| format!("indexing-thread-{i}"))
-                                .build()
-                                .unwrap();
-                            &local_pool
-                        }
-                    };
+                    let pool = &indexer_config.thread_pool;

                     let candidates_count = candidates.len();
                     progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges);
@@ -429,18 +409,8 @@ impl IndexScheduler {

                 let mut congestion = None;
                 if !tasks.iter().all(|res| res.error.is_some()) {
-                    let local_pool;
                     let indexer_config = self.index_mapper.indexer_config();
-                    let pool = match &indexer_config.thread_pool {
-                        Some(pool) => pool,
-                        None => {
-                            local_pool = ThreadPoolNoAbortBuilder::new()
-                                .thread_name(|i| format!("indexing-thread-{i}"))
-                                .build()
-                                .unwrap();
-                            &local_pool
-                        }
-                    };
+                    let pool = &indexer_config.thread_pool;

                     progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
                     let mut indexer = indexer::DocumentDeletion::new();
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index 40d318140..d83786394 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -37,7 +37,9 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::{open_auth_store_env, AuthController};
 use meilisearch_types::milli::constants::VERSION_MAJOR;
 use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
-use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
+use meilisearch_types::milli::update::{
+    default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
+};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::versioning::{
@@ -500,7 +502,19 @@ fn import_dump(
     let network = dump_reader.network()?.cloned().unwrap_or_default();
     index_scheduler.put_network(network)?;

-    let indexer_config = index_scheduler.indexer_config();
+    // 3.1 Use all cpus to process dump if `max_indexing_threads` not configured
+    let backup_config;
+    let base_config = index_scheduler.indexer_config();
+
+    let indexer_config = if base_config.max_threads.is_none() {
+        let (thread_pool, _) = default_thread_pool_and_threads();
+
+        let _config = IndexerConfig { thread_pool, ..*base_config };
+        backup_config = _config;
+        &backup_config
+    } else {
+        base_config
+    };

     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
     // try to process tasks while we're trying to import the indexes.
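
Note on the `import_dump` hunk above, which is the one behavioral change in this patch: when `max_threads` is `None` (no thread budget was configured), the dump path rebuilds the config around a pool sized by `default_thread_pool_and_threads()` instead of reusing the scheduler's pool. Below is a minimal sketch of that borrow-or-rebuild pattern using only items visible in this diff; `widest_config` and its `backup` slot are illustrative names, not part of the patch, and the `..*base_config` update copies only the remaining `Copy` fields, exactly as the hunk does.

```rust
use meilisearch_types::milli::update::{default_thread_pool_and_threads, IndexerConfig};

// Hypothetical helper mirroring the `import_dump` logic: keep the configured
// `IndexerConfig` when an explicit thread budget was set, otherwise park a rebuilt
// copy that owns a full-width pool in `backup` and borrow that instead.
fn widest_config<'a>(
    base_config: &'a IndexerConfig,
    backup: &'a mut Option<IndexerConfig>,
) -> &'a IndexerConfig {
    if base_config.max_threads.is_none() {
        // Outside of tests this pool uses rayon's default sizing, i.e. all cores.
        let (thread_pool, _) = default_thread_pool_and_threads();
        backup.insert(IndexerConfig { thread_pool, ..*base_config })
    } else {
        base_config
    }
}
```

The `backup` slot plays the same role as `backup_config` in the hunk: the rebuilt config needs an owner that outlives the borrow handed to the rest of the import.
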
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index c71bf16c0..d98b9aa8b 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -746,10 +746,12 @@ impl IndexerOpts {
                 max_indexing_memory.to_string(),
             );
         }
-        export_to_env_if_not_present(
-            MEILI_MAX_INDEXING_THREADS,
-            max_indexing_threads.0.to_string(),
-        );
+        if let Some(max_indexing_threads) = max_indexing_threads.0 {
+            export_to_env_if_not_present(
+                MEILI_MAX_INDEXING_THREADS,
+                max_indexing_threads.to_string(),
+            );
+        }
     }
 }

@@ -757,15 +759,15 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     type Error = anyhow::Error;

     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
-        let thread_pool = ThreadPoolNoAbortBuilder::new()
-            .thread_name(|index| format!("indexing-thread:{index}"))
-            .num_threads(*other.max_indexing_threads)
+        let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+            .num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
             .build()?;

         Ok(Self {
+            thread_pool,
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
             max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
-            thread_pool: Some(thread_pool),
+            max_threads: *other.max_indexing_threads,
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
             ..Default::default()
         })
@@ -828,31 +830,31 @@ fn total_memory_bytes() -> Option<u64> {
     }
 }

-#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
-pub struct MaxThreads(usize);
+#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize)]
+pub struct MaxThreads(Option<usize>);

 impl FromStr for MaxThreads {
     type Err = ParseIntError;

-    fn from_str(s: &str) -> Result<MaxThreads, Self::Err> {
-        usize::from_str(s).map(Self)
-    }
-}
-
-impl Default for MaxThreads {
-    fn default() -> Self {
-        MaxThreads(num_cpus::get() / 2)
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if s.is_empty() || s == "unlimited" {
+            return Ok(MaxThreads::default());
+        }
+        usize::from_str(s).map(Some).map(MaxThreads)
     }
 }

 impl fmt::Display for MaxThreads {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", self.0)
+        match self.0 {
+            Some(threads) => write!(f, "{}", threads),
+            None => write!(f, "unlimited"),
+        }
     }
 }

 impl Deref for MaxThreads {
-    type Target = usize;
+    type Target = Option<usize>;

     fn deref(&self) -> &Self::Target {
         &self.0
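
`MaxThreads` now wraps an `Option<usize>`, so the CLI/env layer can tell "use exactly N threads" apart from "no explicit limit", and `MEILI_MAX_INDEXING_THREADS` is only exported in the former case. The test-style sketch below is illustrative rather than part of the patch; it assumes it sits next to the type in `option.rs` (hence `super::MaxThreads`) and merely restates the parsing and display rules introduced above.

```rust
#[cfg(test)]
mod max_threads_parsing {
    use std::str::FromStr;

    use super::MaxThreads;

    #[test]
    fn explicit_and_unlimited_values() {
        // A number is an explicit budget, read back through `Deref<Target = Option<usize>>`.
        assert_eq!(*MaxThreads::from_str("16").unwrap(), Some(16));
        // The empty string and "unlimited" both mean "no explicit limit".
        assert_eq!(*MaxThreads::from_str("").unwrap(), None);
        assert_eq!(*MaxThreads::from_str("unlimited").unwrap(), None);
        // Anything else still has to parse as a `usize`.
        assert!(MaxThreads::from_str("lots").is_err());
        // `Display` mirrors the unlimited case, so the value round-trips.
        assert_eq!(MaxThreads::default().to_string(), "unlimited");
    }
}
```
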
diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs
index 7759b3e18..dfd570b96 100644
--- a/crates/milli/src/test_index.rs
+++ b/crates/milli/src/test_index.rs
@@ -19,10 +19,7 @@ use crate::update::{
 };
 use crate::vector::settings::{EmbedderSource, EmbeddingSettings};
 use crate::vector::EmbeddingConfigs;
-use crate::{
-    db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult,
-    ThreadPoolNoAbortBuilder,
-};
+use crate::{db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult};

 pub(crate) struct TempIndex {
     pub inner: Index,
@@ -62,15 +59,8 @@ impl TempIndex {
         wtxn: &mut RwTxn<'t>,
         documents: Mmap,
     ) -> Result<(), crate::error::Error> {
-        let local_pool;
         let indexer_config = &self.indexer_config;
-        let pool = match &indexer_config.thread_pool {
-            Some(pool) => pool,
-            None => {
-                local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-                &local_pool
-            }
-        };
+        let pool = &indexer_config.thread_pool;

         let rtxn = self.inner.read_txn()?;
         let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@@ -153,15 +143,8 @@ impl TempIndex {
         wtxn: &mut RwTxn<'t>,
         external_document_ids: Vec<String>,
     ) -> Result<(), crate::error::Error> {
-        let local_pool;
         let indexer_config = &self.indexer_config;
-        let pool = match &indexer_config.thread_pool {
-            Some(pool) => pool,
-            None => {
-                local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-                &local_pool
-            }
-        };
+        let pool = &indexer_config.thread_pool;

         let rtxn = self.inner.read_txn()?;
         let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@@ -231,15 +214,8 @@ fn aborting_indexation() {
     let mut wtxn = index.inner.write_txn().unwrap();
     let should_abort = AtomicBool::new(false);

-    let local_pool;
     let indexer_config = &index.indexer_config;
-    let pool = match &indexer_config.thread_pool {
-        Some(pool) => pool,
-        None => {
-            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-            &local_pool
-        }
-    };
+    let pool = &indexer_config.thread_pool;

     let rtxn = index.inner.read_txn().unwrap();
     let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap();
diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs
index b57050a63..0c2fbb30d 100644
--- a/crates/milli/src/thread_pool_no_abort.rs
+++ b/crates/milli/src/thread_pool_no_abort.rs
@@ -54,6 +54,10 @@ impl ThreadPoolNoAbortBuilder {
         ThreadPoolNoAbortBuilder::default()
     }

+    pub fn new_for_indexing() -> ThreadPoolNoAbortBuilder {
+        ThreadPoolNoAbortBuilder::default().thread_name(|index| format!("indexing-thread:{index}"))
+    }
+
     pub fn thread_name<F>(mut self, closure: F) -> Self
     where
         F: FnMut(usize) -> String + 'static,
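
`new_for_indexing()` only bakes the `indexing-thread:{index}` naming into the builder, so the call sites in this patch differ solely in whether they also pass `num_threads`. The sketch below shows building and using such a pool; it assumes `ThreadPoolNoAbort::install` behaves like rayon's `install` but reports a worker panic as an error instead of aborting, which is the stated purpose of the wrapper even though its definition is outside this diff.

```rust
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;

fn sum_on_indexing_pool() -> u64 {
    // Same builder call as in `TryFrom<&IndexerOpts>` above, with a fixed budget.
    let pool = ThreadPoolNoAbortBuilder::new_for_indexing()
        .num_threads(4)
        .build()
        .expect("failed to build indexing thread pool");

    // Assumed rayon-style entry point: run the closure on the pool's workers and
    // surface a panic as an `Err` rather than killing the process.
    pool.install(|| (0..1_000u64).sum::<u64>()).expect("indexing closure panicked")
}
```
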
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index 4acb78b9a..379b991e0 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -33,7 +33,6 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader};
 use crate::error::{Error, InternalError};
 use crate::index::{PrefixSearch, PrefixSettings};
 use crate::progress::Progress;
-use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
 use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
@@ -228,24 +227,7 @@ where
         let possible_embedding_mistakes =
             crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);

-        let backup_pool;
-        let pool = match self.indexer_config.thread_pool {
-            Some(ref pool) => pool,
-            None => {
-                // We initialize a backup pool with the default
-                // settings if none have already been set.
-                #[allow(unused_mut)]
-                let mut pool_builder = ThreadPoolNoAbortBuilder::new();
-
-                #[cfg(test)]
-                {
-                    pool_builder = pool_builder.num_threads(1);
-                }
-
-                backup_pool = pool_builder.build()?;
-                &backup_pool
-            }
-        };
+        let pool = &self.indexer_config.thread_pool;

         // create LMDB writer channel
         let (lmdb_writer_sx, lmdb_writer_rx): (
diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs
index 6fb33ad78..eb7fbd4d5 100644
--- a/crates/milli/src/update/indexer_config.rs
+++ b/crates/milli/src/update/indexer_config.rs
@@ -1,7 +1,7 @@
 use grenad::CompressionType;

 use super::GrenadParameters;
-use crate::thread_pool_no_abort::ThreadPoolNoAbort;
+use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};

 #[derive(Debug)]
 pub struct IndexerConfig {
@@ -9,9 +9,10 @@ pub struct IndexerConfig {
     pub max_nb_chunks: Option<usize>,
     pub documents_chunk_size: Option<usize>,
     pub max_memory: Option<usize>,
+    pub max_threads: Option<usize>,
     pub chunk_compression_type: CompressionType,
     pub chunk_compression_level: Option<u32>,
-    pub thread_pool: Option<ThreadPoolNoAbort>,
+    pub thread_pool: ThreadPoolNoAbort,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
 }
@@ -27,16 +28,39 @@ impl IndexerConfig {
     }
 }

+/// By default use only 1 thread for indexing in tests
+#[cfg(test)]
+pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
+    let pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+        .num_threads(1)
+        .build()
+        .expect("failed to build default rayon thread pool");
+
+    (pool, Some(1))
+}
+
+#[cfg(not(test))]
+pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
+    let pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+        .build()
+        .expect("failed to build default rayon thread pool");
+
+    (pool, None)
+}
+
 impl Default for IndexerConfig {
     fn default() -> Self {
+        let (thread_pool, max_threads) = default_thread_pool_and_threads();
+
         Self {
+            max_threads,
+            thread_pool,
             log_every_n: None,
             max_nb_chunks: None,
             documents_chunk_size: None,
             max_memory: None,
             chunk_compression_type: CompressionType::None,
             chunk_compression_level: None,
-            thread_pool: None,
             max_positions_per_attributes: None,
             skip_index_budget: false,
         }
diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs
index 9a783ffd2..ebb313dcf 100644
--- a/crates/milli/src/update/mod.rs
+++ b/crates/milli/src/update/mod.rs
@@ -4,7 +4,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
 pub use self::facet::bulk::FacetsUpdateBulk;
 pub use self::facet::incremental::FacetsUpdateIncrementalInner;
 pub use self::index_documents::*;
-pub use self::indexer_config::IndexerConfig;
+pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
 pub use self::new::ChannelCongestion;
 pub use self::settings::{validate_embedding_settings, Setting, Settings};
 pub use self::update_step::UpdateIndexingStep;
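
Taken together, an `IndexerConfig` now always owns a ready-to-use pool: `Default` goes through `default_thread_pool_and_threads()` (one thread under `cfg(test)`, rayon's own sizing otherwise), and an explicit budget is recorded in the new `max_threads` field, which is what the dump import checks. The sketch below builds a config with a fixed budget, mirroring the `TryFrom<&IndexerOpts>` impl; it is illustrative only and assumes `anyhow` for the error type, as that impl does.

```rust
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;

// Illustrative constructor, not part of the patch: the pool is built up front and
// stored unconditionally, and `max_threads` keeps the explicit budget around so other
// code can distinguish "configured" from "default".
fn indexer_config_with_budget(threads: usize) -> anyhow::Result<IndexerConfig> {
    let thread_pool =
        ThreadPoolNoAbortBuilder::new_for_indexing().num_threads(threads).build()?;

    Ok(IndexerConfig { thread_pool, max_threads: Some(threads), ..Default::default() })
}
```

Since the removed fallback pools are gone, this is now the only place a caller has to think about thread budgets; every indexing call site simply borrows `&config.thread_pool`.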