mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-06-06 12:15:45 +00:00
Create temp threadpool with all CPUs in dump
This commit is contained in:
parent
9fd9fcb03e
commit
648b2876f6
@ -115,7 +115,8 @@ impl IndexScheduler {
|
||||
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||
@ -268,7 +269,8 @@ impl IndexScheduler {
|
||||
if task.error.is_none() {
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||
@ -431,7 +433,8 @@ impl IndexScheduler {
|
||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new()
|
||||
|
@ -38,6 +38,7 @@ use meilisearch_auth::{open_auth_store_env, AuthController};
|
||||
use meilisearch_types::milli::constants::VERSION_MAJOR;
|
||||
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
||||
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
|
||||
use meilisearch_types::settings::apply_settings_to_builder;
|
||||
use meilisearch_types::tasks::KindWithContent;
|
||||
use meilisearch_types::versioning::{
|
||||
@ -505,6 +506,18 @@ fn import_dump(
|
||||
|
||||
let indexer_config = index_scheduler.indexer_config();
|
||||
|
||||
// Use all cpus to index a dump
|
||||
let pool_before = {
|
||||
let all_cpus = num_cpus::get();
|
||||
|
||||
let temp_pool = ThreadPoolNoAbortBuilder::new()
|
||||
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||
.num_threads(all_cpus)
|
||||
.build()?;
|
||||
|
||||
indexer_config.thread_pool.write().unwrap().replace(temp_pool)
|
||||
};
|
||||
|
||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||
// try to process tasks while we're trying to import the indexes.
|
||||
|
||||
@ -576,6 +589,12 @@ fn import_dump(
|
||||
index_scheduler.refresh_index_stats(&uid)?;
|
||||
}
|
||||
|
||||
// Restore original thread pool after dump
|
||||
{
|
||||
let mut guard = indexer_config.thread_pool.write().unwrap();
|
||||
*guard = pool_before;
|
||||
}
|
||||
|
||||
// 5. Import the queue
|
||||
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
|
||||
// 5.1. Import the batches
|
||||
|
@ -6,7 +6,7 @@ use std::num::{NonZeroUsize, ParseIntError};
|
||||
use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{env, fmt, fs};
|
||||
|
||||
use byte_unit::{Byte, ParseError, UnitType};
|
||||
@ -765,7 +765,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
Ok(Self {
|
||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||
thread_pool: Some(thread_pool),
|
||||
thread_pool: RwLock::new(Some(thread_pool)),
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: other.skip_index_budget,
|
||||
..Default::default()
|
||||
|
@ -1936,7 +1936,8 @@ pub(crate) mod tests {
|
||||
) -> Result<(), crate::error::Error> {
|
||||
let local_pool;
|
||||
let indexer_config = &self.indexer_config;
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
@ -2030,7 +2031,8 @@ pub(crate) mod tests {
|
||||
) -> Result<(), crate::error::Error> {
|
||||
let local_pool;
|
||||
let indexer_config = &self.indexer_config;
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
@ -2109,7 +2111,8 @@ pub(crate) mod tests {
|
||||
|
||||
let local_pool;
|
||||
let indexer_config = &index.indexer_config;
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool_guard = indexer_config.thread_pool.read().unwrap();
|
||||
let pool = match &*pool_guard {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
|
@ -228,8 +228,10 @@ where
|
||||
let possible_embedding_mistakes =
|
||||
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
|
||||
|
||||
let pool_guard = self.indexer_config.thread_pool.read().unwrap();
|
||||
|
||||
let backup_pool;
|
||||
let pool = match self.indexer_config.thread_pool {
|
||||
let pool = match &*pool_guard {
|
||||
Some(ref pool) => pool,
|
||||
None => {
|
||||
// We initialize a backup pool with the default
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::sync::RwLock;
|
||||
|
||||
use grenad::CompressionType;
|
||||
|
||||
use super::GrenadParameters;
|
||||
@ -11,7 +13,7 @@ pub struct IndexerConfig {
|
||||
pub max_memory: Option<usize>,
|
||||
pub chunk_compression_type: CompressionType,
|
||||
pub chunk_compression_level: Option<u32>,
|
||||
pub thread_pool: Option<ThreadPoolNoAbort>,
|
||||
pub thread_pool: RwLock<Option<ThreadPoolNoAbort>>,
|
||||
pub max_positions_per_attributes: Option<u32>,
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
@ -36,7 +38,7 @@ impl Default for IndexerConfig {
|
||||
max_memory: None,
|
||||
chunk_compression_type: CompressionType::None,
|
||||
chunk_compression_level: None,
|
||||
thread_pool: None,
|
||||
thread_pool: RwLock::new(None),
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: false,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user