diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs index f2a8ac90c..53ff4844c 100644 --- a/crates/dump/src/reader/mod.rs +++ b/crates/dump/src/reader/mod.rs @@ -232,7 +232,11 @@ pub(crate) mod test { use std::{fs::File, io::Seek}; use meili_snap::insta; - use meilisearch_types::{batches::{Batch, BatchEnqueuedAt, BatchStats}, task_view::DetailsView, tasks::{BatchStopReason, Kind, Status}}; + use meilisearch_types::{ + batches::{Batch, BatchEnqueuedAt, BatchStats}, + task_view::DetailsView, + tasks::{BatchStopReason, Kind, Status}, + }; use time::macros::datetime; use super::*; diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index b96f65836..62f145730 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -66,7 +66,7 @@ impl BatchQueue { NUMBER_OF_DATABASES } - pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { + pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?, status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?, diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index 92de10fe1..1d90cddaa 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -32,7 +32,7 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId}; /// The number of database used by queue itself const NUMBER_OF_DATABASES: u32 = 1; /// Database const names for the `IndexScheduler`. -mod db_name { +pub(crate) mod db_name { pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping"; } diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs index a749b31d5..69c91b642 100644 --- a/crates/index-scheduler/src/upgrade/mod.rs +++ b/crates/index-scheduler/src/upgrade/mod.rs @@ -7,6 +7,9 @@ use tracing::info; use crate::queue::TaskQueue; use crate::versioning::Versioning; +use v1_18::V1_17_To_V1_18_0; + +mod v1_18; trait UpgradeIndexScheduler { fn upgrade( @@ -29,7 +32,8 @@ pub fn upgrade_index_scheduler( let current_patch = to.2; let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[ - // This is the last upgrade function, it will be called when the index is up to date. + &V1_17_To_V1_18_0 {}, + // This is the last upgrade function, it will be called when the scheduler is up to date. // any other upgrade function should be added before this one. &ToCurrentNoOp {}, ]; @@ -40,6 +44,7 @@ pub fn upgrade_index_scheduler( (1, 14, _) => 0, (1, 15, _) => 0, (1, 16, _) => 0, + (1, 17, _) => 0, (major, minor, patch) => { if major > current_major || (major == current_major && minor > current_minor) diff --git a/crates/index-scheduler/src/upgrade/v1_18.rs b/crates/index-scheduler/src/upgrade/v1_18.rs new file mode 100644 index 000000000..65909cdb3 --- /dev/null +++ b/crates/index-scheduler/src/upgrade/v1_18.rs @@ -0,0 +1,62 @@ +use meilisearch_types::{ + heed::Database, + milli::{CboRoaringBitmapCodec, BEU32}, +}; +use tracing::info; + +use super::UpgradeIndexScheduler; +use crate::queue::{db_name::BATCH_TO_TASKS_MAPPING, BatchQueue}; + +#[allow(non_camel_case_types)] +pub(super) struct V1_17_To_V1_18_0(); + +impl UpgradeIndexScheduler for V1_17_To_V1_18_0 { + fn upgrade( + &self, + env: &meilisearch_types::heed::Env, + wtxn: &mut meilisearch_types::heed::RwTxn, + _original: (u32, u32, u32), + ) -> anyhow::Result<()> { + let batch_queue = BatchQueue::new(env, wtxn)?; + let all_batch_ids = batch_queue.all_batch_ids(wtxn)?; + + let batch_to_tasks_mapping: Database = + env.create_database(wtxn, Some(BATCH_TO_TASKS_MAPPING))?; + + let all_batches = batch_queue.all_batches.lazily_decode_data(); + let iter = all_batches.iter(wtxn)?; + let mut range_start = None; + let mut count = 0; + let mut ranges = Vec::new(); + for batch in iter { + let (batch_id, _) = batch?; + + if !all_batch_ids.contains(batch_id) { + count += 1; + if range_start.is_none() { + range_start = Some(batch_id); + } + } else if let Some(start) = range_start.take() { + ranges.push(start..batch_id); + } + } + if let Some(start) = range_start { + ranges.push(start..u32::MAX); + } + + if !ranges.is_empty() { + info!("Removing {count} batches that were not properly removed in previous versions due to #5827."); + } + + for range in ranges { + batch_queue.all_batches.delete_range(wtxn, &range)?; + batch_to_tasks_mapping.delete_range(wtxn, &range)?; + } + + Ok(()) + } + + fn target_version(&self) -> (u32, u32, u32) { + (1, 18, 0) + } +}