Compare commits

...

4 Commits

Author SHA1 Message Date
3b95007292 update arroy to the latest un-released version 2025-07-07 12:27:19 +02:00
4a2b8be428 fmt 2025-07-07 12:18:59 +02:00
bea11a1353 update arroy to the latest working version 2025-07-07 12:18:57 +02:00
5a7948bfab Use unstable arroy version containing incremental indexing 2025-07-07 12:12:11 +02:00
17 changed files with 101 additions and 80 deletions

20
Cargo.lock generated
View File

@ -444,12 +444,12 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "arroy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08e6111f351d004bd13e95ab540721272136fd3218b39d3ec95a2ea1c4e6a0a6"
version = "0.7.0"
source = "git+https://github.com/meilisearch/arroy.git?rev=1a21e5c0cbe3e106b05d834076a5b727cf46bca1#1a21e5c0cbe3e106b05d834076a5b727cf46bca1"
dependencies = [
"bytemuck",
"byteorder",
"crossbeam",
"enum-iterator",
"heed",
"memmap2",
@ -461,6 +461,7 @@ dependencies = [
"roaring",
"tempfile",
"thiserror 2.0.12",
"thread_local",
"tracing",
]
@ -1367,6 +1368,19 @@ dependencies = [
"itertools 0.10.5",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"

View File

@ -238,7 +238,7 @@ impl IndexScheduler {
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u64);
progress.update_progress(task_progress_obj);
process_batch_info = info;
let mut success = 0;
@ -317,7 +317,7 @@ impl IndexScheduler {
Err(err) => {
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchFailed);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u64);
progress.update_progress(task_progress_obj);
if matches!(err, Error::DatabaseUpgrade(_)) {

View File

@ -353,8 +353,8 @@ impl IndexScheduler {
for (step, swap) in swaps.iter().enumerate() {
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
step as u32,
swaps.len() as u32,
step as u64,
swaps.len() as u64,
));
self.apply_index_swap(
&mut wtxn,
@ -472,7 +472,7 @@ impl IndexScheduler {
// 3. before_name -> new_name in the task's KindWithContent
progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks);
let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids;
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32);
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u64);
progress.update_progress(task_progress);
for task_id in tasks_to_update {
@ -529,7 +529,7 @@ impl IndexScheduler {
// The tasks that have been removed *per batches*.
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
progress.update_progress(task_progress);
for task_id in to_delete_tasks.iter() {
let task =
@ -575,7 +575,7 @@ impl IndexScheduler {
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
let (atomic_progress, task_progress) = AtomicTaskStep::new(
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u64,
);
progress.update_progress(task_progress);
for index in affected_indexes.iter() {
@ -594,7 +594,7 @@ impl IndexScheduler {
}
progress.update_progress(TaskDeletionProgress::DeletingTasks);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
progress.update_progress(task_progress);
for task in to_delete_tasks.iter() {
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
@ -611,7 +611,7 @@ impl IndexScheduler {
}
}
progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u64);
progress.update_progress(batch_progress);
for (batch_id, to_delete_tasks) in affected_batches {
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
@ -786,7 +786,7 @@ impl IndexScheduler {
}
// 3. We now have a list of tasks to cancel, cancel them
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
progress.update_progress(progress_obj);
let mut tasks = self.queue.tasks.get_existing_tasks(
@ -797,7 +797,7 @@ impl IndexScheduler {
)?;
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
progress.update_progress(progress_obj);
for task in tasks.iter_mut() {
task.status = Status::Canceled;

View File

@ -57,7 +57,7 @@ impl IndexScheduler {
let mut dump_tasks = dump.create_tasks_queue()?;
let (atomic, update_task_progress) =
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u32);
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u64);
progress.update_progress(update_task_progress);
for ret in self.queue.tasks.all_tasks.iter(&rtxn)? {
@ -119,7 +119,7 @@ impl IndexScheduler {
let mut dump_batches = dump.create_batches_queue()?;
let (atomic_batch_progress, update_batch_progress) =
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u64);
progress.update_progress(update_batch_progress);
for ret in self.queue.batches.all_batches.iter(&rtxn)? {
@ -149,7 +149,7 @@ impl IndexScheduler {
// 5. Dump the indexes
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u64;
let mut count = 0;
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
@ -178,7 +178,7 @@ impl IndexScheduler {
let nb_documents = index
.number_of_documents(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
as u32;
as u64;
let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
progress.update_progress(update_document_progress);
let documents = index

View File

@ -55,8 +55,8 @@ impl IndexScheduler {
progress.update_progress(VariableNameStep::<ExportIndex>::new(
format!("Exporting index `{uid}`"),
i as u32,
indexes.len() as u32,
i as u64,
indexes.len() as u64,
));
let ExportIndexSettings { filter, override_settings } = export_settings;
@ -155,7 +155,7 @@ impl IndexScheduler {
// spawn many threads to process the documents
drop(index_rtxn);
let total_documents = universe.len() as u32;
let total_documents = universe.len();
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
@ -163,7 +163,7 @@ impl IndexScheduler {
IndexUidPattern::new_unchecked(uid.clone()),
DetailsExportIndexSettings {
settings: (*export_settings).clone(),
matched_documents: Some(total_documents as u64),
matched_documents: Some(total_documents),
},
);

View File

@ -58,7 +58,7 @@ impl IndexScheduler {
// 2.4 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u64);
progress.update_progress(update_file_progress);
for task_id in enqueued {
let task =
@ -74,12 +74,12 @@ impl IndexScheduler {
// 3. Snapshot every indexes
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = self.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let nb_indexes = index_mapping.len(&rtxn)? as u64;
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, uuid) = result?;
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
name, i as u64, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());

View File

@ -22,8 +22,8 @@ impl IndexScheduler {
}
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
format!("Upgrading index `{uid}`"),
i as u32,
indexes.len() as u32,
i as u64,
indexes.len() as u64,
));
let index = self.index(uid)?;
let mut index_wtxn = index.write_txn()?;
@ -65,8 +65,8 @@ impl IndexScheduler {
for (i, uid) in indexes.iter().enumerate() {
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
format!("Rollbacking index `{uid}`"),
i as u32,
indexes.len() as u32,
i as u64,
indexes.len() as u64,
));
let index_schd_rtxn = self.env.read_txn()?;

View File

@ -162,8 +162,8 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
let (uid, uuid) = result?;
progress.update_progress(VariableNameStep::new(
&uid,
index_index as u32,
index_count as u32,
index_index as u64,
index_count as u64,
));
let index_path = db_path.join("indexes").join(uuid.to_string());
@ -220,12 +220,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
pub struct VariableNameStep {
name: String,
current: u32,
total: u32,
current: u64,
total: u64,
}
impl VariableNameStep {
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
Self { name: name.into(), current, total }
}
}
@ -235,11 +235,11 @@ impl Step for VariableNameStep {
self.name.clone().into()
}
fn current(&self) -> u32 {
fn current(&self) -> u64 {
self.current
}
fn total(&self) -> u32 {
fn total(&self) -> u64 {
self.total
}
}

View File

@ -87,7 +87,8 @@ rhai = { version = "1.22.2", features = [
"no_time",
"sync",
] }
arroy = "0.6.1"
# arroy = "0.6.1"
arroy = { git = "https://github.com/meilisearch/arroy.git", rev = "1a21e5c0cbe3e106b05d834076a5b727cf46bca1" } # incremental update
rand = "0.8.5"
tracing = "0.1.41"
ureq = { version = "2.12.1", features = ["json"] }

View File

@ -420,7 +420,9 @@ impl From<arroy::Error> for Error {
| arroy::Error::NeedBuild(_)
| arroy::Error::MissingKey { .. }
| arroy::Error::MissingMetadata(_)
| arroy::Error::CannotDecodeKeyMode { .. } => {
| arroy::Error::CannotDecodeKeyMode { .. }
| arroy::Error::UnknownVersion { .. }
| arroy::Error::Panic(_) => {
Error::InternalError(InternalError::ArroyError(value))
}
}

View File

@ -1,7 +1,7 @@
use std::any::TypeId;
use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@ -13,8 +13,8 @@ use utoipa::ToSchema;
pub trait Step: 'static + Send + Sync {
fn name(&self) -> Cow<'static, str>;
fn current(&self) -> u32;
fn total(&self) -> u32;
fn current(&self) -> u64;
fn total(&self) -> u64;
}
#[derive(Clone, Default)]
@ -132,13 +132,13 @@ pub trait NamedStep: 'static + Send + Sync + Default {
/// - The total number of steps doesn't change
pub struct AtomicSubStep<Name: NamedStep> {
unit_name: Name,
current: Arc<AtomicU32>,
total: u32,
current: Arc<AtomicU64>,
total: u64,
}
impl<Name: NamedStep> AtomicSubStep<Name> {
pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
let current = Arc::new(AtomicU32::new(0));
pub fn new(total: u64) -> (Arc<AtomicU64>, Self) {
let current = Arc::new(AtomicU64::new(0));
(current.clone(), Self { current, total, unit_name: Name::default() })
}
}
@ -148,11 +148,11 @@ impl<Name: NamedStep> Step for AtomicSubStep<Name> {
self.unit_name.name().into()
}
fn current(&self) -> u32 {
fn current(&self) -> u64 {
self.current.load(Ordering::Relaxed)
}
fn total(&self) -> u32 {
fn total(&self) -> u64 {
self.total
}
}
@ -183,13 +183,13 @@ macro_rules! make_enum_progress {
}
}
fn current(&self) -> u32 {
*self as u32
fn current(&self) -> u64 {
*self as u64
}
fn total(&self) -> u32 {
fn total(&self) -> u64 {
use $crate::progress::_private_enum_iterator::Sequence;
Self::CARDINALITY as u32
Self::CARDINALITY as u64
}
}
};
@ -235,8 +235,8 @@ pub struct ProgressView {
#[schema(rename_all = "camelCase")]
pub struct ProgressStepView {
pub current_step: Cow<'static, str>,
pub finished: u32,
pub total: u32,
pub finished: u64,
pub total: u64,
}
/// Used when the name can change but it's still the same step.
@ -252,13 +252,13 @@ pub struct ProgressStepView {
/// ```
pub struct VariableNameStep<U: Send + Sync + 'static> {
name: String,
current: u32,
total: u32,
current: u64,
total: u64,
phantom: PhantomData<U>,
}
impl<U: Send + Sync + 'static> VariableNameStep<U> {
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
Self { name: name.into(), current, total, phantom: PhantomData }
}
}
@ -268,11 +268,11 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
self.name.clone().into()
}
fn current(&self) -> u32 {
fn current(&self) -> u64 {
self.current
}
fn total(&self) -> u32 {
fn total(&self) -> u64 {
self.total
}
}
@ -285,22 +285,26 @@ impl Step for arroy::MainStep {
"writing the descendants and metadata"
}
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
arroy::MainStep::UpdatingTheTrees => "updating the trees",
arroy::MainStep::CreateNewTrees => "create new trees",
arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
arroy::MainStep::WriteTheMetadata => "write the metadata",
arroy::MainStep::RetrievingTheItemsIds => "retrieving the items ids",
arroy::MainStep::RetrievingTheUsedTreeNodes => "retrieving the used tree nodes",
arroy::MainStep::DeletingExtraTrees => "deleting extra trees",
arroy::MainStep::RemoveItemsFromExistingTrees => "remove items from existing trees",
arroy::MainStep::InsertItemsInCurrentTrees => "insert items in current trees",
arroy::MainStep::RetrievingTheItems => "retrieving the items",
arroy::MainStep::RetrievingTheTreeNodes => "retrieving the tree nodes",
arroy::MainStep::RetrieveTheLargeDescendants => "retrieve the large descendants",
arroy::MainStep::CreateTreesForItems => "create trees for items",
}
.into()
}
fn current(&self) -> u32 {
*self as u32
fn current(&self) -> u64 {
*self as u64
}
fn total(&self) -> u32 {
Self::CARDINALITY as u32
fn total(&self) -> u64 {
Self::CARDINALITY as u64
}
}
@ -309,11 +313,11 @@ impl Step for arroy::SubStep {
self.unit.into()
}
fn current(&self) -> u32 {
fn current(&self) -> u64 {
self.current.load(Ordering::Relaxed)
}
fn total(&self) -> u32 {
fn total(&self) -> u64 {
self.max
}
}

View File

@ -135,7 +135,7 @@ where
extractor_alloc.0.reset();
}
let total_documents = document_changes.len() as u32;
let total_documents = document_changes.len() as u64;
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
@ -167,7 +167,7 @@ where
});
let res = extractor.process(changes, context).map_err(Arc::new);
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
step.fetch_add(items.as_ref().len() as u64, Ordering::Relaxed);
// send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));

View File

@ -85,14 +85,14 @@ impl<'pl> DocumentOperation<'pl> {
let mut primary_key = None;
let payload_count = operations.len();
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u64);
progress.update_progress(progress_step);
for (payload_index, operation) in operations.into_iter().enumerate() {
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
}
step.store(payload_index as u32, Ordering::Relaxed);
step.store(payload_index as u64, Ordering::Relaxed);
let mut bytes = 0;
let result = match operation {
@ -145,7 +145,7 @@ impl<'pl> DocumentOperation<'pl> {
};
operations_stats.push(PayloadStats { document_count, bytes, error });
}
step.store(payload_count as u32, Ordering::Relaxed);
step.store(payload_count as u64, Ordering::Relaxed);
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =

View File

@ -101,7 +101,7 @@ pub fn settings_change_extract<
extractor_alloc.0.reset();
}
let total_documents = documents.len() as u32;
let total_documents = documents.len() as u64;
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
@ -132,7 +132,7 @@ pub fn settings_change_extract<
.filter_map(|item| documents.item_to_database_document(context, item).transpose());
let res = extractor.process(documents, context).map_err(Arc::new);
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
step.fetch_add(items.as_ref().len() as u64, Ordering::Relaxed);
// send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));

View File

@ -11,7 +11,7 @@ pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progre
let field_id_map = index.fields_ids_map(wtxn)?;
let (update_document_count, sub_step) =
AtomicSubStep::<progress::Document>::new(document_count as u32);
AtomicSubStep::<progress::Document>::new(document_count as u64);
progress.update_progress(sub_step);
let docids = index.documents_ids(wtxn)?;

View File

@ -81,8 +81,8 @@ where
target.1,
target.2
),
i as u32,
upgrade_path.len() as u32,
i as u64,
upgrade_path.len() as u64,
));
regenerate_stats |= upgrade.upgrade(wtxn, index, from, progress.clone())?;
index.put_version(wtxn, target)?;

View File

@ -133,7 +133,7 @@ impl ArroyWrapper {
}
#[allow(clippy::too_many_arguments)]
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng + Send + Sync>(
&mut self,
wtxn: &mut RwTxn,
progress: &Progress,