Compare commits

...

3 Commits

6 changed files with 32 additions and 3 deletions

View File

@ -534,7 +534,9 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
// Otherwise, we take only a maximum of tasks to create batches.
let tasks_limit =
if self.autobatching_enabled { self.maximum_number_of_batched_tasks } else { 1 };
let enqueued = index_tasks
.into_iter()

View File

@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
autobatching_enabled,
maximum_number_of_batched_tasks: _,
must_stop_processing: _,
processing_tasks,
file_store,

View File

@ -253,6 +253,9 @@ pub struct IndexSchedulerOptions {
/// Set to `true` iff the index scheduler is allowed to automatically
/// batch tasks together, to process multiple tasks at once.
pub autobatching_enabled: bool,
/// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined number of tasks at once.
pub maximum_number_of_batched_tasks: usize,
/// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions.
pub max_number_of_tasks: usize,
@ -310,6 +313,9 @@ pub struct IndexScheduler {
/// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool,
/// The maximum number of tasks that will be batched together.
pub(crate) maximum_number_of_batched_tasks: usize,
/// The max number of tasks allowed before the scheduler starts to delete
/// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize,
@ -363,6 +369,7 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
maximum_number_of_batched_tasks: self.maximum_number_of_batched_tasks,
max_number_of_tasks: self.max_number_of_tasks,
snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(),
@ -458,6 +465,7 @@ impl IndexScheduler {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled,
maximum_number_of_batched_tasks: options.maximum_number_of_batched_tasks,
max_number_of_tasks: options.max_number_of_tasks,
dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path,
@ -1578,6 +1586,7 @@ mod tests {
index_count: 5,
indexer_config,
autobatching_enabled: true,
maximum_number_of_batched_tasks: usize::MAX,
max_number_of_tasks: 1_000_000,
instance_features: Default::default(),
};

View File

@ -285,6 +285,7 @@ impl From<Opt> for Infos {
db_path,
experimental_enable_metrics,
experimental_reduce_indexing_memory_usage,
experimental_limit_batched_tasks: _,
http_addr,
master_key: _,
env,

View File

@ -236,6 +236,7 @@ fn open_or_create_database_unchecked(
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
maximum_number_of_batched_tasks: opt.experimental_limit_batched_tasks,
max_number_of_tasks: 1_000_000,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,

View File

@ -51,6 +51,7 @@ const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
"MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS: &str = "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS";
const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms";
@ -301,6 +302,11 @@ pub struct Opt {
#[serde(default)]
pub experimental_reduce_indexing_memory_usage: bool,
/// Experimental limit to the number of tasks per batch
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
#[serde(default = "default_limit_batched_tasks")]
pub experimental_limit_batched_tasks: usize,
#[serde(flatten)]
#[clap(flatten)]
pub indexer_options: IndexerOpts,
@ -393,7 +399,8 @@ impl Opt {
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics,
experimental_enable_metrics: enable_metrics_route,
experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage,
experimental_reduce_indexing_memory_usage,
experimental_limit_batched_tasks,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -437,7 +444,11 @@ impl Opt {
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
reduce_indexing_memory_usage.to_string(),
experimental_reduce_indexing_memory_usage.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS,
experimental_limit_batched_tasks.to_string(),
);
indexer_options.export_to_env();
}
@ -739,6 +750,10 @@ fn default_dump_dir() -> PathBuf {
PathBuf::from(DEFAULT_DUMP_DIR)
}
fn default_limit_batched_tasks() -> usize {
usize::MAX
}
/// Indicates if a snapshot was scheduled, and if yes with which interval.
#[derive(Debug, Default, Copy, Clone, Deserialize, Serialize)]
pub enum ScheduleSnapshot {