Merge #5310
Some checks failed
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests almost all features (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 18s
Test suite / Run tests in debug (push) Failing after 17s
Test suite / Run Rustfmt (push) Successful in 2m42s
Test suite / Run Clippy (push) Failing after 7m17s
Test suite / Tests on macos-13 (push) Has been cancelled
Test suite / Tests on windows-2022 (push) Has been cancelled

5310: Fix batch export/import dump r=Kerollmops a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/5304
Fixes https://github.com/meilisearch/meilisearch/issues/5247

## What does this PR do?
- Add the batches to the dump
- Update the tests
- Create a new dump test containing batches and an enqueued task with a document addition


Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot]
2025-02-11 10:21:34 +00:00
committed by GitHub
17 changed files with 593 additions and 74 deletions

View File

@@ -10,8 +10,10 @@ dump
├── instance-uid.uuid
├── keys.jsonl
├── metadata.json
── tasks
├── update_files
│ └── [task_id].jsonl
── tasks
├── update_files
│ └── [task_id].jsonl
│ └── queue.jsonl
└── batches
└── queue.jsonl
```
```

View File

@@ -228,6 +228,7 @@ pub(crate) mod test {
use big_s::S;
use maplit::{btreemap, btreeset};
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
@@ -235,7 +236,8 @@ pub(crate) mod test {
use meilisearch_types::milli;
use meilisearch_types::milli::update::Setting;
use meilisearch_types::settings::{Checked, FacetingSettings, Settings};
use meilisearch_types::tasks::{Details, Status};
use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{Details, Kind, Status};
use serde_json::{json, Map, Value};
use time::macros::datetime;
use uuid::Uuid;
@@ -305,6 +307,30 @@ pub(crate) mod test {
settings.check()
}
pub fn create_test_batches() -> Vec<Batch> {
vec![Batch {
uid: 0,
details: DetailsView {
received_documents: Some(12),
indexed_documents: Some(Some(10)),
..DetailsView::default()
},
progress: None,
stats: BatchStats {
total_nb_tasks: 1,
status: maplit::btreemap! { Status::Succeeded => 1 },
types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 },
index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
},
enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC),
oldest: datetime!(2022-11-11 0:00 UTC),
}),
started_at: datetime!(2022-11-20 0:00 UTC),
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
}]
}
pub fn create_test_tasks() -> Vec<(TaskDump, Option<Vec<Document>>)> {
vec![
(
@@ -427,6 +453,15 @@ pub(crate) mod test {
index.flush().unwrap();
index.settings(&settings).unwrap();
// ========== pushing the batch queue
let batches = create_test_batches();
let mut batch_queue = dump.create_batches_queue().unwrap();
for batch in &batches {
batch_queue.push_batch(batch).unwrap();
}
batch_queue.flush().unwrap();
// ========== pushing the task queue
let tasks = create_test_tasks();

View File

@@ -102,6 +102,13 @@ impl DumpReader {
}
}
pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> {
match self {
DumpReader::Current(current) => Ok(current.batches()),
DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())),
}
}
pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> {
match self {
DumpReader::Current(current) => Ok(current.keys()),
@@ -227,6 +234,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2024-05-16 15:51:34.151044 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -348,6 +359,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2023-07-06 7:10:27.21958 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -412,6 +427,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00");
insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -492,6 +511,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00");
insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -569,6 +592,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -662,6 +689,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-09 20:27:59.904096267 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -755,6 +786,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2023-01-30 16:26:09.247261 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -831,6 +866,10 @@ pub(crate) mod test {
assert_eq!(dump.date(), None);
assert_eq!(dump.instance_uid().unwrap(), None);
// batches didn't exists at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();

View File

@@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
pub type Unchecked = meilisearch_types::settings::Unchecked;
pub type Task = crate::TaskDump;
pub type Batch = meilisearch_types::batches::Batch;
pub type Key = meilisearch_types::keys::Key;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
pub type Network = meilisearch_types::features::Network;
@@ -49,6 +50,7 @@ pub struct V6Reader {
instance_uid: Option<Uuid>,
metadata: Metadata,
tasks: BufReader<File>,
batches: Option<BufReader<File>>,
keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>,
network: Option<Network>,
@@ -79,6 +81,12 @@ impl V6Reader {
} else {
None
};
let batches = match File::open(dump.path().join("batches").join("queue.jsonl")) {
Ok(file) => Some(BufReader::new(file)),
// The batch file was only introduced during the v1.13, anything prior to that won't have batches
Err(err) if err.kind() == ErrorKind::NotFound => None,
Err(e) => return Err(e.into()),
};
let network_file = match fs::read(dump.path().join("network.json")) {
Ok(network_file) => Some(network_file),
@@ -101,6 +109,7 @@ impl V6Reader {
metadata: serde_json::from_reader(&*meta_file)?,
instance_uid,
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
batches,
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
features,
network,
@@ -144,7 +153,7 @@ impl V6Reader {
&mut self,
) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
let task: Task = serde_json::from_str(&line?).unwrap();
let task: Task = serde_json::from_str(&line?)?;
let update_file_path = self
.dump
@@ -156,8 +165,7 @@ impl V6Reader {
if update_file_path.exists() {
Ok((
task,
Some(Box::new(UpdateFile::new(&update_file_path).unwrap())
as Box<super::UpdateFile>),
Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>),
))
} else {
Ok((task, None))
@@ -165,6 +173,16 @@ impl V6Reader {
}))
}
pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
match self.batches.as_mut() {
Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
let batch = serde_json::from_str(&line?)?;
Ok(batch)
})),
None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
}
}
pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
Box::new(
(&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::Batch;
use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
@@ -54,6 +55,10 @@ impl DumpWriter {
TaskWriter::new(self.dir.path().join("tasks"))
}
pub fn create_batches_queue(&self) -> Result<BatchWriter> {
BatchWriter::new(self.dir.path().join("batches"))
}
pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
Ok(std::fs::write(
self.dir.path().join("experimental-features.json"),
@@ -88,7 +93,7 @@ impl KeyWriter {
}
pub fn push_key(&mut self, key: &Key) -> Result<()> {
self.keys.write_all(&serde_json::to_vec(key)?)?;
serde_json::to_writer(&mut self.keys, &key)?;
self.keys.write_all(b"\n")?;
Ok(())
}
@@ -118,7 +123,7 @@ impl TaskWriter {
/// Pushes tasks in the dump.
/// If the tasks has an associated `update_file` it'll use the `task_id` as its name.
pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
self.queue.write_all(&serde_json::to_vec(task)?)?;
serde_json::to_writer(&mut self.queue, &task)?;
self.queue.write_all(b"\n")?;
Ok(UpdateFile::new(self.update_files.join(format!("{}.jsonl", task.uid))))
@@ -130,6 +135,30 @@ impl TaskWriter {
}
}
pub struct BatchWriter {
queue: BufWriter<File>,
}
impl BatchWriter {
pub(crate) fn new(path: PathBuf) -> Result<Self> {
std::fs::create_dir(&path)?;
let queue = File::create(path.join("queue.jsonl"))?;
Ok(BatchWriter { queue: BufWriter::new(queue) })
}
/// Pushes batches in the dump.
pub fn push_batch(&mut self, batch: &Batch) -> Result<()> {
serde_json::to_writer(&mut self.queue, &batch)?;
self.queue.write_all(b"\n")?;
Ok(())
}
pub fn flush(mut self) -> Result<()> {
self.queue.flush()?;
Ok(())
}
}
pub struct UpdateFile {
path: PathBuf,
writer: Option<BufWriter<File>>,
@@ -141,8 +170,8 @@ impl UpdateFile {
}
pub fn push_document(&mut self, document: &Document) -> Result<()> {
if let Some(writer) = self.writer.as_mut() {
writer.write_all(&serde_json::to_vec(document)?)?;
if let Some(mut writer) = self.writer.as_mut() {
serde_json::to_writer(&mut writer, &document)?;
writer.write_all(b"\n")?;
} else {
let file = File::create(&self.path).unwrap();
@@ -209,8 +238,8 @@ pub(crate) mod test {
use super::*;
use crate::reader::Document;
use crate::test::{
create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid,
create_test_settings, create_test_tasks,
create_test_api_keys, create_test_batches, create_test_documents, create_test_dump,
create_test_instance_uid, create_test_settings, create_test_tasks,
};
fn create_directory_hierarchy(dir: &Path) -> String {
@@ -285,8 +314,10 @@ pub(crate) mod test {
let dump_path = dump.path();
// ==== checking global file hierarchy (we want to be sure there isn't too many files or too few)
insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r###"
insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r"
.
├---- batches/
│ └---- queue.jsonl
├---- indexes/
│ └---- doggos/
│ │ ├---- documents.jsonl
@@ -301,7 +332,7 @@ pub(crate) mod test {
├---- keys.jsonl
├---- metadata.json
└---- network.json
"###);
");
// ==== checking the top level infos
let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap();
@@ -354,6 +385,16 @@ pub(crate) mod test {
}
}
// ==== checking the batch queue
let batches_queue = fs::read_to_string(dump_path.join("batches/queue.jsonl")).unwrap();
for (batch, expected) in batches_queue.lines().zip(create_test_batches()) {
let mut batch = serde_json::from_str::<Batch>(batch).unwrap();
if batch.details.settings == Some(Box::new(Settings::<Unchecked>::default())) {
batch.details.settings = None;
}
assert_eq!(batch, expected, "{batch:#?}{expected:#?}");
}
// ==== checking the keys
let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap();
for (key, expected) in keys.lines().zip(create_test_api_keys()) {

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::io;
use dump::{KindDump, TaskDump, UpdateFile};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::RwTxn;
use meilisearch_types::milli;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
@@ -14,9 +15,15 @@ pub struct Dump<'a> {
index_scheduler: &'a IndexScheduler,
wtxn: RwTxn<'a>,
batch_to_task_mapping: HashMap<BatchId, RoaringBitmap>,
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>,
batch_indexes: HashMap<String, RoaringBitmap>,
batch_statuses: HashMap<Status, RoaringBitmap>,
batch_kinds: HashMap<Kind, RoaringBitmap>,
}
impl<'a> Dump<'a> {
@@ -27,12 +34,72 @@ impl<'a> Dump<'a> {
Ok(Dump {
index_scheduler,
wtxn,
batch_to_task_mapping: HashMap::new(),
indexes: HashMap::new(),
statuses: HashMap::new(),
kinds: HashMap::new(),
batch_indexes: HashMap::new(),
batch_statuses: HashMap::new(),
batch_kinds: HashMap::new(),
})
}
/// Register a new batch coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_batch(&mut self, batch: Batch) -> Result<()> {
self.index_scheduler.queue.batches.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
if let Some(enqueued_at) = batch.enqueued_at {
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.queue.batches.enqueued_at,
enqueued_at.earliest,
batch.uid,
)?;
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.queue.batches.enqueued_at,
enqueued_at.oldest,
batch.uid,
)?;
}
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.queue.batches.started_at,
batch.started_at,
batch.uid,
)?;
if let Some(finished_at) = batch.finished_at {
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.queue.batches.finished_at,
finished_at,
batch.uid,
)?;
}
for index in batch.stats.index_uids.keys() {
match self.batch_indexes.get_mut(index) {
Some(bitmap) => {
bitmap.insert(batch.uid);
}
None => {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(batch.uid);
self.batch_indexes.insert(index.to_string(), bitmap);
}
};
}
for status in batch.stats.status.keys() {
self.batch_statuses.entry(*status).or_default().insert(batch.uid);
}
for kind in batch.stats.types.keys() {
self.batch_kinds.entry(*kind).or_default().insert(batch.uid);
}
Ok(())
}
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
@@ -149,6 +216,9 @@ impl<'a> Dump<'a> {
};
self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
if let Some(batch_id) = task.batch_uid {
self.batch_to_task_mapping.entry(batch_id).or_default().insert(task.uid);
}
for index in task.indexes() {
match self.indexes.get_mut(index) {
@@ -198,6 +268,14 @@ impl<'a> Dump<'a> {
/// Commit all the changes and exit the importing dump state
pub fn finish(mut self) -> Result<()> {
for (batch_id, task_ids) in self.batch_to_task_mapping {
self.index_scheduler.queue.batch_to_tasks_mapping.put(
&mut self.wtxn,
&batch_id,
&task_ids,
)?;
}
for (index, bitmap) in self.indexes {
self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
}
@@ -208,6 +286,16 @@ impl<'a> Dump<'a> {
self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?;
}
for (index, bitmap) in self.batch_indexes {
self.index_scheduler.queue.batches.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
}
for (status, bitmap) in self.batch_statuses {
self.index_scheduler.queue.batches.put_status(&mut self.wtxn, status, &bitmap)?;
}
for (kind, bitmap) in self.batch_kinds {
self.index_scheduler.queue.batches.put_kind(&mut self.wtxn, kind, &bitmap)?;
}
self.wtxn.commit()?;
self.index_scheduler.scheduler.wake_up.signal();

View File

@@ -96,6 +96,7 @@ make_enum_progress! {
StartTheDumpCreation,
DumpTheApiKeys,
DumpTheTasks,
DumpTheBatches,
DumpTheIndexes,
DumpTheExperimentalFeatures,
CompressTheDump,

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufWriter;
use std::sync::atomic::Ordering;
@@ -11,7 +12,9 @@ use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use time::macros::format_description;
use time::OffsetDateTime;
use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
use crate::processing::{
AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress,
};
use crate::{Error, IndexScheduler, Result};
impl IndexScheduler {
@@ -102,7 +105,40 @@ impl IndexScheduler {
}
dump_tasks.flush()?;
// 3. Dump the indexes
// 3. dump the batches
progress.update_progress(DumpCreationProgress::DumpTheBatches);
let mut dump_batches = dump.create_batches_queue()?;
let (atomic_batch_progress, update_batch_progress) =
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
progress.update_progress(update_batch_progress);
for ret in self.queue.batches.all_batches.iter(&rtxn)? {
if self.scheduler.must_stop_processing.get() {
return Err(Error::AbortedTask);
}
let (_, mut b) = ret?;
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if b.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
let mut statuses = BTreeMap::new();
statuses.insert(Status::Succeeded, b.stats.total_nb_tasks);
b.stats.status = statuses;
b.finished_at = Some(finished_at);
}
dump_batches.push_batch(&b)?;
atomic_batch_progress.fetch_add(1, Ordering::Relaxed);
}
dump_batches.flush()?;
// 4. Dump the indexes
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
let mut count = 0;
@@ -142,7 +178,7 @@ impl IndexScheduler {
let documents = index
.all_documents(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// 3.1. Dump the documents
// 4.1. Dump the documents
for ret in documents {
if self.scheduler.must_stop_processing.get() {
return Err(Error::AbortedTask);
@@ -204,7 +240,7 @@ impl IndexScheduler {
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3.2. Dump the settings
// 4.2. Dump the settings
let settings = meilisearch_types::settings::settings(
index,
&rtxn,
@@ -215,7 +251,7 @@ impl IndexScheduler {
Ok(())
})?;
// 4. Dump experimental feature settings
// 5. Dump experimental feature settings
progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
let features = self.features().runtime_features();
dump.create_experimental_features(features)?;

View File

@@ -30,7 +30,21 @@ pub struct Batch {
pub enqueued_at: Option<BatchEnqueuedAt>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
impl PartialEq for Batch {
fn eq(&self, other: &Self) -> bool {
let Self { uid, progress, details, stats, started_at, finished_at, enqueued_at } = self;
*uid == other.uid
&& progress.is_none() == other.progress.is_none()
&& details == &other.details
&& stats == &other.stats
&& started_at == &other.started_at
&& finished_at == &other.finished_at
&& enqueued_at == &other.enqueued_at
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchEnqueuedAt {
#[serde(with = "time::serde::rfc3339")]
pub earliest: OffsetDateTime,
@@ -38,7 +52,7 @@ pub struct BatchEnqueuedAt {
pub oldest: OffsetDateTime,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)]
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct BatchStats {

View File

@@ -571,9 +571,15 @@ fn import_dump(
index_scheduler.refresh_index_stats(&uid)?;
}
// 5. Import the queue
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
// 5.1. Import the batches
for ret in dump_reader.batches()? {
let batch = ret?;
index_scheduler_dump.register_dumped_batch(batch)?;
}
// 5. Import the tasks.
// 5.2. Import the tasks
for ret in dump_reader.tasks()? {
let (task, file) = ret?;
index_scheduler_dump.register_dumped_task(task, file)?;

View File

@@ -163,6 +163,10 @@ impl Server<Owned> {
self.service.get("/tasks").await
}
pub async fn batches(&self) -> (Value, StatusCode) {
self.service.get("/batches").await
}
pub async fn set_features(&self, value: Value) -> (Value, StatusCode) {
self.service.patch("/experimental-features", value).await
}

View File

@@ -22,6 +22,7 @@ pub enum GetDump {
TestV5,
TestV6WithExperimental,
TestV6WithBatchesAndEnqueuedTasks,
}
impl GetDump {
@@ -74,6 +75,10 @@ impl GetDump {
"tests/assets/v6_v1.6.0_use_deactivated_experimental_setting.dump"
)
.into(),
GetDump::TestV6WithBatchesAndEnqueuedTasks => {
exist_relative_path!("tests/assets/v6_v1.13.0_batches_and_enqueued_tasks.dump")
.into()
}
}
}
}

View File

@@ -1994,6 +1994,63 @@ async fn import_dump_v6_containing_experimental_features() {
.await;
}
#[actix_rt::test]
async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
let temp = tempfile::tempdir().unwrap();
let options = Opt {
import_dump: Some(GetDump::TestV6WithBatchesAndEnqueuedTasks.path()),
..default_settings(temp.path())
};
let mut server = Server::new_auth_with_options(options, temp).await;
server.use_api_key("MASTER_KEY");
server.wait_task(2).await.succeeded();
let (tasks, _) = server.tasks().await;
snapshot!(json_string!(tasks, { ".results[1].startedAt" => "[date]", ".results[1].finishedAt" => "[date]", ".results[1].duration" => "[date]" }), name: "tasks");
let (batches, _) = server.batches().await;
snapshot!(json_string!(batches, { ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].duration" => "[date]" }), name: "batches");
let (indexes, code) = server.list_indexes(None, None).await;
assert_eq!(code, 200, "{indexes}");
assert_eq!(indexes["results"].as_array().unwrap().len(), 1);
assert_eq!(indexes["results"][0]["uid"], json!("kefir"));
assert_eq!(indexes["results"][0]["primaryKey"], json!("id"));
let (response, code) = server.get_features().await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"metrics": false,
"logsRoute": false,
"editDocumentsByFunction": false,
"containsFilter": false,
"network": false,
"getTaskDocumentsRoute": false
}
"###);
let index = server.index("kefir");
let (documents, _) = index.get_all_documents_raw("").await;
snapshot!(documents, @r#"
{
"results": [
{
"id": 1,
"dog": "kefir"
},
{
"id": 2,
"dog": "intel"
}
],
"offset": 0,
"limit": 20,
"total": 2
}
"#);
}
// In this test we must generate the dump ourselves to ensure the
// `user provided` vectors are well set
#[actix_rt::test]

View File

@@ -0,0 +1,78 @@
---
source: crates/meilisearch/tests/dumps/mod.rs
snapshot_kind: text
---
{
"results": [
{
"uid": 2,
"progress": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"stats": {
"totalNbTasks": 1,
"status": {
"succeeded": 1
},
"types": {
"documentAdditionOrUpdate": 1
},
"indexUids": {
"kefir": 1
}
},
"duration": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 1,
"progress": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"stats": {
"totalNbTasks": 1,
"status": {
"succeeded": 1
},
"types": {
"documentAdditionOrUpdate": 1
},
"indexUids": {
"kefir": 1
}
},
"duration": "PT0.144827890S",
"startedAt": "2025-02-04T10:15:21.275640274Z",
"finishedAt": "2025-02-04T10:15:21.420468164Z"
},
{
"uid": 0,
"progress": null,
"details": {},
"stats": {
"totalNbTasks": 1,
"status": {
"succeeded": 1
},
"types": {
"indexCreation": 1
},
"indexUids": {
"kefir": 1
}
},
"duration": "PT0.032902186S",
"startedAt": "2025-02-04T10:14:43.559526162Z",
"finishedAt": "2025-02-04T10:14:43.592428348Z"
}
],
"total": 3,
"limit": 20,
"from": 2,
"next": null
}

View File

@@ -0,0 +1,78 @@
---
source: crates/meilisearch/tests/dumps/mod.rs
snapshot_kind: text
---
{
"results": [
{
"uid": 3,
"batchUid": null,
"indexUid": null,
"status": "succeeded",
"type": "dumpCreation",
"canceledBy": null,
"details": {
"dumpUid": null
},
"error": null,
"duration": "PT0.000629059S",
"enqueuedAt": "2025-02-04T10:22:31.318175268Z",
"startedAt": "2025-02-04T10:22:31.331701375Z",
"finishedAt": "2025-02-04T10:22:31.332330434Z"
},
{
"uid": 2,
"batchUid": 2,
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[date]",
"enqueuedAt": "2025-02-04T10:15:49.212484063Z",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 1,
"batchUid": null,
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "PT0.144827890S",
"enqueuedAt": "2025-02-04T10:15:21.258630973Z",
"startedAt": "2025-02-04T10:15:21.275640274Z",
"finishedAt": "2025-02-04T10:15:21.420468164Z"
},
{
"uid": 0,
"batchUid": null,
"indexUid": "kefir",
"status": "succeeded",
"type": "indexCreation",
"canceledBy": null,
"details": {
"primaryKey": null
},
"error": null,
"duration": "PT0.032902186S",
"enqueuedAt": "2025-02-04T10:14:43.550379968Z",
"startedAt": "2025-02-04T10:14:43.559526162Z",
"finishedAt": "2025-02-04T10:14:43.592428348Z"
}
],
"total": 4,
"limit": 20,
"from": 3,
"next": null
}

View File

@@ -8,6 +8,7 @@ use clap::{Parser, Subcommand};
use dump::{DumpWriter, IndexMetadata};
use file_store::FileStore;
use meilisearch_auth::AuthController;
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
@@ -279,70 +280,86 @@ fn export_a_dump(
eprintln!("Successfully dumped {count} keys!");
eprintln!("Dumping the queue");
let rtxn = env.read_txn()?;
let all_tasks: Database<BEU32, SerdeJson<Task>> =
try_opening_database(&env, &rtxn, "all-tasks")?;
let all_batches: Database<BEU32, SerdeJson<Batch>> =
try_opening_database(&env, &rtxn, "all-batches")?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &rtxn, "index-mapping")?;
if skip_enqueued_tasks {
eprintln!("Skip dumping the enqueued tasks...");
} else {
let mut dump_tasks = dump.create_tasks_queue()?;
let mut count = 0;
for ret in all_tasks.iter(&rtxn)? {
let (_, t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
eprintln!("Dumping the tasks");
let mut dump_tasks = dump.create_tasks_queue()?;
let mut count_tasks = 0;
let mut count_enqueued_tasks = 0;
for ret in all_tasks.iter(&rtxn)? {
let (_, t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
if status == Status::Enqueued && skip_enqueued_tasks {
continue;
}
// 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file_uuid) = content_file {
if status == Status::Enqueued {
let content_file = file_store.get_update(content_file_uuid)?;
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
}
} else {
eprintln!(
"Dumping the enqueued tasks reading them in JSON stream format..."
);
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
let document = document.with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
dump_content_file.push_document(&document)?;
}
// 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file_uuid) = content_file {
if status == Status::Enqueued {
let content_file = file_store.get_update(content_file_uuid)?;
if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
}
} else {
eprintln!("Dumping the enqueued tasks reading them in JSON stream format...");
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
let document = document.with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
dump_content_file.push_document(&document)?;
}
dump_content_file.flush()?;
count += 1;
}
dump_content_file.flush()?;
count_enqueued_tasks += 1;
}
}
dump_tasks.flush()?;
eprintln!("Successfully dumped {count} enqueued tasks!");
count_tasks += 1;
}
dump_tasks.flush()?;
eprintln!(
"Successfully dumped {count_tasks} tasks including {count_enqueued_tasks} enqueued tasks!"
);
// 4. dump the batches
eprintln!("Dumping the batches");
let mut dump_batches = dump.create_batches_queue()?;
let mut count = 0;
for ret in all_batches.iter(&rtxn)? {
let (_, b) = ret?;
dump_batches.push_batch(&b)?;
count += 1;
}
dump_batches.flush()?;
eprintln!("Successfully dumped {count} batches!");
// 5. Dump the indexes
eprintln!("Dumping the indexes...");
// 4. Dump the indexes
let mut count = 0;
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
@@ -363,14 +380,14 @@ fn export_a_dump(
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 4.1. Dump the documents
// 5.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 4.2. Dump the settings
// 5.2. Dump the settings
let settings = meilisearch_types::settings::settings(
&index,
&rtxn,