From 648b2876f65a5f11d12f3e69781c710630bdd46d Mon Sep 17 00:00:00 2001 From: nnethercott Date: Sun, 27 Apr 2025 00:51:26 +0200 Subject: [PATCH 01/11] Create temp threadpool with all CPUs in dump --- .../src/scheduler/process_index_operation.rs | 9 ++++++--- crates/meilisearch/src/lib.rs | 19 +++++++++++++++++++ crates/meilisearch/src/option.rs | 4 ++-- crates/milli/src/index.rs | 9 ++++++--- .../milli/src/update/index_documents/mod.rs | 4 +++- crates/milli/src/update/indexer_config.rs | 6 ++++-- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 9b12d61cf..68a1de25a 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -115,7 +115,8 @@ impl IndexScheduler { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -268,7 +269,8 @@ impl IndexScheduler { if task.error.is_none() { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -431,7 +433,8 @@ impl IndexScheduler { if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 761726d83..4f31606f6 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -38,6 +38,7 @@ use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; use meilisearch_types::versioning::{ @@ -505,6 +506,18 @@ fn import_dump( let indexer_config = index_scheduler.indexer_config(); + // Use all cpus to index a dump + let pool_before = { + let all_cpus = num_cpus::get(); + + let temp_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(all_cpus) + .build()?; + + indexer_config.thread_pool.write().unwrap().replace(temp_pool) + }; + // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might // try to process tasks while we're trying to import the indexes. @@ -576,6 +589,12 @@ fn import_dump( index_scheduler.refresh_index_stats(&uid)?; } + // Restore original thread pool after dump + { + let mut guard = indexer_config.thread_pool.write().unwrap(); + *guard = pool_before; + } + // 5. Import the queue let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; // 5.1. Import the batches diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index c71bf16c0..41dd05651 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError}; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::{env, fmt, fs}; use byte_unit::{Byte, ParseError, UnitType}; @@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), - thread_pool: Some(thread_pool), + thread_pool: RwLock::new(Some(thread_pool)), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 1f006b316..779185ca2 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1936,7 +1936,8 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2030,7 +2031,8 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2109,7 +2111,8 @@ pub(crate) mod tests { let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 4acb78b9a..1f962ae9f 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -228,8 +228,10 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); + let pool_guard = self.indexer_config.thread_pool.read().unwrap(); + let backup_pool; - let pool = match self.indexer_config.thread_pool { + let pool = match &*pool_guard { Some(ref pool) => pool, None => { // We initialize a backup pool with the default diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 6fb33ad78..b3559190f 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,3 +1,5 @@ +use std::sync::RwLock; + use grenad::CompressionType; use super::GrenadParameters; @@ -11,7 +13,7 @@ pub struct IndexerConfig { pub max_memory: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: Option, + pub thread_pool: RwLock>, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -36,7 +38,7 @@ impl Default for IndexerConfig { max_memory: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: None, + thread_pool: RwLock::new(None), max_positions_per_attributes: None, skip_index_budget: false, } From 3b773b3416d945c5f1676d6addfc2672cdbc1193 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Mon, 28 Apr 2025 11:45:21 +0200 Subject: [PATCH 02/11] Revert thread_pool type back to Option in config --- .../src/scheduler/process_index_operation.rs | 9 +++------ crates/meilisearch/src/lib.rs | 20 +++++++------------ crates/meilisearch/src/option.rs | 4 ++-- crates/milli/src/index.rs | 9 +++------ .../milli/src/update/index_documents/mod.rs | 4 +--- crates/milli/src/update/indexer_config.rs | 20 +++++++++++++++---- 6 files changed, 32 insertions(+), 34 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 68a1de25a..9b12d61cf 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -115,8 +115,7 @@ impl IndexScheduler { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -269,8 +268,7 @@ impl IndexScheduler { if task.error.is_none() { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -433,8 +431,7 @@ impl IndexScheduler { if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 4f31606f6..df45dc63b 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; @@ -504,10 +504,10 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - let indexer_config = index_scheduler.indexer_config(); + let mut indexer_config = IndexerConfig::clone_no_threadpool(index_scheduler.indexer_config()); - // Use all cpus to index a dump - let pool_before = { + // 3.1 Use all cpus to index the import dump + indexer_config.thread_pool = { let all_cpus = num_cpus::get(); let temp_pool = ThreadPoolNoAbortBuilder::new() @@ -515,7 +515,7 @@ fn import_dump( .num_threads(all_cpus) .build()?; - indexer_config.thread_pool.write().unwrap().replace(temp_pool) + Some(temp_pool) }; // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might @@ -533,7 +533,7 @@ fn import_dump( let mut wtxn = index.write_txn()?; - let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config); + let mut builder = milli::update::Settings::new(&mut wtxn, &index, &indexer_config); // 4.1 Import the primary key if there is one. if let Some(ref primary_key) = metadata.primary_key { builder.set_primary_key(primary_key.to_string()); @@ -568,7 +568,7 @@ fn import_dump( let builder = milli::update::IndexDocuments::new( &mut wtxn, &index, - indexer_config, + &indexer_config, IndexDocumentsConfig { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() @@ -589,12 +589,6 @@ fn import_dump( index_scheduler.refresh_index_stats(&uid)?; } - // Restore original thread pool after dump - { - let mut guard = indexer_config.thread_pool.write().unwrap(); - *guard = pool_before; - } - // 5. Import the queue let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; // 5.1. Import the batches diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 41dd05651..c71bf16c0 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError}; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::{env, fmt, fs}; use byte_unit::{Byte, ParseError, UnitType}; @@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), - thread_pool: RwLock::new(Some(thread_pool)), + thread_pool: Some(thread_pool), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 779185ca2..1f006b316 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1936,8 +1936,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2031,8 +2030,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2111,8 +2109,7 @@ pub(crate) mod tests { let local_pool; let indexer_config = &index.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 1f962ae9f..4acb78b9a 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -228,10 +228,8 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); - let pool_guard = self.indexer_config.thread_pool.read().unwrap(); - let backup_pool; - let pool = match &*pool_guard { + let pool = match self.indexer_config.thread_pool { Some(ref pool) => pool, None => { // We initialize a backup pool with the default diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index b3559190f..a534a21e9 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,5 +1,3 @@ -use std::sync::RwLock; - use grenad::CompressionType; use super::GrenadParameters; @@ -13,7 +11,7 @@ pub struct IndexerConfig { pub max_memory: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: RwLock>, + pub thread_pool: Option, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -27,6 +25,20 @@ impl IndexerConfig { max_nb_chunks: self.max_nb_chunks, } } + + pub fn clone_no_threadpool(other: &IndexerConfig) -> Self { + Self { + log_every_n: other.log_every_n.clone(), + max_nb_chunks: other.max_nb_chunks.clone(), + documents_chunk_size: other.documents_chunk_size.clone(), + max_memory: other.max_memory.clone(), + chunk_compression_type: other.chunk_compression_type.clone(), + chunk_compression_level: other.chunk_compression_level.clone(), + max_positions_per_attributes: other.max_positions_per_attributes.clone(), + skip_index_budget: other.skip_index_budget.clone(), + thread_pool: None, + } + } } impl Default for IndexerConfig { @@ -38,7 +50,7 @@ impl Default for IndexerConfig { max_memory: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: RwLock::new(None), + thread_pool: None, max_positions_per_attributes: None, skip_index_budget: false, } From 89aff2081c81a2ed4e767228d4e19f073e5a698a Mon Sep 17 00:00:00 2001 From: nnethercott Date: Wed, 30 Apr 2025 14:17:32 +0200 Subject: [PATCH 03/11] Fix clippy warnings --- crates/milli/src/update/indexer_config.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index a534a21e9..433273fac 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -28,14 +28,14 @@ impl IndexerConfig { pub fn clone_no_threadpool(other: &IndexerConfig) -> Self { Self { - log_every_n: other.log_every_n.clone(), - max_nb_chunks: other.max_nb_chunks.clone(), - documents_chunk_size: other.documents_chunk_size.clone(), - max_memory: other.max_memory.clone(), - chunk_compression_type: other.chunk_compression_type.clone(), - chunk_compression_level: other.chunk_compression_level.clone(), - max_positions_per_attributes: other.max_positions_per_attributes.clone(), - skip_index_budget: other.skip_index_budget.clone(), + log_every_n: other.log_every_n, + max_nb_chunks: other.max_nb_chunks, + documents_chunk_size: other.documents_chunk_size, + max_memory: other.max_memory, + chunk_compression_type: other.chunk_compression_type, + chunk_compression_level: other.chunk_compression_level, + max_positions_per_attributes: other.max_positions_per_attributes, + skip_index_budget: other.skip_index_budget, thread_pool: None, } } From 2ac826edca956a71c7d4976a06474386a6baf395 Mon Sep 17 00:00:00 2001 From: Nate Nethercott <53127799+nnethercott@users.noreply.github.com> Date: Thu, 1 May 2025 14:48:59 +0200 Subject: [PATCH 04/11] Apply suggested changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Clément Renault Update crates/meilisearch/src/lib.rs Co-authored-by: Clément Renault --- crates/meilisearch/src/lib.rs | 4 ++-- crates/milli/src/update/indexer_config.rs | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index df45dc63b..3b3c94230 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; +use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; @@ -504,7 +504,7 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - let mut indexer_config = IndexerConfig::clone_no_threadpool(index_scheduler.indexer_config()); + let mut indexer_config = index_scheduler.indexer_config().clone_no_threadpool(); // 3.1 Use all cpus to index the import dump indexer_config.thread_pool = { diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 433273fac..f9503c48e 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -26,16 +26,16 @@ impl IndexerConfig { } } - pub fn clone_no_threadpool(other: &IndexerConfig) -> Self { + pub fn clone_no_threadpool(&self) -> Self { Self { - log_every_n: other.log_every_n, - max_nb_chunks: other.max_nb_chunks, - documents_chunk_size: other.documents_chunk_size, - max_memory: other.max_memory, - chunk_compression_type: other.chunk_compression_type, - chunk_compression_level: other.chunk_compression_level, - max_positions_per_attributes: other.max_positions_per_attributes, - skip_index_budget: other.skip_index_budget, + log_every_n: self.log_every_n, + max_nb_chunks: self.max_nb_chunks, + documents_chunk_size: self.documents_chunk_size, + max_memory: self.max_memory, + chunk_compression_type: self.chunk_compression_type, + chunk_compression_level: self.chunk_compression_level, + max_positions_per_attributes: self.max_positions_per_attributes, + skip_index_budget: self.skip_index_budget, thread_pool: None, } } From 47a7ed93d334fd423619514c5e91d041995b8ea6 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Tue, 6 May 2025 09:10:09 +0200 Subject: [PATCH 05/11] feat: Make MaxThreads None by default --- crates/meilisearch/src/lib.rs | 32 ++++++++++-------- crates/meilisearch/src/option.rs | 40 +++++++++++++---------- crates/milli/src/update/indexer_config.rs | 3 ++ 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 3b3c94230..7310260f6 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -504,18 +504,22 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - let mut indexer_config = index_scheduler.indexer_config().clone_no_threadpool(); - - // 3.1 Use all cpus to index the import dump - indexer_config.thread_pool = { - let all_cpus = num_cpus::get(); - - let temp_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(all_cpus) - .build()?; - - Some(temp_pool) + // 3.1 Use all cpus to process dump if max_indexing_threads not configured + let backup_config; + let indexer_config = if index_scheduler.indexer_config().max_threads.is_none() { + let mut _config = index_scheduler.indexer_config().clone_no_threadpool(); + _config.thread_pool = { + Some( + ThreadPoolNoAbortBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(num_cpus::get()) + .build()?, + ) + }; + backup_config = _config; + &backup_config + } else { + index_scheduler.indexer_config() }; // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might @@ -533,7 +537,7 @@ fn import_dump( let mut wtxn = index.write_txn()?; - let mut builder = milli::update::Settings::new(&mut wtxn, &index, &indexer_config); + let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config); // 4.1 Import the primary key if there is one. if let Some(ref primary_key) = metadata.primary_key { builder.set_primary_key(primary_key.to_string()); @@ -568,7 +572,7 @@ fn import_dump( let builder = milli::update::IndexDocuments::new( &mut wtxn, &index, - &indexer_config, + indexer_config, IndexDocumentsConfig { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index c71bf16c0..259fd501f 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -746,10 +746,12 @@ impl IndexerOpts { max_indexing_memory.to_string(), ); } - export_to_env_if_not_present( - MEILI_MAX_INDEXING_THREADS, - max_indexing_threads.0.to_string(), - ); + if let Some(max_indexing_threads) = max_indexing_threads.0 { + export_to_env_if_not_present( + MEILI_MAX_INDEXING_THREADS, + max_indexing_threads.to_string(), + ); + } } } @@ -757,14 +759,18 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { type Error = anyhow::Error; fn try_from(other: &IndexerOpts) -> Result { + // use 1/2 cpu threads if no value specified + let max_indexing_threads = other.max_indexing_threads.unwrap_or(num_cpus::get() / 2); + let thread_pool = ThreadPoolNoAbortBuilder::new() .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(*other.max_indexing_threads) + .num_threads(max_indexing_threads) .build()?; Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), + max_threads: *other.max_indexing_threads, thread_pool: Some(thread_pool), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, @@ -828,31 +834,31 @@ fn total_memory_bytes() -> Option { } } -#[derive(Debug, Clone, Copy, Deserialize, Serialize)] -pub struct MaxThreads(usize); +#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize)] +pub struct MaxThreads(Option); impl FromStr for MaxThreads { type Err = ParseIntError; - fn from_str(s: &str) -> Result { - usize::from_str(s).map(Self) - } -} - -impl Default for MaxThreads { - fn default() -> Self { - MaxThreads(num_cpus::get() / 2) + fn from_str(s: &str) -> Result { + if s.is_empty() { + return Ok(MaxThreads::default()); + } + usize::from_str(s).map(Some).map(MaxThreads) } } impl fmt::Display for MaxThreads { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + match self.0 { + Some(threads) => write!(f, "{}", threads), + None => Ok(()), + } } } impl Deref for MaxThreads { - type Target = usize; + type Target = Option; fn deref(&self) -> &Self::Target { &self.0 diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index f9503c48e..e19649a0d 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -9,6 +9,7 @@ pub struct IndexerConfig { pub max_nb_chunks: Option, pub documents_chunk_size: Option, pub max_memory: Option, + pub max_threads: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, pub thread_pool: Option, @@ -32,6 +33,7 @@ impl IndexerConfig { max_nb_chunks: self.max_nb_chunks, documents_chunk_size: self.documents_chunk_size, max_memory: self.max_memory, + max_threads: self.max_threads, chunk_compression_type: self.chunk_compression_type, chunk_compression_level: self.chunk_compression_level, max_positions_per_attributes: self.max_positions_per_attributes, @@ -48,6 +50,7 @@ impl Default for IndexerConfig { max_nb_chunks: None, documents_chunk_size: None, max_memory: None, + max_threads: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, thread_pool: None, From 53f32a7dd78f945d7c126b3696ace419da1ba7af Mon Sep 17 00:00:00 2001 From: nnethercott Date: Wed, 7 May 2025 17:00:08 +0200 Subject: [PATCH 06/11] refactor: change thread_pool from Option to ThreadPoolNoAbort --- .../src/scheduler/process_index_operation.rs | 38 ++----------------- crates/meilisearch/src/lib.rs | 26 ++++++------- crates/meilisearch/src/option.rs | 2 +- crates/milli/src/index.rs | 28 ++------------ .../milli/src/update/index_documents/mod.rs | 20 +--------- crates/milli/src/update/indexer_config.rs | 34 ++++++++--------- 6 files changed, 38 insertions(+), 110 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 9b12d61cf..093c6209d 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey; use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::DocumentAdditionResult; -use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder}; +use meilisearch_types::milli::{self, ChannelCongestion, Filter}; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use meilisearch_types::Index; @@ -113,18 +113,8 @@ impl IndexScheduler { } } - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges); let (document_changes, operation_stats, primary_key) = indexer @@ -266,18 +256,8 @@ impl IndexScheduler { let mut congestion = None; if task.error.is_none() { - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let candidates_count = candidates.len(); progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges); @@ -429,18 +409,8 @@ impl IndexScheduler { let mut congestion = None; if !tasks.iter().all(|res| res.error.is_some()) { - let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|i| format!("indexing-thread-{i}")) - .build() - .unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; progress.update_progress(DocumentDeletionProgress::DeleteDocuments); let mut indexer = indexer::DocumentDeletion::new(); diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 7310260f6..9364bc83d 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; @@ -504,22 +504,22 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - // 3.1 Use all cpus to process dump if max_indexing_threads not configured + // 3.1 Use all cpus to process dump if a) `max_indexing_threads` not configured and + // b) we're not executing from within a test let backup_config; - let indexer_config = if index_scheduler.indexer_config().max_threads.is_none() { - let mut _config = index_scheduler.indexer_config().clone_no_threadpool(); - _config.thread_pool = { - Some( - ThreadPoolNoAbortBuilder::new() - .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(num_cpus::get()) - .build()?, - ) - }; + let base_config = index_scheduler.indexer_config(); + + let indexer_config = if base_config.max_threads.is_none() && !cfg!(test) { + let thread_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(num_cpus::get()) + .build()?; + + let _config = IndexerConfig { thread_pool, ..*base_config }; backup_config = _config; &backup_config } else { - index_scheduler.indexer_config() + base_config }; // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 259fd501f..8dcbdcfca 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -768,10 +768,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { .build()?; Ok(Self { + thread_pool, log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), max_threads: *other.max_indexing_threads, - thread_pool: Some(thread_pool), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 1f006b316..948d0fb0d 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1893,7 +1893,6 @@ pub(crate) mod tests { use crate::vector::EmbeddingConfigs; use crate::{ db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult, - ThreadPoolNoAbortBuilder, }; pub(crate) struct TempIndex { @@ -1934,15 +1933,8 @@ pub(crate) mod tests { wtxn: &mut RwTxn<'t>, documents: Mmap, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -2028,15 +2020,8 @@ pub(crate) mod tests { wtxn: &mut RwTxn<'t>, external_document_ids: Vec, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -2107,15 +2092,8 @@ pub(crate) mod tests { let mut wtxn = index.inner.write_txn().unwrap(); let should_abort = AtomicBool::new(false); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 4acb78b9a..379b991e0 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -33,7 +33,6 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::index::{PrefixSearch, PrefixSettings}; use crate::progress::Progress; -use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, @@ -228,24 +227,7 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); - let backup_pool; - let pool = match self.indexer_config.thread_pool { - Some(ref pool) => pool, - None => { - // We initialize a backup pool with the default - // settings if none have already been set. - #[allow(unused_mut)] - let mut pool_builder = ThreadPoolNoAbortBuilder::new(); - - #[cfg(test)] - { - pool_builder = pool_builder.num_threads(1); - } - - backup_pool = pool_builder.build()?; - &backup_pool - } - }; + let pool = &self.indexer_config.thread_pool; // create LMDB writer channel let (lmdb_writer_sx, lmdb_writer_rx): ( diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index e19649a0d..c6ae2b859 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,7 +1,7 @@ use grenad::CompressionType; use super::GrenadParameters; -use crate::thread_pool_no_abort::ThreadPoolNoAbort; +use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[derive(Debug)] pub struct IndexerConfig { @@ -12,7 +12,7 @@ pub struct IndexerConfig { pub max_threads: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: Option, + pub thread_pool: ThreadPoolNoAbort, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -26,25 +26,23 @@ impl IndexerConfig { max_nb_chunks: self.max_nb_chunks, } } - - pub fn clone_no_threadpool(&self) -> Self { - Self { - log_every_n: self.log_every_n, - max_nb_chunks: self.max_nb_chunks, - documents_chunk_size: self.documents_chunk_size, - max_memory: self.max_memory, - max_threads: self.max_threads, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - max_positions_per_attributes: self.max_positions_per_attributes, - skip_index_budget: self.skip_index_budget, - thread_pool: None, - } - } } impl Default for IndexerConfig { fn default() -> Self { + #[allow(unused_mut)] + let mut pool_builder = ThreadPoolNoAbortBuilder::new(); + + #[cfg(test)] + { + pool_builder = pool_builder.num_threads(1); + } + + let thread_pool = pool_builder + .thread_name(|index| format!("indexing-thread:{index}")) + .build() + .expect("failed to build default rayon thread pool"); + Self { log_every_n: None, max_nb_chunks: None, @@ -53,9 +51,9 @@ impl Default for IndexerConfig { max_threads: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: None, max_positions_per_attributes: None, skip_index_budget: false, + thread_pool, } } } From 15cdc6924b6b733d924d14d87ad9ce165cdd85dc Mon Sep 17 00:00:00 2001 From: nnethercott Date: Tue, 13 May 2025 09:12:34 +0200 Subject: [PATCH 07/11] refactor: remove runtime cfg!(test) check Won't work in integration tests and consequently all threads would be used. To remedy this we make explicit `max_threads=Some(1)` in the IndexerConfig::default --- crates/meilisearch/src/lib.rs | 5 ++-- crates/milli/src/test_index.rs | 32 +++-------------------- crates/milli/src/update/indexer_config.rs | 8 ++++-- 3 files changed, 12 insertions(+), 33 deletions(-) diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 57ef6d6f2..0a5c2f1f5 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -501,12 +501,11 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - // 3.1 Use all cpus to process dump if a) `max_indexing_threads` not configured and - // b) we're not executing from within a test + // 3.1 Use all cpus to process dump if `max_indexing_threads` not configured let backup_config; let base_config = index_scheduler.indexer_config(); - let indexer_config = if base_config.max_threads.is_none() && !cfg!(test) { + let indexer_config = if base_config.max_threads.is_none() { let thread_pool = ThreadPoolNoAbortBuilder::new() .thread_name(|index| format!("indexing-thread:{index}")) .num_threads(num_cpus::get()) diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs index 7759b3e18..dfd570b96 100644 --- a/crates/milli/src/test_index.rs +++ b/crates/milli/src/test_index.rs @@ -19,10 +19,7 @@ use crate::update::{ }; use crate::vector::settings::{EmbedderSource, EmbeddingSettings}; use crate::vector::EmbeddingConfigs; -use crate::{ - db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult, - ThreadPoolNoAbortBuilder, -}; +use crate::{db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult}; pub(crate) struct TempIndex { pub inner: Index, @@ -62,15 +59,8 @@ impl TempIndex { wtxn: &mut RwTxn<'t>, documents: Mmap, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -153,15 +143,8 @@ impl TempIndex { wtxn: &mut RwTxn<'t>, external_document_ids: Vec, ) -> Result<(), crate::error::Error> { - let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = self.inner.read_txn()?; let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; @@ -231,15 +214,8 @@ fn aborting_indexation() { let mut wtxn = index.inner.write_txn().unwrap(); let should_abort = AtomicBool::new(false); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let pool = &indexer_config.thread_pool; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index c6ae2b859..33573aef6 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -33,9 +33,13 @@ impl Default for IndexerConfig { #[allow(unused_mut)] let mut pool_builder = ThreadPoolNoAbortBuilder::new(); + #[allow(unused_mut, unused_assignments)] + let mut max_threads = None; + #[cfg(test)] { pool_builder = pool_builder.num_threads(1); + max_threads = Some(1); } let thread_pool = pool_builder @@ -44,16 +48,16 @@ impl Default for IndexerConfig { .expect("failed to build default rayon thread pool"); Self { + max_threads, + thread_pool, log_every_n: None, max_nb_chunks: None, documents_chunk_size: None, max_memory: None, - max_threads: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, max_positions_per_attributes: None, skip_index_budget: false, - thread_pool, } } } From e96c1d4b0fa044da9606c5247e921e30cfe58abb Mon Sep 17 00:00:00 2001 From: nnethercott Date: Tue, 13 May 2025 12:16:34 +0200 Subject: [PATCH 08/11] style: change fmt from empty str to "unlimited" --- crates/meilisearch/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 8dcbdcfca..e7d172b71 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -852,7 +852,7 @@ impl fmt::Display for MaxThreads { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.0 { Some(threads) => write!(f, "{}", threads), - None => Ok(()), + None => write!(f, "unlimited"), } } } From 806e983aa54ef6d303dde8ab0e4b1efa46cbcb46 Mon Sep 17 00:00:00 2001 From: Nate Nethercott <53127799+nnethercott@users.noreply.github.com> Date: Tue, 13 May 2025 14:14:48 +0200 Subject: [PATCH 09/11] fix: lazy computation in thread default Co-authored-by: Martin Grigorov --- crates/meilisearch/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index e7d172b71..acb4bc05e 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -760,7 +760,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { fn try_from(other: &IndexerOpts) -> Result { // use 1/2 cpu threads if no value specified - let max_indexing_threads = other.max_indexing_threads.unwrap_or(num_cpus::get() / 2); + let max_indexing_threads = other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2); let thread_pool = ThreadPoolNoAbortBuilder::new() .thread_name(|index| format!("indexing-thread:{index}")) From 865f24cfefbdfd2d0c6f6be2006266977f9d4cee Mon Sep 17 00:00:00 2001 From: nnethercott Date: Wed, 14 May 2025 23:45:24 +0200 Subject: [PATCH 10/11] refactor: helper methods for pool and max threads --- crates/meilisearch/src/lib.rs | 6 ++-- crates/meilisearch/src/option.rs | 10 ++---- crates/milli/src/thread_pool_no_abort.rs | 4 +++ crates/milli/src/update/indexer_config.rs | 37 +++++++++++++---------- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 0a5c2f1f5..441da0d7f 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -506,10 +506,8 @@ fn import_dump( let base_config = index_scheduler.indexer_config(); let indexer_config = if base_config.max_threads.is_none() { - let thread_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(num_cpus::get()) - .build()?; + let thread_pool = + ThreadPoolNoAbortBuilder::new_for_indexing().num_threads(num_cpus::get()).build()?; let _config = IndexerConfig { thread_pool, ..*base_config }; backup_config = _config; diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index acb4bc05e..d98b9aa8b 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -759,12 +759,8 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { type Error = anyhow::Error; fn try_from(other: &IndexerOpts) -> Result { - // use 1/2 cpu threads if no value specified - let max_indexing_threads = other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2); - - let thread_pool = ThreadPoolNoAbortBuilder::new() - .thread_name(|index| format!("indexing-thread:{index}")) - .num_threads(max_indexing_threads) + let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing() + .num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2)) .build()?; Ok(Self { @@ -841,7 +837,7 @@ impl FromStr for MaxThreads { type Err = ParseIntError; fn from_str(s: &str) -> Result { - if s.is_empty() { + if s.is_empty() || s == "unlimited" { return Ok(MaxThreads::default()); } usize::from_str(s).map(Some).map(MaxThreads) diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs index b57050a63..0c2fbb30d 100644 --- a/crates/milli/src/thread_pool_no_abort.rs +++ b/crates/milli/src/thread_pool_no_abort.rs @@ -54,6 +54,10 @@ impl ThreadPoolNoAbortBuilder { ThreadPoolNoAbortBuilder::default() } + pub fn new_for_indexing() -> ThreadPoolNoAbortBuilder { + ThreadPoolNoAbortBuilder::default().thread_name(|index| format!("indexing-thread:{index}")) + } + pub fn thread_name(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static, diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 33573aef6..edca71e14 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -28,24 +28,29 @@ impl IndexerConfig { } } +/// By default use only 1 thread for indexing in tests +#[cfg(test)] +fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { + let pool = ThreadPoolNoAbortBuilder::new_for_indexing() + .num_threads(1) + .build() + .expect("failed to build default rayon thread pool"); + + (pool, Some(1)) +} + +#[cfg(not(test))] +fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { + let pool = ThreadPoolNoAbortBuilder::new_for_indexing() + .build() + .expect("failed to build default rayon thread pool"); + + (pool, None) +} + impl Default for IndexerConfig { fn default() -> Self { - #[allow(unused_mut)] - let mut pool_builder = ThreadPoolNoAbortBuilder::new(); - - #[allow(unused_mut, unused_assignments)] - let mut max_threads = None; - - #[cfg(test)] - { - pool_builder = pool_builder.num_threads(1); - max_threads = Some(1); - } - - let thread_pool = pool_builder - .thread_name(|index| format!("indexing-thread:{index}")) - .build() - .expect("failed to build default rayon thread pool"); + let (thread_pool, max_threads) = default_thread_pool_and_threads(); Self { max_threads, From 79db2e67fb25af4b52a239a3ea21f150832ce949 Mon Sep 17 00:00:00 2001 From: Nate Nethercott <53127799+nnethercott@users.noreply.github.com> Date: Thu, 15 May 2025 11:04:38 +0200 Subject: [PATCH 11/11] refactor: prefer helper over explicit pool construction Co-authored-by: Many the fish --- crates/meilisearch/src/lib.rs | 8 ++++---- crates/milli/src/update/indexer_config.rs | 4 ++-- crates/milli/src/update/mod.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 441da0d7f..d83786394 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,8 +37,9 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; -use meilisearch_types::milli::ThreadPoolNoAbortBuilder; +use meilisearch_types::milli::update::{ + default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, +}; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; use meilisearch_types::versioning::{ @@ -506,8 +507,7 @@ fn import_dump( let base_config = index_scheduler.indexer_config(); let indexer_config = if base_config.max_threads.is_none() { - let thread_pool = - ThreadPoolNoAbortBuilder::new_for_indexing().num_threads(num_cpus::get()).build()?; + let (thread_pool, _) = default_thread_pool_and_threads(); let _config = IndexerConfig { thread_pool, ..*base_config }; backup_config = _config; diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index edca71e14..eb7fbd4d5 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -30,7 +30,7 @@ impl IndexerConfig { /// By default use only 1 thread for indexing in tests #[cfg(test)] -fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { +pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { let pool = ThreadPoolNoAbortBuilder::new_for_indexing() .num_threads(1) .build() @@ -40,7 +40,7 @@ fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { } #[cfg(not(test))] -fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { +pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option) { let pool = ThreadPoolNoAbortBuilder::new_for_indexing() .build() .expect("failed to build default rayon thread pool"); diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs index 9a783ffd2..ebb313dcf 100644 --- a/crates/milli/src/update/mod.rs +++ b/crates/milli/src/update/mod.rs @@ -4,7 +4,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds; pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::index_documents::*; -pub use self::indexer_config::IndexerConfig; +pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig}; pub use self::new::ChannelCongestion; pub use self::settings::{validate_embedding_settings, Setting, Settings}; pub use self::update_step::UpdateIndexingStep;