4090: Diff indexing r=ManyTheFish a=ManyTheFish

This pull request aims to reduce the indexing time by computing a difference between the data added to the index and the data removed from the index before writing in LMDB.

## Why focus on reducing the writings in LMDB?

The indexing in Meilisearch is split into 3 main phases:
1) The computing or the extraction of the data (Multi-threaded)
2) The writing of the data in LMDB (Mono-threaded)
3) The processing of the prefix databases (Mono-threaded)

see below:
![Capture d’écran 2023-09-28 à 20 01 45](https://github.com/meilisearch/meilisearch/assets/6482087/51513162-7c39-4244-978b-2c6b60c43a56)


Because the writing is mono-threaded, it represents a bottleneck in the indexing, reducing the number of writes in LMDB will reduce the pressure on the main thread and should reduce the global time spent on the indexing.

## Give Feedback

We created [a dedicated discussion](https://github.com/meilisearch/meilisearch/discussions/4196) for users to try this new feature and to give feedback on bugs or performance issues.

## Technical approach
### Part 1: merge the addition and the deletion process
This part:
a) Aims to reduce the time spent on indexing only the filterable/sortable fields of documents, for example:
  - Updating the number of "likes" or "stars" of a song or a movie
  - Updating the "stock count" or the "price" of a product

b) Aims to reduce the time spent on writing in LMDB which should reduce the global indexing time for the highly multi-threaded machines by reducing the writing bottleneck.

c) Aims to reduce the average time spent to delete documents without having to keep the soft-deleted documents implementation

- [x] Create a preprocessing function that creates the diff-based documents chuck (`OBKV<fid, OBKV<AddDel, value>>`)
  - [x] and clearly separate the faceted fields and the searchable fields in two different chunks
- Change the parameters of the input extractor by taking an `OBKV<fid, OBKV<AddDel, value>>` instead of  `OBKV<fid, value>`.
  - [x] extract_docid_word_positions
  - [x] extract_geo_points
  - [x] extract_vector_points
  - [x] extract_fid_docid_facet_values
- Adapt the searchable extractors to the new diff-chucks
  - [x] extract_fid_word_count_docids
  - [x] extract_word_pair_proximity_docids
  - [x] extract_word_position_docids
  - [x] extract_word_docids
- Adapt the facet extractors to the new diff-chucks
  - [x] extract_facet_number_docids
  - [x] extract_facet_string_docids
  - [x] extract_fid_docid_facet_values
  - [x] FacetsUpdate
- [x] Adapt the prefix database extractors ⚠️ ⚠️ 
- [x] Make the LMDB writer remove the document_ids to delete at the same time the new document_ids are added
- [x] Remove document deletion pipeline
  - [x] remove `new_documents_ids` entirely and `replaced_documents_ids`
  - [x] reuse extracted external id from transform instead of re-extracting in `TypedChunks::Documents`
  - [x] Remove deletion pipeline after autobatcher
  - [x] remove autobatcher deletion pipeline
    - [x] everything uses `IndexOperation::DocumentOperation`
    - [x] repair deletion by internal id for filter by delete
    - [x] Improve the deletion via internal ids by avoiding iterating over the whole set of external document ids.  
- [x] Remove soft-deleted documents

#### FIXME

- [x] field distribution is not correctly updated after deletion
- [x] missing documents in the tests of tokenizer_customization

### Part 2: Only compute the documents field by field
This part aims to reduce the global indexing time for any kind of partial document modification on any size of machine from the mono-threaded one to the highly multi-threaded one.

- [ ] Make the preprocessing function only send the fields that changed to the extractors
- [ ] remove the `word_docids` and `exact_word_docids` database and adapt the search (⚠️ could impact the search performances)
- [ ] replace the `word_pair_proximity_docids` database with a `word_pair_proximity_fid_docids` database and adapt the search (⚠️ could impact the search performances)
- [ ] Adapt the prefix database extractors ⚠️ ⚠️

## Technical Concerns
- The part 1 implementation could increase the indexing time for the smallest machines (with few threads) by increasing the extracting time (multi-threaded) more than the writing time (mono-threaded)
- The part 2 implementation needs to change the databases which could have a significant impact on the search performances
- The prefix databases are a bit special to process and may be a pain to adapt to the difference-based indexing

Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
meili-bors[bot]
2023-11-21 09:44:38 +00:00
committed by GitHub
170 changed files with 3719 additions and 7081 deletions

View File

@@ -24,14 +24,13 @@ use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
use log::{debug, error, info};
use log::{debug, error, info, trace};
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::{
DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Settings as MilliSettings,
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
};
use meilisearch_types::milli::{self, Filter, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
@@ -44,7 +43,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
@@ -105,12 +104,6 @@ pub(crate) enum IndexOperation {
operations: Vec<DocumentOperation>,
tasks: Vec<Task>,
},
DocumentDeletion {
index_uid: String,
// The vec associated with each document deletion tasks.
documents: Vec<Vec<String>>,
tasks: Vec<Task>,
},
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
@@ -162,7 +155,6 @@ impl Batch {
}
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::DocumentDeletion { tasks, .. }
| IndexOperation::Settings { tasks, .. }
| IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
@@ -227,7 +219,6 @@ impl IndexOperation {
pub fn index_uid(&self) -> &str {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
@@ -243,9 +234,6 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentOperation { .. } => {
f.write_str("IndexOperation::DocumentOperation")
}
IndexOperation::DocumentDeletion { .. } => {
f.write_str("IndexOperation::DocumentDeletion")
}
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
}
@@ -348,18 +336,27 @@ impl IndexScheduler {
BatchKind::DocumentDeletion { deletion_ids } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
let mut documents = Vec::new();
let mut operations = Vec::with_capacity(tasks.len());
let mut documents_counts = Vec::with_capacity(tasks.len());
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
documents.push(documents_ids.clone())
operations.push(DocumentOperation::Delete(documents_ids.clone()));
documents_counts.push(documents_ids.len() as u64);
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentDeletion { index_uid, documents, tasks },
op: IndexOperation::DocumentOperation {
index_uid,
primary_key: None,
method: IndexDocumentsMethod::ReplaceDocuments,
documents_counts,
operations,
tasks,
},
must_create_index,
}))
}
@@ -1204,7 +1201,7 @@ impl IndexScheduler {
index,
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
|indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.get(),
)?;
@@ -1251,7 +1248,8 @@ impl IndexScheduler {
let (new_builder, user_result) =
builder.remove_documents(document_ids)?;
builder = new_builder;
// Uses Invariant: remove documents actually always returns Ok for the inner result
let count = user_result.unwrap();
let provided_ids =
if let Some(Details::DocumentDeletion { provided_ids, .. }) =
task.details
@@ -1262,23 +1260,11 @@ impl IndexScheduler {
unreachable!();
};
match user_result {
Ok(count) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(count),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(0),
});
task.error = Some(milli::Error::from(e).into());
}
}
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(count),
});
}
}
}
@@ -1293,31 +1279,13 @@ impl IndexScheduler {
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.reset_primary_key();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.clone().get(),
)?;
}
Ok(tasks)
}
IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?;
documents.iter().flatten().for_each(|id| {
builder.delete_external_id(id);
});
let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?;
for (task, documents) in tasks.iter_mut().zip(documents) {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion {
provided_ids: documents.len(),
deleted_documents: Some(deleted_documents.min(documents.len() as u64)),
});
}
Ok(tasks)
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
@@ -1327,7 +1295,13 @@ impl IndexScheduler {
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
let deleted_documents = delete_document_by_filter(
index_wtxn,
filter,
self.index_mapper.indexer_config(),
self.must_stop_processing.clone(),
index,
);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
@@ -1561,6 +1535,8 @@ impl IndexScheduler {
fn delete_document_by_filter<'a>(
wtxn: &mut RwTxn<'a, '_>,
filter: &serde_json::Value,
indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
@@ -1571,9 +1547,26 @@ fn delete_document_by_filter<'a>(
}
e => e.into(),
})?;
let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
delete_operation.delete_documents(&candidates);
delete_operation.execute().map(|result| result.deleted_documents)?
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
wtxn,
index,
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop_processing.get(),
)?;
let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
builder = new_builder;
let _ = builder.execute()?;
count
} else {
0
})