IndexScheduler::update_task now merges the task.network and accepts &mut Task

This commit is contained in:
Louis Dureuil
2025-07-29 13:48:33 +02:00
parent 56c7f54804
commit bd97a7cc19
2 changed files with 24 additions and 3 deletions

View File

@ -97,7 +97,22 @@ impl TaskQueue {
Ok(self.all_tasks.get(rtxn, &task_id)?) Ok(self.all_tasks.get(rtxn, &task_id)?)
} }
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { /// Update the inverted task indexes and write the new value of the task.
///
/// The passed `task` object typically comes from a previous transaction, so two kinds of modification might have occurred:
/// 1. Modification to the `task` object after loading it from the DB (the purpose of this method is to persist these changes)
/// 2. Modification to the task committed by another transaction in the DB (an annoying consequence of having lost the original
/// transaction from which the `task` instance was deserialized)
///
/// When calling this function, this `task` is modified to take into account any existing `network`
/// that can have been added since the task was loaded into memory.
///
/// Any other modification to the task that was committed from the DB since the parameter was pulled from the DB will be overwritten.
///
/// # Errors
///
/// - CorruptedTaskQueue: The task doesn't exist in the database
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> {
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
let reprocessing = old_task.status != Status::Enqueued; let reprocessing = old_task.status != Status::Enqueued;
@ -157,6 +172,12 @@ impl TaskQueue {
} }
} }
task.network = match (old_task.network, task.network.take()) {
(None, None) => None,
(None, Some(network)) | (Some(network), None) => Some(network),
(Some(_), Some(network)) => Some(network),
};
self.all_tasks.put(wtxn, &task.uid, task)?; self.all_tasks.put(wtxn, &task.uid, task)?;
Ok(()) Ok(())
} }

View File

@ -268,7 +268,7 @@ impl IndexScheduler {
self.queue self.queue
.tasks .tasks
.update_task(&mut wtxn, &task) .update_task(&mut wtxn, &mut task)
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?; .map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
} }
if let Some(canceled_by) = canceled_by { if let Some(canceled_by) = canceled_by {
@ -349,7 +349,7 @@ impl IndexScheduler {
self.queue self.queue
.tasks .tasks
.update_task(&mut wtxn, &task) .update_task(&mut wtxn, &mut task)
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?; .map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
} }
} }