From 44b24652d272dadb676c1422cc8ab13a606b0052 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Wed, 23 Jul 2025 14:30:25 +0200 Subject: [PATCH] Change strategy to remove task instead of marking it succeeded --- .../scheduler/process_snapshot_creation.rs | 53 ++++++++++++++++--- crates/meilisearch/tests/snapshot/mod.rs | 46 +++++++++++----- 2 files changed, 77 insertions(+), 22 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 0859974cc..3b46d0359 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -25,9 +25,50 @@ unsafe fn mark_tasks_as_succeeded( let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?; let mut wtxn = env.write_txn()?; let task_queue = TaskQueue::new(&env, &mut wtxn)?; - for mut task in tasks.iter().cloned() { - task.status = Status::Succeeded; - task_queue.update_task(&mut wtxn, &task)?; + + // Destructuring to ensure the code below gets updated if a database gets added in the future. + let TaskQueue { + all_tasks, + status, + kind, + index_tasks: _, // snapshot creation tasks are not index tasks + canceled_by, + enqueued_at, + started_at, + finished_at, + } = task_queue; + + for task in tasks { + all_tasks.delete(&mut wtxn, &task.uid)?; + + let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default(); + tasks.remove(task.uid); + status.put(&mut wtxn, &task.status, &tasks)?; + + let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default(); + tasks.remove(task.uid); + kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?; + + canceled_by.delete(&mut wtxn, &task.uid)?; + + let timestamp = task.enqueued_at.unix_timestamp_nanos(); + let mut tasks = enqueued_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + enqueued_at.put(&mut wtxn, ×tamp, &tasks)?; + + if let Some(task_started_at) = task.started_at { + let timestamp = task_started_at.unix_timestamp_nanos(); + let mut tasks = started_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + started_at.put(&mut wtxn, ×tamp, &tasks)?; + } + + if let Some(task_finished_at) = task.finished_at { + let timestamp = task_finished_at.unix_timestamp_nanos(); + let mut tasks = finished_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + finished_at.put(&mut wtxn, ×tamp, &tasks)?; + } } wtxn.commit()?; Ok(()) @@ -80,11 +121,7 @@ impl IndexScheduler { // This is safe because we open the env file we just created in a temporary directory. // We are sure it's not being used by any other process nor thread. unsafe { - mark_tasks_as_succeeded( - &tasks, - &dst, - self.index_mapper.index_base_map_size, - )?; + mark_tasks_as_succeeded(&tasks, &dst, self.index_mapper.index_base_map_size)?; } // 2.3 Create a read transaction on the index-scheduler diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index a3c78bf28..98ce17b80 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -229,22 +229,40 @@ async fn snapshotception_issue_4653() { let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; let snapshot_server = Server::new_with_options(options).await.unwrap(); - // The snapshot creation task should NOT be spawned again => task is succeeded - let (task, code) = snapshot_server.get_task(task.uid()).await; + // The snapshot should have been taken without the snapshot creation task + let (tasks, code) = snapshot_server.tasks().await; snapshot!(code, @"200 OK"); - snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r#" + snapshot!(tasks, @r#" { - "uid": 0, - "batchUid": 0, - "indexUid": null, - "status": "succeeded", - "type": "snapshotCreation", - "canceledBy": null, - "error": null, - "duration": null, - "enqueuedAt": "[date]", - "startedAt": null, - "finishedAt": null + "results": [], + "total": 0, + "limit": 20, + "from": null, + "next": null + } + "#); + + // Ensure the task is not present in the snapshot + let (task, code) = snapshot_server.get_task(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(task, @r#" + { + "message": "Task `0` not found.", + "code": "task_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#task_not_found" + } + "#); + + // Ensure the batch is also not present + let (batch, code) = snapshot_server.get_batch(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(batch, @r#" + { + "message": "Batch `0` not found.", + "code": "batch_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#batch_not_found" } "#); }