Merge pull request #5527 from nnethercott/all-cpus-in-import-dump

Use all CPUs during an import dump
Many the fish 2025-06-02 15:24:59 +00:00 committed by GitHub
commit d5526cffff
8 changed files with 79 additions and 107 deletions

View File

@@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey;
 use meilisearch_types::milli::progress::Progress;
 use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
 use meilisearch_types::milli::update::DocumentAdditionResult;
-use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder};
+use meilisearch_types::milli::{self, ChannelCongestion, Filter};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
 use meilisearch_types::Index;
@@ -113,18 +113,8 @@ impl IndexScheduler {
             }
         }

-        let local_pool;
         let indexer_config = self.index_mapper.indexer_config();
-        let pool = match &indexer_config.thread_pool {
-            Some(pool) => pool,
-            None => {
-                local_pool = ThreadPoolNoAbortBuilder::new()
-                    .thread_name(|i| format!("indexing-thread-{i}"))
-                    .build()
-                    .unwrap();
-                &local_pool
-            }
-        };
+        let pool = &indexer_config.thread_pool;

         progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
         let (document_changes, operation_stats, primary_key) = indexer
@@ -266,18 +256,8 @@ impl IndexScheduler {
         let mut congestion = None;
         if task.error.is_none() {
-            let local_pool;
             let indexer_config = self.index_mapper.indexer_config();
-            let pool = match &indexer_config.thread_pool {
-                Some(pool) => pool,
-                None => {
-                    local_pool = ThreadPoolNoAbortBuilder::new()
-                        .thread_name(|i| format!("indexing-thread-{i}"))
-                        .build()
-                        .unwrap();
-                    &local_pool
-                }
-            };
+            let pool = &indexer_config.thread_pool;

             let candidates_count = candidates.len();
             progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges);
@@ -429,18 +409,8 @@ impl IndexScheduler {
         let mut congestion = None;
         if !tasks.iter().all(|res| res.error.is_some()) {
-            let local_pool;
             let indexer_config = self.index_mapper.indexer_config();
-            let pool = match &indexer_config.thread_pool {
-                Some(pool) => pool,
-                None => {
-                    local_pool = ThreadPoolNoAbortBuilder::new()
-                        .thread_name(|i| format!("indexing-thread-{i}"))
-                        .build()
-                        .unwrap();
-                    &local_pool
-                }
-            };
+            let pool = &indexer_config.thread_pool;

             progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
             let mut indexer = indexer::DocumentDeletion::new();
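Note: `thread_pool` on `IndexerConfig` is no longer an `Option`, so each of the three call sites above shrinks from a ten-line lazy fallback to a plain borrow. As a reference for the pattern, here is a minimal sketch of building a named pool once and driving work through it, in plain rayon (`ThreadPoolNoAbort` is Meilisearch's abort-safe wrapper around rayon's `ThreadPool`; the actual indexing calls are elided):

```rust
use rayon::ThreadPoolBuilder;

fn main() {
    // Build the named pool once, up front, as the new IndexerConfig does.
    let pool = ThreadPoolBuilder::new()
        .thread_name(|i| format!("indexing-thread-{i}"))
        .build()
        .expect("failed to build thread pool");

    // Call sites now just borrow the pool and run their work inside it.
    let total: u64 = pool.install(|| (0..1_000u64).sum());
    assert_eq!(total, 499_500);
}
```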

View File

@@ -37,7 +37,9 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::{open_auth_store_env, AuthController};
 use meilisearch_types::milli::constants::VERSION_MAJOR;
 use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
-use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
+use meilisearch_types::milli::update::{
+    default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
+};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::versioning::{
@@ -500,7 +502,19 @@ fn import_dump(
     let network = dump_reader.network()?.cloned().unwrap_or_default();
     index_scheduler.put_network(network)?;

-    let indexer_config = index_scheduler.indexer_config();
+    // 3.1 Use all cpus to process dump if `max_indexing_threads` not configured
+    let backup_config;
+    let base_config = index_scheduler.indexer_config();
+
+    let indexer_config = if base_config.max_threads.is_none() {
+        let (thread_pool, _) = default_thread_pool_and_threads();
+
+        let _config = IndexerConfig { thread_pool, ..*base_config };
+        backup_config = _config;
+        &backup_config
+    } else {
+        base_config
+    };

     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
     // try to process tasks while we're trying to import the indexes.
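Note: the `backup_config` above is Rust's deferred-initialization idiom: the local is declared before the `if` so one branch can initialize it and hand out a reference that lives as long as `base_config`'s. A self-contained sketch of the idiom, with a hypothetical `Config` standing in for `IndexerConfig` (the real code swaps in a freshly built thread pool; here the override is shown on `max_threads` for brevity):

```rust
// Hypothetical stand-in for IndexerConfig; only max_threads matters here.
#[derive(Debug)]
struct Config {
    max_threads: Option<usize>,
}

fn main() {
    let base = Config { max_threads: None };

    // Declared but not initialized: it only needs to outlive `effective`.
    let backup;
    let effective = if base.max_threads.is_none() {
        // No explicit limit configured: fall back to every available CPU.
        let all_cpus = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        backup = Config { max_threads: Some(all_cpus) };
        &backup
    } else {
        &base
    };

    println!("importing dump with {:?} threads", effective.max_threads);
}
```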

View File

@@ -746,10 +746,12 @@ impl IndexerOpts {
                 max_indexing_memory.to_string(),
             );
         }
-        export_to_env_if_not_present(
-            MEILI_MAX_INDEXING_THREADS,
-            max_indexing_threads.0.to_string(),
-        );
+        if let Some(max_indexing_threads) = max_indexing_threads.0 {
+            export_to_env_if_not_present(
+                MEILI_MAX_INDEXING_THREADS,
+                max_indexing_threads.to_string(),
+            );
+        }
     }
 }
@@ -757,15 +759,15 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     type Error = anyhow::Error;

     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
-        let thread_pool = ThreadPoolNoAbortBuilder::new()
-            .thread_name(|index| format!("indexing-thread:{index}"))
-            .num_threads(*other.max_indexing_threads)
+        let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+            .num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
             .build()?;

         Ok(Self {
+            thread_pool,
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
             max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
-            thread_pool: Some(thread_pool),
+            max_threads: *other.max_indexing_threads,
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
             ..Default::default()
@@ -828,31 +830,31 @@ fn total_memory_bytes() -> Option<u64> {
     }
 }

-#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
-pub struct MaxThreads(usize);
+#[derive(Default, Debug, Clone, Copy, Deserialize, Serialize)]
+pub struct MaxThreads(Option<usize>);

 impl FromStr for MaxThreads {
     type Err = ParseIntError;

-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        usize::from_str(s).map(Self)
-    }
-}
-
-impl Default for MaxThreads {
-    fn default() -> Self {
-        MaxThreads(num_cpus::get() / 2)
+    fn from_str(s: &str) -> Result<MaxThreads, Self::Err> {
+        if s.is_empty() || s == "unlimited" {
+            return Ok(MaxThreads::default());
+        }
+        usize::from_str(s).map(Some).map(MaxThreads)
     }
 }

 impl fmt::Display for MaxThreads {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", self.0)
+        match self.0 {
+            Some(threads) => write!(f, "{}", threads),
+            None => write!(f, "unlimited"),
+        }
     }
 }

 impl Deref for MaxThreads {
-    type Target = usize;
+    type Target = Option<usize>;

     fn deref(&self) -> &Self::Target {
         &self.0
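Note: `max_indexing_threads` now defaults to "unlimited" rather than half the CPUs. A standalone copy of the new parsing semantics, runnable outside the crate (this mirrors the `FromStr` impl above; the `Deserialize`/`Serialize` derives are dropped here to avoid the serde dependency):

```rust
use std::num::ParseIntError;
use std::str::FromStr;

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
struct MaxThreads(Option<usize>);

impl FromStr for MaxThreads {
    type Err = ParseIntError;

    fn from_str(s: &str) -> Result<MaxThreads, Self::Err> {
        // An empty value or the literal "unlimited" means "no explicit cap".
        if s.is_empty() || s == "unlimited" {
            return Ok(MaxThreads::default());
        }
        usize::from_str(s).map(Some).map(MaxThreads)
    }
}

fn main() {
    assert_eq!("unlimited".parse::<MaxThreads>().unwrap(), MaxThreads(None));
    assert_eq!("".parse::<MaxThreads>().unwrap(), MaxThreads(None));
    assert_eq!("8".parse::<MaxThreads>().unwrap(), MaxThreads(Some(8)));
    assert!("eight".parse::<MaxThreads>().is_err());
}
```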

View File

@@ -19,10 +19,7 @@ use crate::update::{
 };
 use crate::vector::settings::{EmbedderSource, EmbeddingSettings};
 use crate::vector::EmbeddingConfigs;
-use crate::{
-    db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult,
-    ThreadPoolNoAbortBuilder,
-};
+use crate::{db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult};

 pub(crate) struct TempIndex {
     pub inner: Index,
@@ -62,15 +59,8 @@ impl TempIndex {
         wtxn: &mut RwTxn<'t>,
         documents: Mmap,
     ) -> Result<(), crate::error::Error> {
-        let local_pool;
         let indexer_config = &self.indexer_config;
-        let pool = match &indexer_config.thread_pool {
-            Some(pool) => pool,
-            None => {
-                local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-                &local_pool
-            }
-        };
+        let pool = &indexer_config.thread_pool;

         let rtxn = self.inner.read_txn()?;
         let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@@ -153,15 +143,8 @@ impl TempIndex {
         wtxn: &mut RwTxn<'t>,
         external_document_ids: Vec<String>,
     ) -> Result<(), crate::error::Error> {
-        let local_pool;
         let indexer_config = &self.indexer_config;
-        let pool = match &indexer_config.thread_pool {
-            Some(pool) => pool,
-            None => {
-                local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-                &local_pool
-            }
-        };
+        let pool = &indexer_config.thread_pool;

         let rtxn = self.inner.read_txn()?;
         let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@@ -231,15 +214,8 @@ fn aborting_indexation() {
     let mut wtxn = index.inner.write_txn().unwrap();
     let should_abort = AtomicBool::new(false);

-    let local_pool;
     let indexer_config = &index.indexer_config;
-    let pool = match &indexer_config.thread_pool {
-        Some(pool) => pool,
-        None => {
-            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-            &local_pool
-        }
-    };
+    let pool = &indexer_config.thread_pool;

     let rtxn = index.inner.read_txn().unwrap();
     let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap();

View File

@@ -54,6 +54,10 @@ impl ThreadPoolNoAbortBuilder {
         ThreadPoolNoAbortBuilder::default()
     }

+    pub fn new_for_indexing() -> ThreadPoolNoAbortBuilder {
+        ThreadPoolNoAbortBuilder::default().thread_name(|index| format!("indexing-thread:{index}"))
+    }
+
     pub fn thread_name<F>(mut self, closure: F) -> Self
     where
         F: FnMut(usize) -> String + 'static,
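Note: `new_for_indexing()` only pre-sets the thread-name closure so every indexing pool is named consistently. A sketch of the same factory in plain rayon (the `ThreadPoolNoAbort` wrapper, which converts panics into errors, is elided):

```rust
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};

// Hypothetical free-function version of the new constructor.
fn new_for_indexing() -> ThreadPoolBuilder {
    ThreadPoolBuilder::new().thread_name(|index| format!("indexing-thread:{index}"))
}

fn main() -> Result<(), ThreadPoolBuildError> {
    let pool = new_for_indexing().num_threads(2).build()?;
    pool.install(|| println!("running on {} threads", rayon::current_num_threads()));
    Ok(())
}
```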

View File

@@ -33,7 +33,6 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader};
 use crate::error::{Error, InternalError};
 use crate::index::{PrefixSearch, PrefixSettings};
 use crate::progress::Progress;
-use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
 use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
@@ -228,24 +227,7 @@ where
         let possible_embedding_mistakes =
             crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);

-        let backup_pool;
-        let pool = match self.indexer_config.thread_pool {
-            Some(ref pool) => pool,
-            None => {
-                // We initialize a backup pool with the default
-                // settings if none have already been set.
-                #[allow(unused_mut)]
-                let mut pool_builder = ThreadPoolNoAbortBuilder::new();
-
-                #[cfg(test)]
-                {
-                    pool_builder = pool_builder.num_threads(1);
-                }
-
-                backup_pool = pool_builder.build()?;
-                &backup_pool
-            }
-        };
+        let pool = &self.indexer_config.thread_pool;

         // create LMDB writer channel
         let (lmdb_writer_sx, lmdb_writer_rx): (

View File

@@ -1,7 +1,7 @@
 use grenad::CompressionType;

 use super::GrenadParameters;
-use crate::thread_pool_no_abort::ThreadPoolNoAbort;
+use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};

 #[derive(Debug)]
 pub struct IndexerConfig {
@@ -9,9 +9,10 @@ pub struct IndexerConfig {
     pub max_nb_chunks: Option<usize>,
     pub documents_chunk_size: Option<usize>,
     pub max_memory: Option<usize>,
+    pub max_threads: Option<usize>,
     pub chunk_compression_type: CompressionType,
     pub chunk_compression_level: Option<u32>,
-    pub thread_pool: Option<ThreadPoolNoAbort>,
+    pub thread_pool: ThreadPoolNoAbort,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
 }
@@ -27,16 +28,39 @@ impl IndexerConfig {
     }
 }

+/// By default use only 1 thread for indexing in tests
+#[cfg(test)]
+pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
+    let pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+        .num_threads(1)
+        .build()
+        .expect("failed to build default rayon thread pool");
+
+    (pool, Some(1))
+}
+
+#[cfg(not(test))]
+pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
+    let pool = ThreadPoolNoAbortBuilder::new_for_indexing()
+        .build()
+        .expect("failed to build default rayon thread pool");
+
+    (pool, None)
+}
+
 impl Default for IndexerConfig {
     fn default() -> Self {
+        let (thread_pool, max_threads) = default_thread_pool_and_threads();
+
         Self {
+            max_threads,
+            thread_pool,
             log_every_n: None,
             max_nb_chunks: None,
             documents_chunk_size: None,
             max_memory: None,
             chunk_compression_type: CompressionType::None,
             chunk_compression_level: None,
-            thread_pool: None,
             max_positions_per_attributes: None,
             skip_index_budget: false,
         }
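Note: this `cfg`-gated pair is the heart of the PR: outside of tests, the default pool is built with no `num_threads` cap, so rayon sizes it to all logical CPUs, while `max_threads` stays `None` to record that no explicit limit was configured. A runnable approximation in plain rayon (`ThreadPoolNoAbort` again elided):

```rust
use rayon::{ThreadPool, ThreadPoolBuilder};

#[cfg(test)]
fn default_thread_pool_and_threads() -> (ThreadPool, Option<usize>) {
    // Tests pin the pool to a single thread for determinism.
    let pool = ThreadPoolBuilder::new()
        .thread_name(|i| format!("indexing-thread:{i}"))
        .num_threads(1)
        .build()
        .expect("failed to build default rayon thread pool");
    (pool, Some(1))
}

#[cfg(not(test))]
fn default_thread_pool_and_threads() -> (ThreadPool, Option<usize>) {
    // No num_threads() call: rayon defaults to one thread per logical CPU.
    let pool = ThreadPoolBuilder::new()
        .thread_name(|i| format!("indexing-thread:{i}"))
        .build()
        .expect("failed to build default rayon thread pool");
    (pool, None)
}

fn main() {
    let (pool, declared_max) = default_thread_pool_and_threads();
    println!("pool size: {}, declared max: {:?}", pool.current_num_threads(), declared_max);
}
```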

View File

@@ -4,7 +4,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
 pub use self::facet::bulk::FacetsUpdateBulk;
 pub use self::facet::incremental::FacetsUpdateIncrementalInner;
 pub use self::index_documents::*;
-pub use self::indexer_config::IndexerConfig;
+pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
 pub use self::new::ChannelCongestion;
 pub use self::settings::{validate_embedding_settings, Setting, Settings};
 pub use self::update_step::UpdateIndexingStep;