refactor: change thread_pool from Option<ThreadPoolNoAbort> to

ThreadPoolNoAbort
This commit is contained in:
nnethercott 2025-05-07 17:00:08 +02:00
parent 47a7ed93d3
commit 53f32a7dd7
6 changed files with 38 additions and 110 deletions

View File

@ -5,7 +5,7 @@ use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, ChannelCongestion, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::milli::{self, ChannelCongestion, Filter};
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::Index;
@ -113,18 +113,8 @@ impl IndexScheduler {
}
}
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
let (document_changes, operation_stats, primary_key) = indexer
@ -266,18 +256,8 @@ impl IndexScheduler {
let mut congestion = None;
if task.error.is_none() {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
let candidates_count = candidates.len();
progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges);
@ -429,18 +409,8 @@ impl IndexScheduler {
let mut congestion = None;
if !tasks.iter().all(|res| res.error.is_some()) {
let local_pool;
let indexer_config = self.index_mapper.indexer_config();
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
let mut indexer = indexer::DocumentDeletion::new();

View File

@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::milli::constants::VERSION_MAJOR;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::KindWithContent;
@ -504,22 +504,22 @@ fn import_dump(
let network = dump_reader.network()?.cloned().unwrap_or_default();
index_scheduler.put_network(network)?;
// 3.1 Use all cpus to process dump if max_indexing_threads not configured
// 3.1 Use all cpus to process dump if a) `max_indexing_threads` not configured and
// b) we're not executing from within a test
let backup_config;
let indexer_config = if index_scheduler.indexer_config().max_threads.is_none() {
let mut _config = index_scheduler.indexer_config().clone_no_threadpool();
_config.thread_pool = {
Some(
ThreadPoolNoAbortBuilder::new()
let base_config = index_scheduler.indexer_config();
let indexer_config = if base_config.max_threads.is_none() && !cfg!(test) {
let thread_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|index| format!("indexing-thread:{index}"))
.num_threads(num_cpus::get())
.build()?,
)
};
.build()?;
let _config = IndexerConfig { thread_pool, ..*base_config };
backup_config = _config;
&backup_config
} else {
index_scheduler.indexer_config()
base_config
};
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might

View File

@ -768,10 +768,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
.build()?;
Ok(Self {
thread_pool,
log_every_n: Some(DEFAULT_LOG_EVERY_N),
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
max_threads: *other.max_indexing_threads,
thread_pool: Some(thread_pool),
max_positions_per_attributes: None,
skip_index_budget: other.skip_index_budget,
..Default::default()

View File

@ -1893,7 +1893,6 @@ pub(crate) mod tests {
use crate::vector::EmbeddingConfigs;
use crate::{
db_snap, obkv_to_json, Filter, FilterableAttributesRule, Index, Search, SearchResult,
ThreadPoolNoAbortBuilder,
};
pub(crate) struct TempIndex {
@ -1934,15 +1933,8 @@ pub(crate) mod tests {
wtxn: &mut RwTxn<'t>,
documents: Mmap,
) -> Result<(), crate::error::Error> {
let local_pool;
let indexer_config = &self.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
let rtxn = self.inner.read_txn()?;
let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@ -2028,15 +2020,8 @@ pub(crate) mod tests {
wtxn: &mut RwTxn<'t>,
external_document_ids: Vec<String>,
) -> Result<(), crate::error::Error> {
let local_pool;
let indexer_config = &self.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
let rtxn = self.inner.read_txn()?;
let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
@ -2107,15 +2092,8 @@ pub(crate) mod tests {
let mut wtxn = index.inner.write_txn().unwrap();
let should_abort = AtomicBool::new(false);
let local_pool;
let indexer_config = &index.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let pool = &indexer_config.thread_pool;
let rtxn = index.inner.read_txn().unwrap();
let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap();

View File

@ -33,7 +33,6 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::progress::Progress;
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
@ -228,24 +227,7 @@ where
let possible_embedding_mistakes =
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
let backup_pool;
let pool = match self.indexer_config.thread_pool {
Some(ref pool) => pool,
None => {
// We initialize a backup pool with the default
// settings if none have already been set.
#[allow(unused_mut)]
let mut pool_builder = ThreadPoolNoAbortBuilder::new();
#[cfg(test)]
{
pool_builder = pool_builder.num_threads(1);
}
backup_pool = pool_builder.build()?;
&backup_pool
}
};
let pool = &self.indexer_config.thread_pool;
// create LMDB writer channel
let (lmdb_writer_sx, lmdb_writer_rx): (

View File

@ -1,7 +1,7 @@
use grenad::CompressionType;
use super::GrenadParameters;
use crate::thread_pool_no_abort::ThreadPoolNoAbort;
use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
#[derive(Debug)]
pub struct IndexerConfig {
@ -12,7 +12,7 @@ pub struct IndexerConfig {
pub max_threads: Option<usize>,
pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>,
pub thread_pool: Option<ThreadPoolNoAbort>,
pub thread_pool: ThreadPoolNoAbort,
pub max_positions_per_attributes: Option<u32>,
pub skip_index_budget: bool,
}
@ -26,25 +26,23 @@ impl IndexerConfig {
max_nb_chunks: self.max_nb_chunks,
}
}
pub fn clone_no_threadpool(&self) -> Self {
Self {
log_every_n: self.log_every_n,
max_nb_chunks: self.max_nb_chunks,
documents_chunk_size: self.documents_chunk_size,
max_memory: self.max_memory,
max_threads: self.max_threads,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
max_positions_per_attributes: self.max_positions_per_attributes,
skip_index_budget: self.skip_index_budget,
thread_pool: None,
}
}
}
impl Default for IndexerConfig {
fn default() -> Self {
#[allow(unused_mut)]
let mut pool_builder = ThreadPoolNoAbortBuilder::new();
#[cfg(test)]
{
pool_builder = pool_builder.num_threads(1);
}
let thread_pool = pool_builder
.thread_name(|index| format!("indexing-thread:{index}"))
.build()
.expect("failed to build default rayon thread pool");
Self {
log_every_n: None,
max_nb_chunks: None,
@ -53,9 +51,9 @@ impl Default for IndexerConfig {
max_threads: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
thread_pool: None,
max_positions_per_attributes: None,
skip_index_budget: false,
thread_pool,
}
}
}