mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	fix the missing batch in the dumps in meilisearch and meilitools
This commit is contained in:
		| @@ -2,6 +2,7 @@ use std::collections::HashMap; | ||||
| use std::io; | ||||
|  | ||||
| use dump::{KindDump, TaskDump, UpdateFile}; | ||||
| use meilisearch_types::batches::{Batch, BatchId}; | ||||
| use meilisearch_types::heed::RwTxn; | ||||
| use meilisearch_types::milli; | ||||
| use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; | ||||
| @@ -14,9 +15,15 @@ pub struct Dump<'a> { | ||||
|     index_scheduler: &'a IndexScheduler, | ||||
|     wtxn: RwTxn<'a>, | ||||
|  | ||||
|     batch_to_task_mapping: HashMap<BatchId, RoaringBitmap>, | ||||
|  | ||||
|     indexes: HashMap<String, RoaringBitmap>, | ||||
|     statuses: HashMap<Status, RoaringBitmap>, | ||||
|     kinds: HashMap<Kind, RoaringBitmap>, | ||||
|  | ||||
|     batch_indexes: HashMap<String, RoaringBitmap>, | ||||
|     batch_statuses: HashMap<Status, RoaringBitmap>, | ||||
|     batch_kinds: HashMap<Kind, RoaringBitmap>, | ||||
| } | ||||
|  | ||||
| impl<'a> Dump<'a> { | ||||
| @@ -27,12 +34,72 @@ impl<'a> Dump<'a> { | ||||
|         Ok(Dump { | ||||
|             index_scheduler, | ||||
|             wtxn, | ||||
|             batch_to_task_mapping: HashMap::new(), | ||||
|             indexes: HashMap::new(), | ||||
|             statuses: HashMap::new(), | ||||
|             kinds: HashMap::new(), | ||||
|             batch_indexes: HashMap::new(), | ||||
|             batch_statuses: HashMap::new(), | ||||
|             batch_kinds: HashMap::new(), | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     /// Import a batch restored from a dump into the scheduler's batch queue. | ||||
|     /// | ||||
|     /// The batch is stored in `all_batches` and referenced from the `enqueued_at`, | ||||
|     /// `started_at` and `finished_at` date databases, all through the import write | ||||
|     /// transaction. Its uid is also recorded in the in-memory per-index, per-status | ||||
|     /// and per-kind bitmaps held by this `Dump`. | ||||
|     /// | ||||
|     /// Taking `&mut self` is meant to make it very unlikely that a dump gets imported | ||||
|     /// while actix is running. | ||||
|     pub fn register_dumped_batch(&mut self, batch: Batch) -> Result<()> { | ||||
|         let batches = &self.index_scheduler.queue.batches; | ||||
|         let uid = batch.uid; | ||||
|  | ||||
|         batches.all_batches.put(&mut self.wtxn, &uid, &batch)?; | ||||
|  | ||||
|         // `enqueued_at` is optional; when present, reference the batch under both of | ||||
|         // its recorded enqueue dates. | ||||
|         if let Some(enqueued_at) = batch.enqueued_at { | ||||
|             for date in [enqueued_at.earliest, enqueued_at.oldest] { | ||||
|                 utils::insert_task_datetime(&mut self.wtxn, batches.enqueued_at, date, uid)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         utils::insert_task_datetime(&mut self.wtxn, batches.started_at, batch.started_at, uid)?; | ||||
|  | ||||
|         // `finished_at` is optional as well. | ||||
|         if let Some(finished_at) = batch.finished_at { | ||||
|             utils::insert_task_datetime(&mut self.wtxn, batches.finished_at, finished_at, uid)?; | ||||
|         } | ||||
|  | ||||
|         // Look up by reference first so the index name is only cloned the first time | ||||
|         // it is encountered. | ||||
|         for index_uid in batch.stats.index_uids.keys() { | ||||
|             if let Some(batch_ids) = self.batch_indexes.get_mut(index_uid) { | ||||
|                 batch_ids.insert(uid); | ||||
|             } else { | ||||
|                 let mut batch_ids = RoaringBitmap::new(); | ||||
|                 batch_ids.insert(uid); | ||||
|                 self.batch_indexes.insert(index_uid.to_string(), batch_ids); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         for &status in batch.stats.status.keys() { | ||||
|             self.batch_statuses.entry(status).or_default().insert(uid); | ||||
|         } | ||||
|         for &kind in batch.stats.types.keys() { | ||||
|             self.batch_kinds.entry(kind).or_default().insert(uid); | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Register a new task coming from a dump in the scheduler. | ||||
|     /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. | ||||
|     pub fn register_dumped_task( | ||||
| @@ -149,6 +216,9 @@ impl<'a> Dump<'a> { | ||||
|         }; | ||||
|  | ||||
|         self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?; | ||||
|         if let Some(batch_id) = task.batch_uid { | ||||
|             self.batch_to_task_mapping.entry(batch_id).or_default().insert(task.uid); | ||||
|         } | ||||
|  | ||||
|         for index in task.indexes() { | ||||
|             match self.indexes.get_mut(index) { | ||||
| @@ -198,6 +268,14 @@ impl<'a> Dump<'a> { | ||||
|  | ||||
|     /// Commit all the changes and exit the importing dump state | ||||
|     pub fn finish(mut self) -> Result<()> { | ||||
|         for (batch_id, task_ids) in self.batch_to_task_mapping { | ||||
|             self.index_scheduler.queue.batch_to_tasks_mapping.put( | ||||
|                 &mut self.wtxn, | ||||
|                 &batch_id, | ||||
|                 &task_ids, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         for (index, bitmap) in self.indexes { | ||||
|             self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?; | ||||
|         } | ||||
| @@ -208,6 +286,16 @@ impl<'a> Dump<'a> { | ||||
|             self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?; | ||||
|         } | ||||
|  | ||||
|         for (index, bitmap) in self.batch_indexes { | ||||
|             self.index_scheduler.queue.batches.index_tasks.put(&mut self.wtxn, &index, &bitmap)?; | ||||
|         } | ||||
|         for (status, bitmap) in self.batch_statuses { | ||||
|             self.index_scheduler.queue.batches.put_status(&mut self.wtxn, status, &bitmap)?; | ||||
|         } | ||||
|         for (kind, bitmap) in self.batch_kinds { | ||||
|             self.index_scheduler.queue.batches.put_kind(&mut self.wtxn, kind, &bitmap)?; | ||||
|         } | ||||
|  | ||||
|         self.wtxn.commit()?; | ||||
|         self.index_scheduler.scheduler.wake_up.signal(); | ||||
|  | ||||
|   | ||||
| @@ -96,6 +96,7 @@ make_enum_progress! { | ||||
|         StartTheDumpCreation, | ||||
|         DumpTheApiKeys, | ||||
|         DumpTheTasks, | ||||
|         DumpTheBatches, | ||||
|         DumpTheIndexes, | ||||
|         DumpTheExperimentalFeatures, | ||||
|         CompressTheDump, | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| use std::collections::BTreeMap; | ||||
| use std::fs::File; | ||||
| use std::io::BufWriter; | ||||
| use std::sync::atomic::Ordering; | ||||
| @@ -11,7 +12,9 @@ use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; | ||||
| use time::macros::format_description; | ||||
| use time::OffsetDateTime; | ||||
|  | ||||
| use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress}; | ||||
| use crate::processing::{ | ||||
|     AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress, | ||||
| }; | ||||
| use crate::{Error, IndexScheduler, Result}; | ||||
|  | ||||
| impl IndexScheduler { | ||||
| @@ -102,7 +105,40 @@ impl IndexScheduler { | ||||
|         } | ||||
|         dump_tasks.flush()?; | ||||
|  | ||||
|         // 3. Dump the indexes | ||||
|         // 3. Dump the batches | ||||
|         progress.update_progress(DumpCreationProgress::DumpTheBatches); | ||||
|         let mut dump_batches = dump.create_batches_queue()?; | ||||
|  | ||||
|         let (atomic, update_batch_progress) = | ||||
|             AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32); | ||||
|         progress.update_progress(update_batch_progress); | ||||
|  | ||||
|         for ret in self.queue.batches.all_batches.iter(&rtxn)? { | ||||
|             if self.scheduler.must_stop_processing.get() { | ||||
|                 return Err(Error::AbortedTask); | ||||
|             } | ||||
|  | ||||
|             let (_, mut b) = ret?; | ||||
|             // In the case we're dumping ourselves we want to be marked as finished | ||||
|             // to not loop over ourselves indefinitely. | ||||
|             if b.uid == task.uid { | ||||
|                 let finished_at = OffsetDateTime::now_utc(); | ||||
|  | ||||
|                 // We're going to fake the date because we don't know if everything is going to go well. | ||||
|                 // But we need to dump the batch as finished and successful. | ||||
|                 // If something fails, everything will be set appropriately in the end. | ||||
|                 let mut statuses = BTreeMap::new(); | ||||
|                 statuses.insert(Status::Succeeded, b.stats.total_nb_tasks); | ||||
|                 b.stats.status = statuses; | ||||
|                 b.finished_at = Some(finished_at); | ||||
|             } | ||||
|  | ||||
|             dump_batches.push_batch(&b)?; | ||||
|             atomic.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|         dump_batches.flush()?; | ||||
|  | ||||
|         // 4. Dump the indexes | ||||
|         progress.update_progress(DumpCreationProgress::DumpTheIndexes); | ||||
|         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; | ||||
|         let mut count = 0; | ||||
| @@ -142,7 +178,7 @@ impl IndexScheduler { | ||||
|             let documents = index | ||||
|                 .all_documents(&rtxn) | ||||
|                 .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; | ||||
|             // 3.1. Dump the documents | ||||
|             // 4.1. Dump the documents | ||||
|             for ret in documents { | ||||
|                 if self.scheduler.must_stop_processing.get() { | ||||
|                     return Err(Error::AbortedTask); | ||||
| @@ -204,7 +240,7 @@ impl IndexScheduler { | ||||
|                 atomic.fetch_add(1, Ordering::Relaxed); | ||||
|             } | ||||
|  | ||||
|             // 3.2. Dump the settings | ||||
|             // 4.2. Dump the settings | ||||
|             let settings = meilisearch_types::settings::settings( | ||||
|                 index, | ||||
|                 &rtxn, | ||||
| @@ -215,7 +251,7 @@ impl IndexScheduler { | ||||
|             Ok(()) | ||||
|         })?; | ||||
|  | ||||
|         // 4. Dump experimental feature settings | ||||
|         // 5. Dump experimental feature settings | ||||
|         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures); | ||||
|         let features = self.features().runtime_features(); | ||||
|         dump.create_experimental_features(features)?; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user