mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	makes the dump cancellable
This commit is contained in:
		| @@ -825,6 +825,10 @@ impl IndexScheduler { | |||||||
|                 // 2. dump the tasks |                 // 2. dump the tasks | ||||||
|                 let mut dump_tasks = dump.create_tasks_queue()?; |                 let mut dump_tasks = dump.create_tasks_queue()?; | ||||||
|                 for ret in self.all_tasks.iter(&rtxn)? { |                 for ret in self.all_tasks.iter(&rtxn)? { | ||||||
|  |                     if self.must_stop_processing.get() { | ||||||
|  |                         return Err(Error::AbortedTask); | ||||||
|  |                     } | ||||||
|  |  | ||||||
|                     let (_, mut t) = ret?; |                     let (_, mut t) = ret?; | ||||||
|                     let status = t.status; |                     let status = t.status; | ||||||
|                     let content_file = t.content_uuid(); |                     let content_file = t.content_uuid(); | ||||||
| @@ -845,6 +849,9 @@ impl IndexScheduler { | |||||||
|  |  | ||||||
|                     // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. |                     // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. | ||||||
|                     if let Some(content_file) = content_file { |                     if let Some(content_file) = content_file { | ||||||
|  |                         if self.must_stop_processing.get() { | ||||||
|  |                             return Err(Error::AbortedTask); | ||||||
|  |                         } | ||||||
|                         if status == Status::Enqueued { |                         if status == Status::Enqueued { | ||||||
|                             let content_file = self.file_store.get_update(content_file)?; |                             let content_file = self.file_store.get_update(content_file)?; | ||||||
|  |  | ||||||
| @@ -884,6 +891,9 @@ impl IndexScheduler { | |||||||
|  |  | ||||||
|                     // 3.1. Dump the documents |                     // 3.1. Dump the documents | ||||||
|                     for ret in index.all_documents(&rtxn)? { |                     for ret in index.all_documents(&rtxn)? { | ||||||
|  |                         if self.must_stop_processing.get() { | ||||||
|  |                             return Err(Error::AbortedTask); | ||||||
|  |                         } | ||||||
|                         let (_id, doc) = ret?; |                         let (_id, doc) = ret?; | ||||||
|                         let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; |                         let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; | ||||||
|                         index_dumper.push_document(&document)?; |                         index_dumper.push_document(&document)?; | ||||||
| @@ -903,6 +913,9 @@ impl IndexScheduler { | |||||||
|                     "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" |                     "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" | ||||||
|                 )).unwrap(); |                 )).unwrap(); | ||||||
|  |  | ||||||
|  |                 if self.must_stop_processing.get() { | ||||||
|  |                     return Err(Error::AbortedTask); | ||||||
|  |                 } | ||||||
|                 let path = self.dumps_path.join(format!("{}.dump", dump_uid)); |                 let path = self.dumps_path.join(format!("{}.dump", dump_uid)); | ||||||
|                 let file = File::create(path)?; |                 let file = File::create(path)?; | ||||||
|                 dump.persist_to(BufWriter::new(file))?; |                 dump.persist_to(BufWriter::new(file))?; | ||||||
|   | |||||||
| @@ -108,6 +108,8 @@ pub enum Error { | |||||||
|     TaskDeletionWithEmptyQuery, |     TaskDeletionWithEmptyQuery, | ||||||
|     #[error("Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] |     #[error("Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] | ||||||
|     TaskCancelationWithEmptyQuery, |     TaskCancelationWithEmptyQuery, | ||||||
|  |     #[error("Aborted task")] | ||||||
|  |     AbortedTask, | ||||||
|  |  | ||||||
|     #[error(transparent)] |     #[error(transparent)] | ||||||
|     Dump(#[from] dump::Error), |     Dump(#[from] dump::Error), | ||||||
| @@ -175,6 +177,7 @@ impl Error { | |||||||
|             | Error::TaskNotFound(_) |             | Error::TaskNotFound(_) | ||||||
|             | Error::TaskDeletionWithEmptyQuery |             | Error::TaskDeletionWithEmptyQuery | ||||||
|             | Error::TaskCancelationWithEmptyQuery |             | Error::TaskCancelationWithEmptyQuery | ||||||
|  |             | Error::AbortedTask | ||||||
|             | Error::Dump(_) |             | Error::Dump(_) | ||||||
|             | Error::Heed(_) |             | Error::Heed(_) | ||||||
|             | Error::Milli(_) |             | Error::Milli(_) | ||||||
| @@ -236,6 +239,9 @@ impl ErrorCode for Error { | |||||||
|             Error::TaskDatabaseUpdate(_) => Code::Internal, |             Error::TaskDatabaseUpdate(_) => Code::Internal, | ||||||
|             Error::CreateBatch(_) => Code::Internal, |             Error::CreateBatch(_) => Code::Internal, | ||||||
|  |  | ||||||
|  |             // This one should never be seen by the end user | ||||||
|  |             Error::AbortedTask => Code::Internal, | ||||||
|  |  | ||||||
|             #[cfg(test)] |             #[cfg(test)] | ||||||
|             Error::PlannedFailure => Code::Internal, |             Error::PlannedFailure => Code::Internal, | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -1167,7 +1167,8 @@ impl IndexScheduler { | |||||||
|             // If we have an abortion error we must stop the tick here and re-schedule tasks. |             // If we have an abortion error we must stop the tick here and re-schedule tasks. | ||||||
|             Err(Error::Milli(milli::Error::InternalError( |             Err(Error::Milli(milli::Error::InternalError( | ||||||
|                 milli::InternalError::AbortedIndexation, |                 milli::InternalError::AbortedIndexation, | ||||||
|             ))) => { |             ))) | ||||||
|  |             | Err(Error::AbortedTask) => { | ||||||
|                 #[cfg(test)] |                 #[cfg(test)] | ||||||
|                 self.breakpoint(Breakpoint::AbortedIndexation); |                 self.breakpoint(Breakpoint::AbortedIndexation); | ||||||
|                 wtxn.abort().map_err(Error::HeedTransaction)?; |                 wtxn.abort().map_err(Error::HeedTransaction)?; | ||||||
| @@ -4323,4 +4324,26 @@ mod tests { | |||||||
|         } |         } | ||||||
|         "###); |         "###); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn cancel_processing_dump() { | ||||||
|  |         let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); | ||||||
|  |  | ||||||
|  |         let dump_creation = KindWithContent::DumpCreation { keys: Vec::new(), instance_uid: None }; | ||||||
|  |         let dump_cancellation = KindWithContent::TaskCancelation { | ||||||
|  |             query: "cancel dump".to_owned(), | ||||||
|  |             tasks: RoaringBitmap::from_iter([0]), | ||||||
|  |         }; | ||||||
|  |         let _ = index_scheduler.register(dump_creation).unwrap(); | ||||||
|  |         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register"); | ||||||
|  |         handle.advance_till([Start, BatchCreated, InsideProcessBatch]); | ||||||
|  |  | ||||||
|  |         let _ = index_scheduler.register(dump_cancellation).unwrap(); | ||||||
|  |         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered"); | ||||||
|  |  | ||||||
|  |         snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation"); | ||||||
|  |  | ||||||
|  |         handle.advance_one_successful_batch(); | ||||||
|  |         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed"); | ||||||
|  |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -0,0 +1,35 @@ | |||||||
|  | --- | ||||||
|  | source: index-scheduler/src/lib.rs | ||||||
|  | --- | ||||||
|  | ### Autobatching Enabled = true | ||||||
|  | ### Processing Tasks: | ||||||
|  | [] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### All Tasks: | ||||||
|  | 0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Status: | ||||||
|  | enqueued [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Kind: | ||||||
|  | "dumpCreation" [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Tasks: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Mapper: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Canceled By: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Enqueued At: | ||||||
|  | [timestamp] [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Started At: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Finished At: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### File Store: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  |  | ||||||
| @@ -0,0 +1,45 @@ | |||||||
|  | --- | ||||||
|  | source: index-scheduler/src/lib.rs | ||||||
|  | --- | ||||||
|  | ### Autobatching Enabled = true | ||||||
|  | ### Processing Tasks: | ||||||
|  | [] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### All Tasks: | ||||||
|  | 0 {uid: 0, status: canceled, canceled_by: 1, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} | ||||||
|  | 1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }} | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Status: | ||||||
|  | enqueued [] | ||||||
|  | succeeded [1,] | ||||||
|  | canceled [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Kind: | ||||||
|  | "taskCancelation" [1,] | ||||||
|  | "dumpCreation" [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Tasks: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Mapper: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Canceled By: | ||||||
|  | 1 [0,] | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Enqueued At: | ||||||
|  | [timestamp] [0,] | ||||||
|  | [timestamp] [1,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Started At: | ||||||
|  | [timestamp] [0,] | ||||||
|  | [timestamp] [1,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Finished At: | ||||||
|  | [timestamp] [0,] | ||||||
|  | [timestamp] [1,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### File Store: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  |  | ||||||
| @@ -0,0 +1,38 @@ | |||||||
|  | --- | ||||||
|  | source: index-scheduler/src/lib.rs | ||||||
|  | --- | ||||||
|  | ### Autobatching Enabled = true | ||||||
|  | ### Processing Tasks: | ||||||
|  | [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### All Tasks: | ||||||
|  | 0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} | ||||||
|  | 1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }} | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Status: | ||||||
|  | enqueued [0,1,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Kind: | ||||||
|  | "taskCancelation" [1,] | ||||||
|  | "dumpCreation" [0,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Tasks: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Index Mapper: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Canceled By: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Enqueued At: | ||||||
|  | [timestamp] [0,] | ||||||
|  | [timestamp] [1,] | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Started At: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### Finished At: | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  | ### File Store: | ||||||
|  |  | ||||||
|  | ---------------------------------------------------------------------- | ||||||
|  |  | ||||||
		Reference in New Issue
	
	Block a user