Fix old databases imported from dumps for #5827

This commit is contained in:
Mubelotix
2025-08-12 15:41:10 +02:00
parent eec711d93e
commit 7eb871d671
3 changed files with 81 additions and 8 deletions

View File

@ -263,6 +263,7 @@ pub(crate) mod test {
use uuid::Uuid;
use crate::reader::Document;
use crate::writer::BatchWriter;
use crate::{DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version};
pub fn create_test_instance_uid() -> Uuid {
@ -467,7 +468,7 @@ pub(crate) mod test {
]
}
pub fn create_test_dump() -> File {
pub fn create_test_dump_writer() -> (DumpWriter, BatchWriter) {
let instance_uid = create_test_instance_uid();
let dump = DumpWriter::new(Some(instance_uid)).unwrap();
@ -489,7 +490,6 @@ pub(crate) mod test {
for batch in &batches {
batch_queue.push_batch(batch).unwrap();
}
batch_queue.flush().unwrap();
// ========== pushing the task queue
let tasks = create_test_tasks();
@ -523,6 +523,13 @@ pub(crate) mod test {
let network = create_test_network();
dump.create_network(network).unwrap();
(dump, batch_queue)
}
pub fn create_test_dump() -> File {
let (dump, batch_writer) = create_test_dump_writer();
batch_writer.flush().unwrap();
// create the dump
let mut file = tempfile::tempfile().unwrap();
dump.persist_to(&mut file).unwrap();

View File

@ -229,12 +229,52 @@ impl From<CompatIndexV5ToV6> for DumpIndexReader {
#[cfg(test)]
pub(crate) mod test {
use std::fs::File;
use std::{fs::File, io::Seek};
use meili_snap::insta;
use meilisearch_types::{batches::{Batch, BatchEnqueuedAt, BatchStats}, task_view::DetailsView, tasks::{BatchStopReason, Kind, Status}};
use time::macros::datetime;
use super::*;
use crate::reader::v6::RuntimeTogglableFeatures;
use crate::{reader::v6::RuntimeTogglableFeatures, test::create_test_dump_writer};
#[test]
fn import_dump_with_bad_batches() {
    // Regression test: a dump may contain a "phantom" batch whose stats
    // reference task uids that are absent from the task queue. Reading the
    // dump back must filter such a batch out.
    let (dump, mut batch_writer) = create_test_dump_writer();

    // Batch uid 1 claims 666 tasks per status/type/index, but no task with
    // a matching uid exists in the test dump.
    let phantom = Batch {
        uid: 1,
        progress: None,
        details: DetailsView::default(),
        stats: BatchStats {
            total_nb_tasks: 1,
            status: maplit::btreemap! { Status::Succeeded => 666 },
            types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 666 },
            index_uids: maplit::btreemap! { "doggo".to_string() => 666 },
            progress_trace: Default::default(),
            write_channel_congestion: None,
            internal_database_sizes: Default::default(),
        },
        embedder_stats: Default::default(),
        enqueued_at: Some(BatchEnqueuedAt {
            earliest: datetime!(2022-11-11 0:00 UTC),
            oldest: datetime!(2022-11-11 0:00 UTC),
        }),
        started_at: datetime!(2022-11-20 0:00 UTC),
        finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
        stop_reason: BatchStopReason::Unspecified.to_string(),
    };
    batch_writer.push_batch(&phantom).unwrap();
    batch_writer.flush().unwrap();

    // Persist the dump to a temporary file, then rewind so it can be read
    // back from the beginning.
    let mut file = tempfile::tempfile().unwrap();
    dump.persist_to(&mut file).unwrap();
    file.rewind().unwrap();

    let mut dump = DumpReader::open(file).unwrap();
    let batches: Vec<_> = dump.batches().unwrap().map(|b| b.unwrap()).collect();
    // The phantom batch (uid 1) must have been dropped by the reader.
    assert!(batches.iter().all(|b| b.uid != 1));
}
#[test]
fn import_dump_v6_with_vectors() {

View File

@ -5,6 +5,7 @@ use std::path::Path;
pub use meilisearch_types::milli;
use meilisearch_types::milli::vector::hf::OverridePooling;
use roaring::RoaringBitmap;
use tempfile::TempDir;
use time::OffsetDateTime;
use tracing::debug;
@ -188,11 +189,36 @@ impl V6Reader {
}
/// Returns an iterator over the batches stored in the dump.
///
/// Old databases imported from dumps could contain "phantom" batches that
/// do not correspond to any task in the task queue (issue #5827). To avoid
/// re-importing them, batches are filtered against the set of task uids
/// actually present in the dump.
pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
    // Collect the uid of every readable task so batches can be checked
    // against them below.
    let mut task_uids = RoaringBitmap::new();
    let mut faulty = false;
    for task in self.tasks() {
        let Ok((task, _)) = task else {
            // If we can't read the tasks, just give up trying to filter the batches.
            // The database may contain phantom batches, but that's not that big of a deal.
            faulty = true;
            break;
        };
        task_uids.insert(task.uid);
    }
    match self.batches.as_mut() {
        Some(batches) => Box::new(
            (batches)
                .lines()
                .map(|line| -> Result<_> {
                    // Each line of the batches file is one JSON-encoded batch.
                    let batch: meilisearch_types::batches::Batch =
                        serde_json::from_str(&line?)?;
                    Ok(batch)
                })
                .filter(move |batch| match batch {
                    Ok(batch) => {
                        // NOTE(review): the stats maps nominally hold per-status/
                        // per-type/per-index task *counts*; matching their values
                        // against `task_uids` only makes sense for the faulty dumps
                        // targeted by #5827, where those values were task uids —
                        // confirm against the dump format history.
                        faulty
                            || batch.stats.status.values().any(|t| task_uids.contains(*t))
                            || batch.stats.types.values().any(|t| task_uids.contains(*t))
                            || batch.stats.index_uids.values().any(|t| task_uids.contains(*t))
                    }
                    // Keep read/parse errors so the caller can surface them.
                    Err(_) => true,
                }),
        ),
        None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
    }
}