mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-04 01:46:28 +00:00 
			
		
		
		
	use new iterator in batch
This commit is contained in:
		@@ -30,7 +30,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn};
 | 
				
			|||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 | 
					use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 | 
				
			||||||
use meilisearch_types::milli::heed::CompactionOption;
 | 
					use meilisearch_types::milli::heed::CompactionOption;
 | 
				
			||||||
use meilisearch_types::milli::update::{
 | 
					use meilisearch_types::milli::update::{
 | 
				
			||||||
    IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings,
 | 
					    IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use meilisearch_types::milli::{self, Filter, BEU32};
 | 
					use meilisearch_types::milli::{self, Filter, BEU32};
 | 
				
			||||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 | 
					use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 | 
				
			||||||
@@ -43,7 +43,7 @@ use uuid::Uuid;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
use crate::autobatcher::{self, BatchKind};
 | 
					use crate::autobatcher::{self, BatchKind};
 | 
				
			||||||
use crate::utils::{self, swap_index_uid_in_task};
 | 
					use crate::utils::{self, swap_index_uid_in_task};
 | 
				
			||||||
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
 | 
					use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Represents a combination of tasks that can all be processed at the same time.
 | 
					/// Represents a combination of tasks that can all be processed at the same time.
 | 
				
			||||||
///
 | 
					///
 | 
				
			||||||
@@ -1323,7 +1323,13 @@ impl IndexScheduler {
 | 
				
			|||||||
                    } else {
 | 
					                    } else {
 | 
				
			||||||
                        unreachable!()
 | 
					                        unreachable!()
 | 
				
			||||||
                    };
 | 
					                    };
 | 
				
			||||||
                let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
 | 
					                let deleted_documents = delete_document_by_filter(
 | 
				
			||||||
 | 
					                    index_wtxn,
 | 
				
			||||||
 | 
					                    filter,
 | 
				
			||||||
 | 
					                    self.index_mapper.indexer_config(),
 | 
				
			||||||
 | 
					                    self.must_stop_processing.clone(),
 | 
				
			||||||
 | 
					                    index,
 | 
				
			||||||
 | 
					                );
 | 
				
			||||||
                let original_filter = if let Some(Details::DocumentDeletionByFilter {
 | 
					                let original_filter = if let Some(Details::DocumentDeletionByFilter {
 | 
				
			||||||
                    original_filter,
 | 
					                    original_filter,
 | 
				
			||||||
                    deleted_documents: _,
 | 
					                    deleted_documents: _,
 | 
				
			||||||
@@ -1557,6 +1563,8 @@ impl IndexScheduler {
 | 
				
			|||||||
fn delete_document_by_filter<'a>(
 | 
					fn delete_document_by_filter<'a>(
 | 
				
			||||||
    wtxn: &mut RwTxn<'a, '_>,
 | 
					    wtxn: &mut RwTxn<'a, '_>,
 | 
				
			||||||
    filter: &serde_json::Value,
 | 
					    filter: &serde_json::Value,
 | 
				
			||||||
 | 
					    indexer_config: &IndexerConfig,
 | 
				
			||||||
 | 
					    must_stop_processing: MustStopProcessing,
 | 
				
			||||||
    index: &'a Index,
 | 
					    index: &'a Index,
 | 
				
			||||||
) -> Result<u64> {
 | 
					) -> Result<u64> {
 | 
				
			||||||
    let filter = Filter::from_json(filter)?;
 | 
					    let filter = Filter::from_json(filter)?;
 | 
				
			||||||
@@ -1567,10 +1575,38 @@ fn delete_document_by_filter<'a>(
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
            e => e.into(),
 | 
					            e => e.into(),
 | 
				
			||||||
        })?;
 | 
					        })?;
 | 
				
			||||||
        todo!("need a way to get back the external ids from the internal ids");
 | 
					        let external_documents_ids = index.external_documents_ids(wtxn)?;
 | 
				
			||||||
        // let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
 | 
					        // FIXME: for filters matching a lot of documents, this will allocate a huge vec of external docids (strings).
 | 
				
			||||||
        // delete_operation.delete_documents(&candidates);
 | 
					        // Since what we have is an iterator, it would be better to delete in chunks
 | 
				
			||||||
        // delete_operation.execute().map(|result| result.deleted_documents)?
 | 
					        let external_to_internal: std::result::Result<Vec<_>, RoaringBitmap> =
 | 
				
			||||||
 | 
					            external_documents_ids.find_external_id_of(candidates).only_external_ids().collect();
 | 
				
			||||||
 | 
					        let document_ids = match external_to_internal {
 | 
				
			||||||
 | 
					            Ok(external_ids) => external_ids,
 | 
				
			||||||
 | 
					            Err(remaining_ids) => panic!("Couldn't find some external ids {:?}", remaining_ids),
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let config = IndexDocumentsConfig {
 | 
				
			||||||
 | 
					            update_method: IndexDocumentsMethod::ReplaceDocuments,
 | 
				
			||||||
 | 
					            ..Default::default()
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let mut builder = milli::update::IndexDocuments::new(
 | 
				
			||||||
 | 
					            wtxn,
 | 
				
			||||||
 | 
					            index,
 | 
				
			||||||
 | 
					            indexer_config,
 | 
				
			||||||
 | 
					            config,
 | 
				
			||||||
 | 
					            |indexing_step| debug!("update: {:?}", indexing_step),
 | 
				
			||||||
 | 
					            || must_stop_processing.get(),
 | 
				
			||||||
 | 
					        )?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let (new_builder, user_result) = builder.remove_documents(document_ids)?;
 | 
				
			||||||
 | 
					        builder = new_builder;
 | 
				
			||||||
 | 
					        // Uses Invariant: remove documents actually always returns Ok for the inner result
 | 
				
			||||||
 | 
					        let count = user_result.unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let _ = builder.execute()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        count
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
        0
 | 
					        0
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user