From 7eb871d6716add774e052d51ff9bc09f514444ef Mon Sep 17 00:00:00 2001
From: Mubelotix
Date: Tue, 12 Aug 2025 15:41:10 +0200
Subject: [PATCH] Fix old databases imported from dumps for #5827

---
Review notes (this area, between `---` and the first `diff --git`, is ignored
when the patch is applied):

- lib.rs: `create_test_dump` is split so that tests can obtain the `DumpWriter`
  and the still-unflushed `BatchWriter` through `create_test_dump_writer`, push
  extra batches, and flush themselves. `create_test_dump` keeps its signature
  and now flushes the batch writer right before persisting.
- reader/mod.rs: `import_dump_with_bad_batches` pushes one extra batch (uid 1)
  whose stats maps all carry the value 666, then asserts that no batch with
  uid 1 is returned by `DumpReader::batches`.
- reader/v6/mod.rs: `V6Reader::batches` first walks `self.tasks()` to collect
  every task uid into a `RoaringBitmap`, then drops each successfully parsed
  batch unless at least one value of `stats.status`, `stats.types` or
  `stats.index_uids` is contained in that bitmap. Lines that fail to parse are
  passed through as errors, and if any task fails to read the filter is
  disabled entirely (`faulty`).
- NOTE(review): the filter compares the *values* of the stats maps against task
  uids. Those maps sit next to `total_nb_tasks`, so the values are presumably
  per-key task counts rather than task uids -- confirm against `BatchStats`.
  If they are counts, a legitimate batch whose counts do not happen to equal an
  existing task uid would be dropped, and a phantom batch with small counts
  would be kept.
- NOTE(review): the author e-mail is missing from the `From:` header; `git am`
  may need it restored.

 crates/dump/src/lib.rs           | 11 ++++++--
 crates/dump/src/reader/mod.rs    | 44 ++++++++++++++++++++++++++++++--
 crates/dump/src/reader/v6/mod.rs | 34 +++++++++++++++++++++---
 3 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs
index 81ba40944..776c1b331 100644
--- a/crates/dump/src/lib.rs
+++ b/crates/dump/src/lib.rs
@@ -263,6 +263,7 @@ pub(crate) mod test {
     use uuid::Uuid;
 
     use crate::reader::Document;
+    use crate::writer::BatchWriter;
     use crate::{DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version};
 
     pub fn create_test_instance_uid() -> Uuid {
@@ -467,7 +468,7 @@ pub(crate) mod test {
         ]
     }
 
-    pub fn create_test_dump() -> File {
+    pub fn create_test_dump_writer() -> (DumpWriter, BatchWriter) {
         let instance_uid = create_test_instance_uid();
         let dump = DumpWriter::new(Some(instance_uid)).unwrap();
 
@@ -489,7 +490,6 @@ pub(crate) mod test {
         for batch in &batches {
             batch_queue.push_batch(batch).unwrap();
         }
-        batch_queue.flush().unwrap();
 
         // ========== pushing the task queue
         let tasks = create_test_tasks();
@@ -523,6 +523,13 @@ pub(crate) mod test {
         let network = create_test_network();
         dump.create_network(network).unwrap();
 
+        (dump, batch_queue)
+    }
+
+    pub fn create_test_dump() -> File {
+        let (dump, batch_writer) = create_test_dump_writer();
+        batch_writer.flush().unwrap();
+
         // create the dump
         let mut file = tempfile::tempfile().unwrap();
         dump.persist_to(&mut file).unwrap();
diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs
index da55bb4a8..f2a8ac90c 100644
--- a/crates/dump/src/reader/mod.rs
+++ b/crates/dump/src/reader/mod.rs
@@ -229,12 +229,52 @@ impl From<CompatIndexV5ToV6> for DumpIndexReader {
 
 #[cfg(test)]
 pub(crate) mod test {
-    use std::fs::File;
+    use std::{fs::File, io::Seek};
 
     use meili_snap::insta;
+    use meilisearch_types::{batches::{Batch, BatchEnqueuedAt, BatchStats}, task_view::DetailsView, tasks::{BatchStopReason, Kind, Status}};
+    use time::macros::datetime;
 
     use super::*;
-    use crate::reader::v6::RuntimeTogglableFeatures;
+    use crate::{reader::v6::RuntimeTogglableFeatures, test::create_test_dump_writer};
+
+    #[test]
+    fn import_dump_with_bad_batches() {
+        let (dump, mut batch_writer) = create_test_dump_writer();
+        let bad_batch = Batch {
+            uid: 1,
+            progress: None,
+            details: DetailsView::default(),
+            stats: BatchStats {
+                total_nb_tasks: 1,
+                status: maplit::btreemap! { Status::Succeeded => 666 },
+                types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 666 },
+                index_uids: maplit::btreemap! { "doggo".to_string() => 666 },
+                progress_trace: Default::default(),
+                write_channel_congestion: None,
+                internal_database_sizes: Default::default(),
+            },
+            embedder_stats: Default::default(),
+            enqueued_at: Some(BatchEnqueuedAt {
+                earliest: datetime!(2022-11-11 0:00 UTC),
+                oldest: datetime!(2022-11-11 0:00 UTC),
+            }),
+            started_at: datetime!(2022-11-20 0:00 UTC),
+            finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
+            stop_reason: BatchStopReason::Unspecified.to_string(),
+        };
+        batch_writer.push_batch(&bad_batch).unwrap();
+        batch_writer.flush().unwrap();
+
+        let mut file = tempfile::tempfile().unwrap();
+        dump.persist_to(&mut file).unwrap();
+        file.rewind().unwrap();
+
+        let mut dump = DumpReader::open(file).unwrap();
+        let read_batches = dump.batches().unwrap().map(|b| b.unwrap()).collect::<Vec<_>>();
+
+        assert!(!read_batches.iter().any(|b| b.uid == 1));
+    }
 
     #[test]
     fn import_dump_v6_with_vectors() {
diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs
index 9bc4b33c5..645e7cb07 100644
--- a/crates/dump/src/reader/v6/mod.rs
+++ b/crates/dump/src/reader/v6/mod.rs
@@ -5,6 +5,7 @@ use std::path::Path;
 
 pub use meilisearch_types::milli;
 use meilisearch_types::milli::vector::hf::OverridePooling;
+use roaring::RoaringBitmap;
 use tempfile::TempDir;
 use time::OffsetDateTime;
 use tracing::debug;
@@ -188,11 +189,36 @@ impl V6Reader {
     }
 
     pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
+        let mut task_uids = RoaringBitmap::new();
+        let mut faulty = false;
+        for task in self.tasks() {
+            let Ok((task, _)) = task else {
+                // If we can't read the tasks, just give up trying to filter the batches
+                // The database may contain phantom batches, but that's not that big of a deal
+                faulty = true;
+                break;
+            };
+            task_uids.insert(task.uid);
+        }
         match self.batches.as_mut() {
-            Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
-                let batch = serde_json::from_str(&line?)?;
-                Ok(batch)
-            })),
+            Some(batches) => Box::new(
+                (batches)
+                    .lines()
+                    .map(|line| -> Result<_> {
+                        let batch: meilisearch_types::batches::Batch =
+                            serde_json::from_str(&line?)?;
+                        Ok(batch)
+                    })
+                    .filter(move |batch| match batch {
+                        Ok(batch) => {
+                            faulty
+                                || batch.stats.status.values().any(|t| task_uids.contains(*t))
+                                || batch.stats.types.values().any(|t| task_uids.contains(*t))
+                                || batch.stats.index_uids.values().any(|t| task_uids.contains(*t))
+                        }
+                        Err(_) => true,
+                    }),
+            ),
             None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
         }
     }