Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-06-07 20:55:34 +00:00)

commit 46dfa9f7c1 (parent 5dcd5d8797)

    Remove unused code
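The diff drops the unfinished document-compression scaffolding from the indexer: the tuning constants, the compute_document_compression_dictionary helper (still carrying a todo!), the CompressorExtractor that sampled documents for dictionary training, and the imports only they used.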
@@ -1,29 +1,24 @@
-use std::cell::RefCell;
-use std::sync::atomic::{self, AtomicBool, AtomicUsize};
+use std::sync::atomic::AtomicBool;
 use std::sync::RwLock;
 use std::thread::{self, Builder};
 
 use big_s::S;
-use bumpalo::Bump;
 pub use document_changes::{extract, DocumentChanges, IndexingContext};
-use document_changes::{DocumentChangeContext, Extractor};
-use bumparaw_collections::RawMap;
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::{DocumentOperation, PayloadStats};
 use hashbrown::HashMap;
-use heed::{RoTxn, RwTxn};
+use heed::RwTxn;
 pub use partial_dump::PartialDump;
 pub use update_by_function::UpdateByFunction;
 use write::{build_vectors, update_index, write_to_db};
-use zstd::dict::{DecoderDictionary, EncoderDictionary};
+use zstd::dict::DecoderDictionary;
 
-use super::document::Document as _;
 use super::extract::*;
-use super::ref_cell_ext::RefCellExt as _;
 use super::steps::IndexingStep;
-use super::thread_local::{FullySend, MostlySend, ThreadLocal};
+use super::thread_local::ThreadLocal;
 
-use super::{channel::*, DocumentChange};
+use super::channel::*;
 use crate::documents::PrimaryKey;
 use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
 
@@ -134,7 +129,7 @@ where
         })
         .unwrap()?;
 
-    let mut index_embeddings = index.embedding_configs(wtxn)?;
+    let index_embeddings = index.embedding_configs(wtxn)?;
     let mut field_distribution = index.field_distribution(wtxn)?;
    let mut document_ids = index.documents_ids(wtxn)?;
 
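(The mut on index_embeddings goes away because nothing in the remaining code mutates the binding; rustc's unused_mut lint flags the qualifier once the last writer is removed.)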
@@ -235,137 +230,3 @@ where
 
     Ok(())
 }
-
-/// The compression level to use when compressing documents.
-const DOCUMENT_COMPRESSION_LEVEL: i32 = 19;
-/// The sample size used to generate the document compression dictionary.
-const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000;
-/// The maximum size the document compression dictionary can be.
-const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000;
-/// The maximum number of documents we accept to compress if they
-/// weren't already compressed in the database. If this threshold
-/// is reached we do not generate a dictionary and continue as is.
-const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000;
-
-/// A function dedicated to using the existing document compression
-/// dictionary or generating an appropriate one based on the documents
-/// available in the database and the ones in the payload.
-///
-/// If there are too many documents already in the database and no
-/// compression dictionary we prefer not to generate a dictionary to avoid
-/// compressing all of the documents and potentially blowing up disk space.
-fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP>(
-    index: &Index,
-    rtxn: &RoTxn<'_>,
-    document_changes: &DC,
-    indexing_context: IndexingContext<MSP>,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-) -> Result<Option<EncoderDictionary<'static>>>
-where
-    DC: DocumentChanges<'pl>,
-    MSP: Fn() -> bool + Sync,
-{
-    match index.document_compression_raw_dictionary(rtxn)? {
-        Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))),
-        None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None),
-        None => {
-            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            let extractor = CompressorExtractor {
-                total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE,
-                extracted_documents_count: AtomicUsize::new(0),
-            };
-
-            todo!("collect the documents samples from the database first (or after)");
-
-            // This extraction only takes care of document replacements
-            // and not updates (merges). The merged documents are ignored as
-            // we will only use the previous version of them in the database.
-            extract(
-                document_changes,
-                &extractor,
-                indexing_context,
-                extractor_allocs,
-                &datastore,
-                IndexingStep::PreparingCompressionDictionary,
-            )?;
-
-            let mut sample_data = Vec::new();
-            let mut sample_sizes = Vec::new();
-            for data in datastore {
-                let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner();
-                let mut subsample_size = 0;
-                for subsample in buffer {
-                    sample_data.extend_from_slice(subsample);
-                    subsample_size += subsample.len();
-                }
-                sample_sizes.push(subsample_size);
-            }
-
-            let dictionary = zstd::dict::from_continuous(
-                &sample_data,
-                &sample_sizes,
-                DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE,
-            )?;
-
-            Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL)))
-        }
-    }
-}
-
-struct CompressorExtractor {
-    total_documents_to_extract: usize,
-    extracted_documents_count: AtomicUsize,
-}
-
-#[derive(Default)]
-struct CompressorExtractorData<'extractor> {
-    buffer: Vec<&'extractor [u8]>,
-    /// We extracted the expected count of documents, we can skip everything now.
-    must_stop: bool,
-}
-
-unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}
-
-impl<'extractor> Extractor<'extractor> for CompressorExtractor {
-    type Data = RefCell<CompressorExtractorData<'extractor>>;
-
-    fn init_data<'doc>(
-        &'doc self,
-        _extractor_alloc: &'extractor bumpalo::Bump,
-    ) -> crate::Result<Self::Data> {
-        Ok(RefCell::new(CompressorExtractorData::default()))
-    }
-
-    fn process<'doc>(
-        &'doc self,
-        changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
-    ) -> crate::Result<()> {
-        let mut data = context.data.borrow_mut_or_yield();
-
-        for change in changes {
-            if data.must_stop {
-                return Ok(());
-            }
-
-            let change = change?;
-            match change {
-                DocumentChange::Deletion(_) => (),
-                DocumentChange::Update(_) => (),
-                DocumentChange::Insertion(insertion) => {
-                    for result in insertion.inserted().iter_top_level_fields() {
-                        let (_field_name, raw_value) = result?;
-                        let bytes = raw_value.get().as_bytes();
-                        data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes));
-                    }
-
-                    let previous_count =
-                        self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
-                    data.must_stop = previous_count >= self.total_documents_to_extract;
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
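For background on what was dropped: compute_document_compression_dictionary leaned on zstd's dictionary training, concatenating document samples into one continuous buffer and training a size-capped dictionary from it. Below is a minimal, self-contained sketch of that round trip against the public zstd crate API; the 64_000-byte cap and level 19 mirror the deleted constants, while the synthetic corpus and everything else is illustrative.

use std::io::{Read, Write};

use zstd::dict::{DecoderDictionary, EncoderDictionary};

fn main() -> std::io::Result<()> {
    // Concatenate samples into one continuous buffer plus their sizes,
    // the exact shape `zstd::dict::from_continuous` expects.
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for i in 0..10_000 {
        let doc = format!(r#"{{"id":{i},"title":"document {i}","kind":"sample"}}"#);
        sample_sizes.push(doc.len());
        sample_data.extend_from_slice(doc.as_bytes());
    }

    // Train a dictionary capped at 64 KB, then wrap it once so it can be
    // reused cheaply for every document (as the deleted code did).
    let raw = zstd::dict::from_continuous(&sample_data, &sample_sizes, 64_000)?;
    let encoder_dict = EncoderDictionary::copy(&raw, 19);
    let decoder_dict = DecoderDictionary::copy(&raw);

    // Compress one document with the prepared dictionary...
    let doc = br#"{"id":424242,"title":"new document","kind":"sample"}"#;
    let mut compressed = Vec::new();
    let mut encoder = zstd::Encoder::with_prepared_dictionary(&mut compressed, &encoder_dict)?;
    encoder.write_all(doc)?;
    encoder.finish()?;

    // ...and round-trip it back with the matching decoder dictionary.
    let mut decoder = zstd::Decoder::with_prepared_dictionary(&compressed[..], &decoder_dict)?;
    let mut round_trip = Vec::new();
    decoder.read_to_end(&mut round_trip)?;
    assert_eq!(round_trip, doc);
    Ok(())
}

Note that training wants a reasonably large, varied sample set; feeding from_continuous only a handful of tiny samples can make zstd report an error instead of returning a dictionary, which is part of why the deleted code sampled up to 10_000 documents.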
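The deleted CompressorExtractor bounded its sampling with a shared AtomicUsize: each worker fetch_adds a global counter and flips its thread-local must_stop flag once the previous count reaches total_documents_to_extract. The same cutoff pattern, reduced to a small sketch using rayon directly (take_sample is a hypothetical helper, not meilisearch API):

use std::sync::atomic::{AtomicUsize, Ordering};

use rayon::prelude::*;

/// Keep the first `limit` items processed across all worker threads.
/// Like the deleted extractor, every worker increments one shared counter
/// and an item is kept only if the previous count was below the limit.
fn take_sample<T: Send>(items: Vec<T>, limit: usize) -> Vec<T> {
    let seen = AtomicUsize::new(0);
    items
        .into_par_iter()
        .filter(|_| seen.fetch_add(1, Ordering::SeqCst) < limit)
        .collect()
}

fn main() {
    let sample = take_sample((0..100_000).collect(), 10_000);
    assert_eq!(sample.len(), 10_000);
}

Because each item observes a distinct counter value, exactly limit items pass when enough are available; which items those are depends on thread scheduling, an acceptable trade-off when all you need is a representative sample for dictionary training.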