Add support for the progress API of arroy

This commit is contained in:
Tamo
2025-03-13 17:36:49 +01:00
parent 82912e191b
commit 009c36a4d0
7 changed files with 67 additions and 4 deletions

View File

@@ -1,3 +1,4 @@
use enum_iterator::Sequence;
use std::any::TypeId;
use std::borrow::Cow;
use std::marker::PhantomData;
@@ -76,6 +77,14 @@ impl Progress {
durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
}
/// Forwards a progress report received from arroy to this progress tracker.
///
/// The main step of the report is always registered through `update_progress`.
/// When the report also carries a sub step, it is registered right after the
/// main one.
// TODO: ideally we should expose the progress in a way that lets arroy use it directly
pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
    self.update_progress(progress.main);

    // Nothing more to report when arroy did not attach a sub step.
    let Some(sub_step) = progress.sub else {
        return;
    };
    self.update_progress(sub_step);
}
}
/// Generate the names associated with the durations and push them.
@@ -238,3 +247,44 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
self.total
}
}
/// Exposes arroy's main build steps through the local `Step` trait.
impl Step for arroy::MainStep {
    /// Returns the human-readable label associated with this step.
    fn name(&self) -> Cow<'static, str> {
        let label: &'static str = match self {
            Self::PreProcessingTheItems => "pre processing the items",
            Self::WritingTheDescendantsAndMetadata => "writing the descendants and metadata",
            Self::RetrieveTheUpdatedItems => "retrieve the updated items",
            Self::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
            Self::UpdatingTheTrees => "updating the trees",
            Self::CreateNewTrees => "create new trees",
            Self::WritingNodesToDatabase => "writing nodes to database",
            Self::DeleteExtraneousTrees => "delete extraneous trees",
            Self::WriteTheMetadata => "write the metadata",
        };
        Cow::Borrowed(label)
    }

    /// Returns the position of this step, taken from the enum discriminant.
    fn current(&self) -> u32 {
        let step = *self;
        step as u32
    }

    /// Returns the value of the `CARDINALITY` associated constant as a `u32`
    /// (presumably provided by `enum_iterator::Sequence` — confirm against arroy).
    fn total(&self) -> u32 {
        let cardinality = Self::CARDINALITY;
        cardinality as u32
    }
}
/// Exposes arroy's sub steps through the local `Step` trait.
impl Step for arroy::SubStep {
    /// Returns the `unit` of the sub step as its name.
    fn name(&self) -> Cow<'static, str> {
        Cow::from(self.unit)
    }

    /// Returns the value currently stored in the `current` counter.
    fn current(&self) -> u32 {
        // A relaxed load is used: the value is only read for reporting here.
        let done = self.current.load(Ordering::Relaxed);
        done
    }

    /// Returns the `max` value of the sub step.
    fn total(&self) -> u32 {
        let upper_bound = self.max;
        upper_bound
    }
}

View File

@@ -31,6 +31,7 @@ use super::new::StdResult;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::progress::Progress;
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
@@ -522,6 +523,8 @@ where
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
writer.build_and_quantize(
wtxn,
// In the settings we don't have any progress to share
&Progress::default(),
&mut rng,
dimension,
is_quantizing,

View File

@@ -201,6 +201,7 @@ where
build_vectors(
index,
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,

View File

@@ -10,6 +10,7 @@ use super::super::channel::*;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::{Error, Index, InternalError, Result};
@@ -100,6 +101,7 @@ impl ChannelCongestion {
pub fn build_vectors<MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,
progress: &Progress,
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
@@ -118,6 +120,7 @@ where
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
progress,
&mut rng,
dimensions,
false,

View File

@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use self::error::{EmbedError, NewEmbedderError};
use crate::progress::Progress;
use crate::prompt::{Prompt, PromptData};
use crate::ThreadPoolNoAbort;
@@ -81,9 +82,11 @@ impl ArroyWrapper {
}
}
#[allow(clippy::too_many_arguments)]
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
&mut self,
wtxn: &mut RwTxn,
progress: &Progress,
rng: &mut R,
dimension: usize,
quantizing: bool,
@@ -110,12 +113,14 @@ impl ArroyWrapper {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
} else if writer.need_build(wtxn)? {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
} else if writer.is_empty(wtxn)? {