From 648b2876f65a5f11d12f3e69781c710630bdd46d Mon Sep 17 00:00:00 2001 From: nnethercott Date: Sun, 27 Apr 2025 00:51:26 +0200 Subject: [PATCH] Create temp threadpool with all CPUs in dump --- .../src/scheduler/process_index_operation.rs | 9 ++++++--- crates/meilisearch/src/lib.rs | 19 +++++++++++++++++++ crates/meilisearch/src/option.rs | 4 ++-- crates/milli/src/index.rs | 9 ++++++--- .../milli/src/update/index_documents/mod.rs | 4 +++- crates/milli/src/update/indexer_config.rs | 6 ++++-- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 9b12d61cf..68a1de25a 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -115,7 +115,8 @@ impl IndexScheduler { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -268,7 +269,8 @@ impl IndexScheduler { if task.error.is_none() { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() @@ -431,7 +433,8 @@ impl IndexScheduler { if !tasks.iter().all(|res| res.error.is_some()) { let local_pool; let indexer_config = self.index_mapper.indexer_config(); - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new() diff --git a/crates/meilisearch/src/lib.rs 
b/crates/meilisearch/src/lib.rs index 761726d83..4f31606f6 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -38,6 +38,7 @@ use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; +use meilisearch_types::milli::ThreadPoolNoAbortBuilder; use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::tasks::KindWithContent; use meilisearch_types::versioning::{ @@ -505,6 +506,18 @@ fn import_dump( let indexer_config = index_scheduler.indexer_config(); + // Use all cpus to index a dump + let pool_before = { + let all_cpus = num_cpus::get(); + + let temp_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(all_cpus) + .build()?; + + indexer_config.thread_pool.write().unwrap().replace(temp_pool) + }; + // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might // try to process tasks while we're trying to import the indexes. @@ -576,6 +589,12 @@ fn import_dump( index_scheduler.refresh_index_stats(&uid)?; } + // Restore original thread pool after dump + { + let mut guard = indexer_config.thread_pool.write().unwrap(); + *guard = pool_before; + } + // 5. Import the queue let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; // 5.1. 
Import the batches diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs index c71bf16c0..41dd05651 100644 --- a/crates/meilisearch/src/option.rs +++ b/crates/meilisearch/src/option.rs @@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError}; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::{env, fmt, fs}; use byte_unit::{Byte, ParseError, UnitType}; @@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { Ok(Self { log_every_n: Some(DEFAULT_LOG_EVERY_N), max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize), - thread_pool: Some(thread_pool), + thread_pool: RwLock::new(Some(thread_pool)), max_positions_per_attributes: None, skip_index_budget: other.skip_index_budget, ..Default::default() diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 1f006b316..779185ca2 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1936,7 +1936,8 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2030,7 +2031,8 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let local_pool; let indexer_config = &self.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match &*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); @@ -2109,7 +2111,8 @@ pub(crate) mod tests { let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { + let pool_guard = indexer_config.thread_pool.read().unwrap(); + let pool = match 
&*pool_guard { Some(pool) => pool, None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 4acb78b9a..1f962ae9f 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -228,8 +228,10 @@ where let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); + let pool_guard = self.indexer_config.thread_pool.read().unwrap(); + let backup_pool; - let pool = match self.indexer_config.thread_pool { + let pool = match &*pool_guard { Some(ref pool) => pool, None => { // We initialize a backup pool with the default diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs index 6fb33ad78..b3559190f 100644 --- a/crates/milli/src/update/indexer_config.rs +++ b/crates/milli/src/update/indexer_config.rs @@ -1,3 +1,5 @@ +use std::sync::RwLock; + use grenad::CompressionType; use super::GrenadParameters; @@ -11,7 +13,7 @@ pub struct IndexerConfig { pub max_memory: Option<usize>, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option<u32>, - pub thread_pool: Option<ThreadPoolNoAbort>, + pub thread_pool: RwLock<Option<ThreadPoolNoAbort>>, pub max_positions_per_attributes: Option<u32>, pub skip_index_budget: bool, } @@ -36,7 +38,7 @@ impl Default for IndexerConfig { max_memory: None, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - thread_pool: None, + thread_pool: RwLock::new(None), max_positions_per_attributes: None, skip_index_budget: false, }