mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-05 20:26:31 +00:00
Change strategy to remove task instead of marking it succeeded
This commit is contained in:
@ -25,9 +25,50 @@ unsafe fn mark_tasks_as_succeeded(
|
|||||||
let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?;
|
let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?;
|
||||||
let mut wtxn = env.write_txn()?;
|
let mut wtxn = env.write_txn()?;
|
||||||
let task_queue = TaskQueue::new(&env, &mut wtxn)?;
|
let task_queue = TaskQueue::new(&env, &mut wtxn)?;
|
||||||
for mut task in tasks.iter().cloned() {
|
|
||||||
task.status = Status::Succeeded;
|
// Destructuring to ensure the code below gets updated if a database gets added in the future.
|
||||||
task_queue.update_task(&mut wtxn, &task)?;
|
let TaskQueue {
|
||||||
|
all_tasks,
|
||||||
|
status,
|
||||||
|
kind,
|
||||||
|
index_tasks: _, // snapshot creation tasks are not index tasks
|
||||||
|
canceled_by,
|
||||||
|
enqueued_at,
|
||||||
|
started_at,
|
||||||
|
finished_at,
|
||||||
|
} = task_queue;
|
||||||
|
|
||||||
|
for task in tasks {
|
||||||
|
all_tasks.delete(&mut wtxn, &task.uid)?;
|
||||||
|
|
||||||
|
let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default();
|
||||||
|
tasks.remove(task.uid);
|
||||||
|
status.put(&mut wtxn, &task.status, &tasks)?;
|
||||||
|
|
||||||
|
let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default();
|
||||||
|
tasks.remove(task.uid);
|
||||||
|
kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?;
|
||||||
|
|
||||||
|
canceled_by.delete(&mut wtxn, &task.uid)?;
|
||||||
|
|
||||||
|
let timestamp = task.enqueued_at.unix_timestamp_nanos();
|
||||||
|
let mut tasks = enqueued_at.get(&wtxn, ×tamp)?.unwrap_or_default();
|
||||||
|
tasks.remove(task.uid);
|
||||||
|
enqueued_at.put(&mut wtxn, ×tamp, &tasks)?;
|
||||||
|
|
||||||
|
if let Some(task_started_at) = task.started_at {
|
||||||
|
let timestamp = task_started_at.unix_timestamp_nanos();
|
||||||
|
let mut tasks = started_at.get(&wtxn, ×tamp)?.unwrap_or_default();
|
||||||
|
tasks.remove(task.uid);
|
||||||
|
started_at.put(&mut wtxn, ×tamp, &tasks)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(task_finished_at) = task.finished_at {
|
||||||
|
let timestamp = task_finished_at.unix_timestamp_nanos();
|
||||||
|
let mut tasks = finished_at.get(&wtxn, ×tamp)?.unwrap_or_default();
|
||||||
|
tasks.remove(task.uid);
|
||||||
|
finished_at.put(&mut wtxn, ×tamp, &tasks)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -80,11 +121,7 @@ impl IndexScheduler {
|
|||||||
// This is safe because we open the env file we just created in a temporary directory.
|
// This is safe because we open the env file we just created in a temporary directory.
|
||||||
// We are sure it's not being used by any other process nor thread.
|
// We are sure it's not being used by any other process nor thread.
|
||||||
unsafe {
|
unsafe {
|
||||||
mark_tasks_as_succeeded(
|
mark_tasks_as_succeeded(&tasks, &dst, self.index_mapper.index_base_map_size)?;
|
||||||
&tasks,
|
|
||||||
&dst,
|
|
||||||
self.index_mapper.index_base_map_size,
|
|
||||||
)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.3 Create a read transaction on the index-scheduler
|
// 2.3 Create a read transaction on the index-scheduler
|
||||||
|
@ -229,22 +229,40 @@ async fn snapshotception_issue_4653() {
|
|||||||
let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) };
|
let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) };
|
||||||
let snapshot_server = Server::new_with_options(options).await.unwrap();
|
let snapshot_server = Server::new_with_options(options).await.unwrap();
|
||||||
|
|
||||||
// The snapshot creation task should NOT be spawned again => task is succeeded
|
// The snapshot should have been taken without the snapshot creation task
|
||||||
let (task, code) = snapshot_server.get_task(task.uid()).await;
|
let (tasks, code) = snapshot_server.tasks().await;
|
||||||
snapshot!(code, @"200 OK");
|
snapshot!(code, @"200 OK");
|
||||||
snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r#"
|
snapshot!(tasks, @r#"
|
||||||
{
|
{
|
||||||
"uid": 0,
|
"results": [],
|
||||||
"batchUid": 0,
|
"total": 0,
|
||||||
"indexUid": null,
|
"limit": 20,
|
||||||
"status": "succeeded",
|
"from": null,
|
||||||
"type": "snapshotCreation",
|
"next": null
|
||||||
"canceledBy": null,
|
}
|
||||||
"error": null,
|
"#);
|
||||||
"duration": null,
|
|
||||||
"enqueuedAt": "[date]",
|
// Ensure the task is not present in the snapshot
|
||||||
"startedAt": null,
|
let (task, code) = snapshot_server.get_task(0).await;
|
||||||
"finishedAt": null
|
snapshot!(code, @"404 Not Found");
|
||||||
|
snapshot!(task, @r#"
|
||||||
|
{
|
||||||
|
"message": "Task `0` not found.",
|
||||||
|
"code": "task_not_found",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#task_not_found"
|
||||||
|
}
|
||||||
|
"#);
|
||||||
|
|
||||||
|
// Ensure the batch is also not present
|
||||||
|
let (batch, code) = snapshot_server.get_batch(0).await;
|
||||||
|
snapshot!(code, @"404 Not Found");
|
||||||
|
snapshot!(batch, @r#"
|
||||||
|
{
|
||||||
|
"message": "Batch `0` not found.",
|
||||||
|
"code": "batch_not_found",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#batch_not_found"
|
||||||
}
|
}
|
||||||
"#);
|
"#);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user