From 3b773b3416d945c5f1676d6addfc2672cdbc1193 Mon Sep 17 00:00:00 2001 From: nnethercott Date: Mon, 28 Apr 2025 11:45:21 +0200 Subject: [PATCH] Revert thread_pool type back to Option in config --- .../src/scheduler/process_index_operation.rs | 9 +++------ crates/meilisearch/src/lib.rs | 20 +++++++------------ crates/meilisearch/src/option.rs | 4 ++-- crates/milli/src/index.rs | 9 +++------ .../milli/src/update/index_documents/mod.rs | 4 +--- crates/milli/src/update/indexer_config.rs | 20 +++++++++++++++---- 6 files changed, 32 insertions(+), 34 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 68a1de25a..9b12d61cf 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -115,8 +115,7 @@ impl IndexScheduler { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -269,8 +268,7 @@ impl IndexScheduler { if task.error.is_none() { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -433,8 +431,7 @@ impl IndexScheduler { if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 4f31606f6..df45dc63b 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; @@ -504,10 +504,10 @@ fn import_dump( let network = dump_reader.network()?.cloned().unwrap_or_default(); index_scheduler.put_network(network)?; - let indexer_config = index_scheduler.indexer_config(); + let mut indexer_config = IndexerConfig::clone_no_threadpool(index_scheduler.indexer_config()); - // Use all cpus to index a dump - let pool_before = { + // 3.1 Use all cpus to index the import dump + indexer_config.thread_pool = { let all_cpus = num_cpus::get(); let temp_pool = ThreadPoolNoAbortBuilder::new() @@ -515,7 +515,7 @@ fn import_dump( .num_threads(all_cpus) .build()?; - indexer_config.thread_pool.write().unwrap().replace(temp_pool) + Some(temp_pool) }; // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might @@ -533,7 +533,7 @@ fn import_dump( let mut wtxn = index.write_txn()?; - let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config); + let mut builder = milli::update::Settings::new(&mut wtxn, &index, &indexer_config); // 4.1 Import the primary key if there is one. if let Some(ref primary_key) = metadata.primary_key { builder.set_primary_key(primary_key.to_string()); @@ -568,7 +568,7 @@ fn import_dump( let builder = milli::update::IndexDocuments::new( &mut wtxn, &index, - indexer_config, + &indexer_config, IndexDocumentsConfig { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() @@ -589,12 +589,6 @@ fn import_dump( index_scheduler.refresh_index_stats(&uid)?; } - // Restore original thread pool after dump - { - let mut guard = indexer_config.thread_pool.write().unwrap(); - *guard = pool_before; - } - // 5. Import the queue let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; // 5.1. Import the batches diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index 41dd05651..c71bf16c0 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError}; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::{env, fmt, fs}; use byte_unit::{Byte, ParseError, UnitType}; @@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), - thread_pool: RwLock::new(Some(thread_pool)), + thread_pool: Some(thread_pool), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 779185ca2..1f006b316 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1936,8 +1936,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2031,8 +2030,7 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2111,8 +2109,7 @@ pub(crate) mod tests { let local_pool; let indexer_config = &index.indexer_config; - let pool_guard = indexer_config.thread_pool.read().unwrap(); - let pool = match &*pool_guard { + let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 1f962ae9f..4acb78b9a 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -228,10 +228,8 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); - let pool_guard = self.indexer_config.thread_pool.read().unwrap(); - let backup_pool; - let pool = match &*pool_guard { + let pool = match self.indexer_config.thread_pool { Some(ref pool) => pool, None => { // We initialize a backup pool with the default diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index b3559190f..a534a21e9 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,5 +1,3 @@ -use std::sync::RwLock; - use grenad::CompressionType; use super::GrenadParameters; @@ -13,7 +11,7 @@ pub struct IndexerConfig { pub max_memory: Option, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, - pub thread_pool: RwLock>, + pub thread_pool: Option, pub max_positions_per_attributes: Option, pub skip_index_budget: bool, } @@ -27,6 +25,20 @@ impl IndexerConfig { max_nb_chunks: self.max_nb_chunks, } } + + pub fn clone_no_threadpool(other: &IndexerConfig) -> Self { + Self { + log_every_n: other.log_every_n.clone(), + max_nb_chunks: other.max_nb_chunks.clone(), + documents_chunk_size: other.documents_chunk_size.clone(), + max_memory: other.max_memory.clone(), + chunk_compression_type: other.chunk_compression_type.clone(), + chunk_compression_level: other.chunk_compression_level.clone(), + max_positions_per_attributes: other.max_positions_per_attributes.clone(), + skip_index_budget: other.skip_index_budget.clone(), + thread_pool: None, + } + } } impl Default for IndexerConfig { @@ -38,7 +50,7 @@ impl Default for IndexerConfig { max_memory: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: RwLock::new(None), + thread_pool: None, max_positions_per_attributes: None, skip_index_budget: false, }