Introduce the searchable parameter settings to the Settings update

This commit is contained in:
Clément Renault
2020-11-03 13:20:11 +01:00
parent 68d783145b
commit e48630da72
5 changed files with 243 additions and 22 deletions

View File

@@ -308,7 +308,17 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
} }
} }
match builder.execute() { let result = builder.execute(|count, total| {
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
processed_number_of_documents: count,
total_number_of_documents: Some(total),
}
});
});
match result {
Ok(_count) => wtxn.commit().map_err(Into::into), Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }

View File

@@ -9,8 +9,10 @@ use bstr::ByteSlice as _;
use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice; use heed::types::ByteSlice;
use log::{debug, info, error}; use log::{debug, info, error};
use memmap::Mmap;
use rayon::prelude::*; use rayon::prelude::*;
use rayon::ThreadPool; use rayon::ThreadPool;
use crate::index::Index; use crate::index::Index;
use self::store::Store; use self::store::Store;
use self::merge_function::{ use self::merge_function::{
@@ -248,7 +250,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
R: io::Read, R: io::Read,
F: Fn(usize, usize) + Sync, F: Fn(usize, usize) + Sync,
{ {
let before_indexing = Instant::now(); let before_transform = Instant::now();
let transform = Transform { let transform = Transform {
rtxn: &self.wtxn, rtxn: &self.wtxn,
@@ -268,6 +270,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
UpdateFormat::JsonStream => transform.from_json_stream(reader)?, UpdateFormat::JsonStream => transform.from_json_stream(reader)?,
}; };
info!("Update transformed in {:.02?}", before_transform.elapsed());
self.execute_raw(output, progress_callback)
}
pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> anyhow::Result<()>
where
F: Fn(usize, usize) + Sync
{
let before_indexing = Instant::now();
let TransformOutput { let TransformOutput {
primary_key, primary_key,
fields_ids_map, fields_ids_map,
@@ -296,16 +309,14 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
let _deleted_documents_count = deletion_builder.execute()?; let _deleted_documents_count = deletion_builder.execute()?;
} }
let mmap = if documents_count == 0 { let mmap;
None let bytes = if documents_count == 0 {
&[][..]
} else { } else {
let mmap = unsafe { mmap = unsafe { Mmap::map(&documents_file).context("mmaping the transform documents file")? };
memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? &mmap
};
Some(mmap)
}; };
let bytes = mmap.as_ref().map(AsRef::as_ref).unwrap_or_default();
let documents = grenad::Reader::new(bytes).unwrap(); let documents = grenad::Reader::new(bytes).unwrap();
// The enum which indicates the type of the readers // The enum which indicates the type of the readers
@@ -492,7 +503,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
} }
} }
info!("Update processed in {:.02?}", before_indexing.elapsed()); info!("Transform output indexed in {:.02?}", before_indexing.elapsed());
Ok(()) Ok(())
} }

View File

@@ -404,6 +404,59 @@ impl Transform<'_, '_> {
documents_file, documents_file,
}) })
} }
/// Returns a `TransformOutput` with a file that contains the documents of the index
/// with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
// TODO this can be done in parallel by using the rayon `ThreadPool`.
pub fn remap_index_documents(
self,
primary_key: u8,
fields_ids_map: FieldsIdsMap,
) -> anyhow::Result<TransformOutput>
{
let current_fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn)?;
let documents_ids = self.index.documents_ids(self.rtxn)?;
let documents_count = documents_ids.len() as usize;
// We create a final writer to write the new documents in order from the sorter.
let file = tempfile::tempfile()?;
let mut writer = create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?;
let mut obkv_buffer = Vec::new();
for result in self.index.documents.iter(self.rtxn)? {
let (docid, obkv) = result?;
let docid = docid.get();
obkv_buffer.clear();
let mut obkv_writer = obkv::KvWriter::new(&mut obkv_buffer);
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
for (id, name) in fields_ids_map.iter() {
if let Some(val) = current_fields_ids_map.id(name).and_then(|id| obkv.get(id)) {
obkv_writer.insert(id, val)?;
}
}
let buffer = obkv_writer.into_inner()?;
writer.insert(docid.to_be_bytes(), buffer)?;
}
// Once we have written all the documents, we extract
// the file and reset the seek to be able to read it again.
let mut documents_file = writer.into_inner()?;
documents_file.seek(SeekFrom::Start(0))?;
Ok(TransformOutput {
primary_key,
fields_ids_map,
users_ids_documents_ids: users_ids_documents_ids.map_data(Cow::into_owned)?,
new_documents_ids: documents_ids,
replaced_documents_ids: RoaringBitmap::default(),
documents_count,
documents_file,
})
}
} }
/// Only the last value associated with an id is kept. /// Only the last value associated with an id is kept.

View File

@@ -1,17 +1,45 @@
use anyhow::Context; use anyhow::Context;
use crate::Index; use grenad::CompressionType;
use rayon::ThreadPool;
pub struct Settings<'t, 'u, 'i> { use crate::update::index_documents::{Transform, IndexDocumentsMethod};
use crate::update::{ClearDocuments, IndexDocuments};
use crate::{Index, FieldsIdsMap};
pub struct Settings<'a, 't, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
// If the field is set to `None` it means that it hasn't been set by the user, pub(crate) log_every_n: Option<usize>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
pub(crate) linked_hash_map_size: Option<usize>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
// If a struct field is set to `None` it means that it hasn't been set by the user,
// however if it is `Some(None)` it means that the user forced a reset of the setting. // however if it is `Some(None)` it means that the user forced a reset of the setting.
searchable_fields: Option<Option<Vec<String>>>,
displayed_fields: Option<Option<Vec<String>>>, displayed_fields: Option<Option<Vec<String>>>,
} }
impl<'t, 'u, 'i> Settings<'t, 'u, 'i> { impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'t, 'u, 'i> { pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> {
Settings { wtxn, index, displayed_fields: None } Settings {
wtxn,
index,
log_every_n: None,
max_nb_chunks: None,
max_memory: None,
linked_hash_map_size: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
thread_pool: None,
searchable_fields: None,
displayed_fields: None,
}
} }
pub fn reset_displayed_fields(&mut self) { pub fn reset_displayed_fields(&mut self) {
@@ -22,8 +50,116 @@ impl<'t, 'u, 'i> Settings<'t, 'u, 'i> {
self.displayed_fields = Some(Some(names)); self.displayed_fields = Some(Some(names));
} }
pub fn execute(self) -> anyhow::Result<()> { pub fn execute<F>(self, progress_callback: F) -> anyhow::Result<()>
// Check that the displayed attributes parameters has been specified. where
F: Fn(usize, usize) + Sync
{
// Check that the searchable attributes have been specified.
if let Some(value) = self.searchable_fields {
let current_searchable_fields = self.index.searchable_fields(self.wtxn)?;
let current_displayed_fields = self.index.displayed_fields(self.wtxn)?;
let current_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let result = match value {
Some(fields_names) => {
// We create or generate the fields ids corresponding to those names.
let mut fields_ids_map = FieldsIdsMap::new();
let mut searchable_fields = Vec::new();
for name in fields_names {
let id = fields_ids_map.insert(&name).context("field id limit reached")?;
searchable_fields.push(id);
}
// We complete the new FieldsIdsMap with the previous names.
for (_id, name) in current_fields_ids_map.iter() {
fields_ids_map.insert(name).context("field id limit reached")?;
}
// We must also update the displayed fields according to the new `FieldsIdsMap`.
let displayed_fields = match current_displayed_fields {
Some(fields) => {
let mut displayed_fields = Vec::new();
for id in fields {
let name = current_fields_ids_map.name(*id).unwrap();
let id = fields_ids_map.id(name).context("field id limit reached")?;
displayed_fields.push(id);
}
Some(displayed_fields)
},
None => None,
};
(fields_ids_map, Some(searchable_fields), displayed_fields)
},
None => (
current_fields_ids_map.clone(),
current_searchable_fields.map(ToOwned::to_owned),
current_displayed_fields.map(ToOwned::to_owned),
),
};
let (mut fields_ids_map, searchable_fields, displayed_fields) = result;
let transform = Transform {
rtxn: &self.wtxn,
index: self.index,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
chunk_fusing_shrink_size: self.chunk_fusing_shrink_size,
max_nb_chunks: self.max_nb_chunks,
max_memory: self.max_memory,
index_documents_method: IndexDocumentsMethod::ReplaceDocuments,
autogenerate_docids: false,
};
// We compute or generate the new primary key field id.
let primary_key = match self.index.primary_key(&self.wtxn)? {
Some(id) => {
let name = current_fields_ids_map.name(id).unwrap();
fields_ids_map.insert(name).context("field id limit reached")?
},
None => fields_ids_map.insert("id").context("field id limit reached")?,
};
// We remap the documents fields based on the new `FieldsIdsMap`.
let output = transform.remap_index_documents(primary_key, fields_ids_map.clone())?;
// We write the new FieldsIdsMap to the database
// this way next indexing methods will be based on that.
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
// The new searchable fields are also written down to make sure
// that the IndexDocuments system takes only these ones into account.
match searchable_fields {
Some(fields) => self.index.put_searchable_fields(self.wtxn, &fields)?,
None => self.index.delete_searchable_fields(self.wtxn).map(drop)?,
}
// We write the displayed fields into the database here
// to make sure that the right fields are displayed.
match displayed_fields {
Some(fields) => self.index.put_displayed_fields(self.wtxn, &fields)?,
None => self.index.delete_displayed_fields(self.wtxn).map(drop)?,
}
// We clear the full database (words-fst, documents ids and documents content).
ClearDocuments::new(self.wtxn, self.index).execute()?;
// We index the generated `TransformOutput` which must contain
// all the documents with fields in the newly defined searchable order.
let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index);
indexing_builder.log_every_n = self.log_every_n;
indexing_builder.max_nb_chunks = self.max_nb_chunks;
indexing_builder.max_memory = self.max_memory;
indexing_builder.linked_hash_map_size = self.linked_hash_map_size;
indexing_builder.chunk_compression_type = self.chunk_compression_type;
indexing_builder.chunk_compression_level = self.chunk_compression_level;
indexing_builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
indexing_builder.thread_pool = self.thread_pool;
indexing_builder.execute_raw(output, progress_callback)?;
}
// Check that the displayed attributes have been specified.
if let Some(value) = self.displayed_fields { if let Some(value) = self.displayed_fields {
match value { match value {
// If it has been set, and it was a list of fields names, we create // If it has been set, and it was a list of fields names, we create
@@ -99,7 +235,7 @@ mod tests {
// In the same transaction we change the displayed fields to be only the age. // In the same transaction we change the displayed fields to be only the age.
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index);
builder.set_displayed_fields(vec!["age".into()]); builder.set_displayed_fields(vec!["age".into()]);
builder.execute().unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to only the "age" field. // Check that the displayed fields are correctly set to only the "age" field.
@@ -114,7 +250,7 @@ mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index);
builder.reset_displayed_fields(); builder.reset_displayed_fields();
builder.execute().unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value). // Check that the displayed fields are correctly set to `None` (default value).

View File

@@ -103,8 +103,19 @@ impl<'a> UpdateBuilder<'a> {
self, self,
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
) -> Settings<'t, 'u, 'i> ) -> Settings<'a, 't, 'u, 'i>
{ {
Settings::new(wtxn, index) let mut builder = Settings::new(wtxn, index);
builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.linked_hash_map_size = self.linked_hash_map_size;
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder.thread_pool = self.thread_pool;
builder
} }
} }