Enhance the index update functionality to support renaming by adding a `new_uid` field. Update the related structures and methods to handle the new index UID during updates, ensuring backward compatibility with existing index operations.

This commit is contained in:
Quentin de Quelen
2025-08-05 19:18:05 +02:00
committed by Tamo
parent 0f1c78b185
commit ae2d0a67a4
15 changed files with 547 additions and 119 deletions

View File

@ -15,7 +15,7 @@ use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress, RenameIndexProgress,
UpdateIndexProgress,
};
use crate::utils::{
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
@ -224,38 +224,47 @@ impl IndexScheduler {
self.index_mapper.create_index(wtxn, &index_uid, None)?;
self.process_batch(
Batch::IndexUpdate { index_uid, primary_key, task },
Batch::IndexUpdate { index_uid, primary_key, new_index_uid: None, task },
current_batch,
progress,
)
}
Batch::IndexRename { index_uid, new_index_uid, mut task } => {
progress.update_progress(RenameIndexProgress::RenamingTheIndex);
let mut wtxn = self.env.write_txn()?;
self.index_mapper.rename(&mut wtxn, &index_uid, &new_index_uid)?;
self.queue.tasks.update_index(&mut wtxn, &new_index_uid, |bm| {
let old = self.queue.tasks.index_tasks(&wtxn, &index_uid).unwrap_or_default();
*bm |= &old;
})?;
self.queue.tasks.update_index(&mut wtxn, &index_uid, |bm| bm.clear())?;
wtxn.commit()?;
task.status = Status::Succeeded;
task.details = Some(Details::IndexRename(IndexRenameDetails { old_uid: index_uid, new_uid: new_index_uid }));
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::IndexUpdate { index_uid, primary_key, mut task } => {
Batch::IndexUpdate { index_uid, primary_key, new_index_uid, mut task } => {
progress.update_progress(UpdateIndexProgress::UpdatingTheIndex);
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;
if let Some(primary_key) = primary_key.clone() {
// Handle rename if new_index_uid is provided
let final_index_uid = if let Some(new_uid) = &new_index_uid {
let mut wtxn = self.env.write_txn()?;
// Rename the index
self.index_mapper.rename(&mut wtxn, &index_uid, new_uid)?;
// Update the task index mappings
let old_tasks =
self.queue.tasks.index_tasks(&wtxn, &index_uid).unwrap_or_default();
self.queue.tasks.update_index(&mut wtxn, new_uid, |bm| {
*bm |= &old_tasks;
})?;
self.queue.tasks.update_index(&mut wtxn, &index_uid, |bm| bm.clear())?;
wtxn.commit()?;
new_uid.clone()
} else {
index_uid.clone()
};
// Get the index (renamed or not)
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &final_index_uid)?;
// Handle primary key update if provided
if let Some(ref primary_key) = primary_key {
let mut index_wtxn = index.write_txn()?;
let mut builder = MilliSettings::new(
&mut index_wtxn,
&index,
self.index_mapper.indexer_config(),
);
builder.set_primary_key(primary_key);
builder.set_primary_key(primary_key.clone());
let must_stop_processing = self.scheduler.must_stop_processing.clone();
builder
@ -264,7 +273,7 @@ impl IndexScheduler {
&progress,
current_batch.embedder_stats.clone(),
)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
.map_err(|e| Error::from_milli(e, Some(final_index_uid.to_string())))?;
index_wtxn.commit()?;
}
@ -272,7 +281,10 @@ impl IndexScheduler {
rtxn.commit()?;
task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key });
task.details = Some(Details::IndexInfo {
primary_key: primary_key.clone(),
new_uid: new_index_uid.clone(),
});
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
@ -282,8 +294,8 @@ impl IndexScheduler {
let mut wtxn = self.env.write_txn()?;
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
.map_err(|e| Error::from_milli(e, Some(final_index_uid.clone())))?;
self.index_mapper.store_stats_of(&mut wtxn, &final_index_uid, &stats)?;
wtxn.commit()?;
Ok(())
}();