diff --git a/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs index 03a1d65a3..15c4a3786 100644 --- a/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs +++ b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs @@ -226,59 +226,74 @@ impl IndexScheduler { total_moved_documents += documents_to_delete.len(); - let mut new_fields_ids_map = fields_ids_map.clone(); - - // candidates not empty => index not empty => a primary key is set - let primary_key = index.primary_key(&index_rtxn)?.unwrap(); - - let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map) - .map_err(milli::Error::from) - .map_err(err)?; - - let mut index_wtxn = index.write_txn()?; - - let mut indexer = indexer::DocumentDeletion::new(); - indexer.delete_documents_by_docids(documents_to_delete); - let document_changes = indexer.into_changes(&indexer_alloc, primary_key); - let embedders = index - .embedding_configs() - .embedding_configs(&index_wtxn) - .map_err(milli::Error::from) - .map_err(err)?; - let embedders = self.embedders(index_uid.to_string(), embedders)?; - let indexer_config = self.index_mapper.indexer_config(); - let pool = &indexer_config.thread_pool; - - indexer::index( - &mut index_wtxn, - index, - pool, - indexer_config.grenad_parameters(), - &fields_ids_map, - new_fields_ids_map, - None, // document deletion never changes primary key - &document_changes, - embedders, - &|| must_stop_processing.get(), - progress, - &EmbedderStats::default(), - ) - .map_err(err)?; - - // update stats - let mut mapper_wtxn = self.env.write_txn()?; - let stats = - crate::index_mapper::IndexStats::new(index, &index_wtxn).map_err(err)?; - self.index_mapper.store_stats_of(&mut mapper_wtxn, index_uid, &stats)?; - - index_wtxn.commit()?; - // update stats after committing changes to index - mapper_wtxn.commit()?; - - Ok(()) + self.delete_documents_from_index(progress, must_stop_processing, &indexer_alloc, index_uid, index, &err, index_rtxn, documents_to_delete, fields_ids_map) }, )?; Ok(total_moved_documents) } + + #[allow(clippy::too_many_arguments)] + fn delete_documents_from_index( + &self, + progress: &Progress, + must_stop_processing: &super::MustStopProcessing, + indexer_alloc: &Bump, + index_uid: &str, + index: &milli::Index, + err: &impl Fn(milli::Error) -> Error, + index_rtxn: milli::heed::RoTxn<'_, milli::heed::WithoutTls>, + documents_to_delete: RoaringBitmap, + fields_ids_map: milli::FieldsIdsMap, + ) -> std::result::Result<(), Error> { + let mut new_fields_ids_map = fields_ids_map.clone(); + + // candidates not empty => index not empty => a primary key is set + let primary_key = index.primary_key(&index_rtxn)?.unwrap(); + + let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map) + .map_err(milli::Error::from) + .map_err(err)?; + + let mut index_wtxn = index.write_txn()?; + + let mut indexer = indexer::DocumentDeletion::new(); + indexer.delete_documents_by_docids(documents_to_delete); + let document_changes = indexer.into_changes(&indexer_alloc, primary_key); + let embedders = index + .embedding_configs() + .embedding_configs(&index_wtxn) + .map_err(milli::Error::from) + .map_err(err)?; + let embedders = self.embedders(index_uid.to_string(), embedders)?; + let indexer_config = self.index_mapper.indexer_config(); + let pool = &indexer_config.thread_pool; + + indexer::index( + &mut index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &fields_ids_map, + new_fields_ids_map, + None, // document deletion never changes primary key + &document_changes, + embedders, + &|| must_stop_processing.get(), + progress, + &EmbedderStats::default(), + ) + .map_err(err)?; + + // update stats + let mut mapper_wtxn = self.env.write_txn()?; + let stats = crate::index_mapper::IndexStats::new(index, &index_wtxn).map_err(err)?; + self.index_mapper.store_stats_of(&mut mapper_wtxn, index_uid, &stats)?; + + index_wtxn.commit()?; + // update stats after committing changes to index + mapper_wtxn.commit()?; + + Ok(()) + } }