diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 9b12d61cf..093c6209d 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey; use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::DocumentAdditionResult; -use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder}; +use meilisearch_types::milli::{self, ChannelCongestion, Filter}; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use meilisearch_types::Index; @@ -113,18 +113,8 @@ impl IndexScheduler { } } - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges); let (document_changes, operation_stats, primary_key) = indexer @@ -266,18 +256,8 @@ impl IndexScheduler { let mut congestion = None; if task.error.is_none() { - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let candidates_count = candidates.len(); progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges); @@ -429,18 +409,8 @@ impl IndexScheduler { let mut congestion = None; if !tasks.iter().all(|res| res.error.is_some()) { - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; progress.update_progress(DocumentDeletionProgress::DeleteDocuments); let mut indexer = indexer::DocumentDeletion::new(); diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 7310260f6..9364bc83d 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; @@ -504,22 +504,22 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - // 3.1 Use all cpus to process dump if max_indexing_threads not configured + // 3.1 Use all cpus to process dump if a) `max_indexing_threads` not configured and + // b) we're not executing from within a test let backup_config; - let indexer_config = if index_scheduler.indexer_config().max_threads.is_none() { - let mut _config = index_scheduler.indexer_config().clone_no_threadpool(); - _config.thread_pool = { - Some( - ThreadPoolNoAbortBuilder::new() - .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(num_cpus::get()) - .build()?, - ) - }; + let base_config = index_scheduler.indexer_config(); + + let indexer_config = if base_config.max_threads.is_none() && !cfg!(test) { + let thread_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(num_cpus::get()) + .build()?; + + let _config = IndexerConfig { thread_pool, ..*base_config }; backup_config = _config; &backup_config } else { - index_scheduler.indexer_config() + base_config }; // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 259fd501f..8dcbdcfca 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -768,10 +768,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { .build()?; Ok(Self { + thread_pool, log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), max_threads: *other.max_indexing_threads, - thread_pool: Some(thread_pool), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 1f006b316..948d0fb0d 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1893,7 +1893,6 @@ pub(crate) mod tests { use crate::vector::EmbeddingConfigs; use crate::{ db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult, - ThreadPoolNoAbortBuilder, }; pub(crate) struct TempIndex { @@ -1934,15 +1933,8 @@ pub(crate) mod tests { wtxn: &mut RwTxn<'t>, documents: Mmap, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -2028,15 +2020,8 @@ pub(crate) mod tests { wtxn: &mut RwTxn<'t>, external_document_ids: Vec, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -2107,15 +2092,8 @@ pub(crate) mod tests { let mut wtxn = index.inner.write_txn().unwrap(); let should_abort = AtomicBool::new(false); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 4acb78b9a..379b991e0 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -33,7 +33,6 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::index::{PrefixSearch, PrefixSettings}; use crate::progress::Progress; -use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, @@ -228,24 +227,7 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); - let backup_pool; - let pool = match self.indexer_config.thread_pool { - Some(ref pool) => pool, - None => { - // We initialize a backup pool with the default - // settings if none have already been set. - #[allow(unused_mut)] - let mut pool_builder = ThreadPoolNoAbortBuilder::new(); - - #[cfg(test)] - { - pool_builder = pool_builder.num_threads(1); - } - - backup_pool = pool_builder.build()?; - &backup_pool - } - }; + let pool = &self.indexer_config.thread_pool; // create LMDB writer channel let (lmdb_writer_sx, lmdb_writer_rx): ( diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index e19649a0d..c6ae2b859 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,7 +1,7 @@ use grenad::CompressionType; use super::GrenadParameters; -use crate::thread_pool_no_abort::ThreadPoolNoAbort; +use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[derive(Debug)] pub struct IndexerConfig { @@ -12,7 +12,7 @@ pub struct IndexerConfig { pub max_threads: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: Option, + pub thread_pool: ThreadPoolNoAbort, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -26,25 +26,23 @@ impl IndexerConfig { max_nb_chunks: self.max_nb_chunks, } } - - pub fn clone_no_threadpool(&self) -> Self { - Self { - log_every_n: self.log_every_n, - max_nb_chunks: self.max_nb_chunks, - documents_chunk_size: self.documents_chunk_size, - max_memory: self.max_memory, - max_threads: self.max_threads, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - max_positions_per_attributes: self.max_positions_per_attributes, - skip_index_budget: self.skip_index_budget, - thread_pool: None, - } - } } impl Default for IndexerConfig { fn default() -> Self { + #[allow(unused_mut)] + let mut pool_builder = ThreadPoolNoAbortBuilder::new(); + + #[cfg(test)] + { + pool_builder = pool_builder.num_threads(1); + } + + let thread_pool = pool_builder + .thread_name(|index| format!("indexing-thread:{index}")) + .build() + .expect("failed to build default rayon thread pool"); + Self { log_every_n: None, max_nb_chunks: None, @@ -53,9 +51,9 @@ impl Default for IndexerConfig { max_threads: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: None, max_positions_per_attributes: None, skip_index_budget: false, + thread_pool, } } }