Sweep old batches using dumpless upgrade

This commit is contained in:
Mubelotix
2025-08-12 16:48:28 +02:00
parent d7776ec82b
commit fe275397ec
5 changed files with 75 additions and 4 deletions

View File

@ -232,7 +232,11 @@ pub(crate) mod test {
use std::{fs::File, io::Seek};
use meili_snap::insta;
use meilisearch_types::{batches::{Batch, BatchEnqueuedAt, BatchStats}, task_view::DetailsView, tasks::{BatchStopReason, Kind, Status}};
use meilisearch_types::{
batches::{Batch, BatchEnqueuedAt, BatchStats},
task_view::DetailsView,
tasks::{BatchStopReason, Kind, Status},
};
use time::macros::datetime;
use super::*;

View File

@ -66,7 +66,7 @@ impl BatchQueue {
NUMBER_OF_DATABASES
}
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,

View File

@ -32,7 +32,7 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};
/// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1;
/// Database const names for the `IndexScheduler`.
mod db_name {
pub(crate) mod db_name {
pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
}

View File

@ -7,6 +7,9 @@ use tracing::info;
use crate::queue::TaskQueue;
use crate::versioning::Versioning;
use v1_18::V1_17_To_V1_18_0;
mod v1_18;
trait UpgradeIndexScheduler {
fn upgrade(
@ -29,7 +32,8 @@ pub fn upgrade_index_scheduler(
let current_patch = to.2;
let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[
// This is the last upgrade function, it will be called when the index is up to date.
&V1_17_To_V1_18_0 {},
// This is the last upgrade function, it will be called when the scheduler is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
];
@ -40,6 +44,7 @@ pub fn upgrade_index_scheduler(
(1, 14, _) => 0,
(1, 15, _) => 0,
(1, 16, _) => 0,
(1, 17, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)

View File

@ -0,0 +1,62 @@
use meilisearch_types::{
heed::Database,
milli::{CboRoaringBitmapCodec, BEU32},
};
use tracing::info;
use super::UpgradeIndexScheduler;
use crate::queue::{db_name::BATCH_TO_TASKS_MAPPING, BatchQueue};
#[allow(non_camel_case_types)]
pub(super) struct V1_17_To_V1_18_0();
impl UpgradeIndexScheduler for V1_17_To_V1_18_0 {
fn upgrade(
&self,
env: &meilisearch_types::heed::Env<meilisearch_types::heed::WithoutTls>,
wtxn: &mut meilisearch_types::heed::RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {
let batch_queue = BatchQueue::new(env, wtxn)?;
let all_batch_ids = batch_queue.all_batch_ids(wtxn)?;
let batch_to_tasks_mapping: Database<BEU32, CboRoaringBitmapCodec> =
env.create_database(wtxn, Some(BATCH_TO_TASKS_MAPPING))?;
let all_batches = batch_queue.all_batches.lazily_decode_data();
let iter = all_batches.iter(wtxn)?;
let mut range_start = None;
let mut count = 0;
let mut ranges = Vec::new();
for batch in iter {
let (batch_id, _) = batch?;
if !all_batch_ids.contains(batch_id) {
count += 1;
if range_start.is_none() {
range_start = Some(batch_id);
}
} else if let Some(start) = range_start.take() {
ranges.push(start..batch_id);
}
}
if let Some(start) = range_start {
ranges.push(start..u32::MAX);
}
if !ranges.is_empty() {
info!("Removing {count} batches that were not properly removed in previous versions due to #5827.");
}
for range in ranges {
batch_queue.all_batches.delete_range(wtxn, &range)?;
batch_to_tasks_mapping.delete_range(wtxn, &range)?;
}
Ok(())
}
fn target_version(&self) -> (u32, u32, u32) {
(1, 18, 0)
}
}