Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-07-19 04:50:37 +00:00.
Compare commits: feat/mcp-s… ... document-d… (18 commits)

Commit SHA1s:
83616bc03e
bbbc4410ac
46dfa9f7c1
5dcd5d8797
bc62cb0801
9109fbaeb0
78c9f67550
523733db0a
6d7415a25f
3a32a58d6c
ecc7741212
d43ddd7205
0dcbd2fe07
df80aaefc9
afec94d1f3
e122970570
19b0bf7121
beef5b5f98
Cargo.lock (generated), 397 lines changed: file diff suppressed because it is too large.
@@ -150,7 +150,7 @@ fn main() {

// after executing a batch we check if the database is corrupted
let res = index.search(&wtxn).execute().unwrap();
index.documents(&wtxn, res.documents_ids).unwrap();
index.compressed_documents(&wtxn, res.documents_ids).unwrap();
progression.fetch_add(1, Ordering::Relaxed);
}
wtxn.abort();
@@ -128,6 +128,7 @@ impl IndexScheduler {
let embedding_configs = index
.embedding_configs(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
let decompression_dictionary = index.document_decompression_dictionary(&rtxn)?;

let nb_documents = index
.number_of_documents(&rtxn)
@@ -135,8 +136,9 @@ impl IndexScheduler {
as u32;
let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
progress.update_progress(update_document_progress);
let doc_alloc = bumpalo::Bump::new();
let documents = index
.all_documents(&rtxn)
.all_compressed_documents(&rtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// 3.1. Dump the documents
for ret in documents {
@@ -145,6 +147,10 @@ impl IndexScheduler {
}

let (id, doc) = ret.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
let doc = match decompression_dictionary.as_ref() {
Some(dict) => doc.decompress_into_bump(&doc_alloc, dict)?,
None => doc.as_non_compressed(),
};

let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
@@ -15,7 +15,7 @@ pub mod star_or;
pub mod task_view;
pub mod tasks;
pub mod versioning;
pub use milli::{heed, Index};
pub use milli::{heed, zstd, Index};
use uuid::Uuid;
pub use versioning::VERSION_FILE_NAME;
pub use {milli, serde_cs};
@@ -132,7 +132,7 @@ reqwest = { version = "0.12.12", features = [
sha-1 = { version = "0.10.1", optional = true }
static-files = { version = "0.2.4", optional = true }
tempfile = { version = "3.15.0", optional = true }
zip = { version = "2.2.2", optional = true }
zip = { version = "2.2.2", default-features = false, features = ["deflate"], optional = true }

[features]
default = ["meilisearch-types/all-tokenizations", "mini-dashboard"]
@@ -194,6 +194,7 @@ struct Infos {
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64,
experimental_enable_document_compression: bool,
gpu_enabled: bool,
db_path: bool,
import_dump: bool,
@@ -240,6 +241,7 @@ impl Infos {
experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size,
experimental_enable_document_compression,
http_addr,
master_key: _,
env,
@@ -299,6 +301,7 @@ impl Infos {
experimental_replication_parameters,
experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
experimental_reduce_indexing_memory_usage,
experimental_enable_document_compression,
gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
db_path: db_path != PathBuf::from("./data.ms"),
import_dump: import_dump.is_some(),
@@ -62,6 +62,8 @@ const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
"MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str =
"MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_SIZE";
const MEILI_EXPERIMENTAL_ENABLE_DOCUMENT_COMPRESSION: &str =
"MEILI_EXPERIMENTAL_ENABLE_DOCUMENT_COMPRESSION";

const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -438,6 +440,11 @@ pub struct Opt {
#[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: u64,

/// Experimentally enable the document compression feature, see: <https://github.com/orgs/meilisearch/discussions/802>
#[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_DOCUMENT_COMPRESSION)]
#[serde(default)]
pub experimental_enable_document_compression: bool,

#[serde(flatten)]
#[clap(flatten)]
pub indexer_options: IndexerOpts,
@@ -540,6 +547,7 @@ impl Opt {
experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size,
experimental_enable_document_compression,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -628,6 +636,10 @@ impl Opt {
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
experimental_limit_batched_tasks_total_size.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_DOCUMENT_COMPRESSION,
experimental_enable_document_compression.to_string(),
);
indexer_options.export_to_env();
}
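Aside, not part of the diff: the new experimental_enable_document_compression flag follows the same pattern as the other experimental options, a boolean that can come from the command line or from its environment variable and is then re-exported so the rest of the configuration sees a consistent value. A minimal, hypothetical sketch of that pattern, assuming clap 4 with the derive and env features enabled:

use clap::Parser;

/// Hypothetical stand-in for the real Opt struct in option.rs.
#[derive(Debug, Parser)]
struct DemoOpt {
    /// Experimentally enable the document compression feature.
    #[clap(long, env = "MEILI_EXPERIMENTAL_ENABLE_DOCUMENT_COMPRESSION")]
    experimental_enable_document_compression: bool,
}

fn main() {
    // Either --experimental-enable-document-compression on the CLI or the
    // environment variable above flips this boolean; it defaults to false.
    let opt = DemoOpt::parse();
    println!("document compression enabled: {}", opt.experimental_enable_document_compression);
}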
@@ -1411,43 +1411,50 @@ fn some_documents<'a, 't: 'a>(
retrieve_vectors: RetrieveVectors,
) -> Result<impl Iterator<Item = Result<Document, ResponseError>> + 'a, ResponseError> {
let fields_ids_map = index.fields_ids_map(rtxn)?;
let dictionary = index.document_decompression_dictionary(rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(rtxn)?;
let mut buffer = Vec::new();

Ok(index.iter_documents(rtxn, doc_ids)?.map(move |ret| {
ret.map_err(ResponseError::from).and_then(|(key, document)| -> Result<_, ResponseError> {
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
match retrieve_vectors {
RetrieveVectors::Hide => {
document.remove("_vectors");
}
RetrieveVectors::Retrieve => {
// Clippy is simply wrong
#[allow(clippy::manual_unwrap_or_default)]
let mut vectors = match document.remove("_vectors") {
Some(Value::Object(map)) => map,
_ => Default::default(),
};
for (name, vector) in index.embeddings(rtxn, key)? {
let user_provided = embedding_configs
.iter()
.find(|conf| conf.name == name)
.is_some_and(|conf| conf.user_provided.contains(key));
let embeddings = ExplicitVectors {
embeddings: Some(vector.into()),
regenerate: !user_provided,
};
vectors.insert(
name,
serde_json::to_value(embeddings).map_err(MeilisearchHttpError::from)?,
);
Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| {
ret.map_err(ResponseError::from).and_then(
|(key, compressed_document)| -> Result<_, ResponseError> {
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
match retrieve_vectors {
RetrieveVectors::Hide => {
document.remove("_vectors");
}
RetrieveVectors::Retrieve => {
// Clippy is simply wrong
#[allow(clippy::manual_unwrap_or_default)]
let mut vectors = match document.remove("_vectors") {
Some(Value::Object(map)) => map,
_ => Default::default(),
};
for (name, vector) in index.embeddings(rtxn, key)? {
let user_provided = embedding_configs
.iter()
.find(|conf| conf.name == name)
.is_some_and(|conf| conf.user_provided.contains(key));
let embeddings = ExplicitVectors {
embeddings: Some(vector.into()),
regenerate: !user_provided,
};
vectors.insert(
name,
serde_json::to_value(embeddings)
.map_err(MeilisearchHttpError::from)?,
);
}
document.insert("_vectors".into(), vectors.into());
}
document.insert("_vectors".into(), vectors.into());
}
}

Ok(document)
})
Ok(document)
},
)
}))
}
@@ -1327,12 +1327,13 @@ impl<'a> HitMaker<'a> {
}

pub fn make_hit(&self, id: u32, score: &[ScoreDetails]) -> milli::Result<SearchHit> {
let (_, obkv) =
self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?;
let mut buffer = Vec::new();
let dict = self.index.document_decompression_dictionary(self.rtxn)?;
let compressed = self.index.compressed_document(self.rtxn, id)?.unwrap();
let doc = compressed.decompress_with_optional_dictionary(&mut buffer, dict.as_ref())?;

// First generate a document with all the displayed fields
let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, obkv)?;

let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, doc)?;
let add_vectors_fid =
self.vectors_fid.filter(|_fid| self.retrieve_vectors == RetrieveVectors::Retrieve);
@@ -280,6 +280,7 @@ fn export_a_dump(

// 4. Dump the indexes
let mut count = 0;
let mut buffer = Vec::new();
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
let index_path = db_path.join("indexes").join(uuid.to_string());
@@ -288,6 +289,7 @@ fn export_a_dump(
})?;

let rtxn = index.read_txn()?;
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
@@ -300,8 +302,11 @@ fn export_a_dump(
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

// 4.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
for ret in index.all_compressed_documents(&rtxn)? {
let (_id, compressed_doc) = ret?;
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
@@ -36,6 +36,7 @@ heed = { version = "0.20.5", default-features = false, features = [
indexmap = { version = "2.7.0", features = ["serde"] }
json-depth-checker = { path = "../json-depth-checker" }
levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
zstd = { version = "0.13.1", features = ["zdict_builder", "experimental"] }
memchr = "2.7.4"
memmap2 = "0.9.5"
obkv = "0.3.0"
crates/milli/src/heed_codec/compressed_obkv_codec.rs (new file, 120 lines)
@@ -0,0 +1,120 @@
use std::borrow::Cow;
use std::io;
use std::io::ErrorKind;

use bumpalo::Bump;
use heed::BoxedError;
use obkv::KvReaderU16;
use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{DecoderDictionary, EncoderDictionary};

pub struct CompressedObkvCodec;

impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec {
type DItem = CompressedKvReaderU16<'a>;

fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
Ok(CompressedKvReaderU16(bytes))
}
}

impl heed::BytesEncode<'_> for CompressedObkvCodec {
type EItem = CompressedObkvU16;

fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
Ok(Cow::Borrowed(&item.0))
}
}

// TODO Make this an unsized slice wrapper instead?
// &'a CompressedKvReaderU16([u8])
pub struct CompressedKvReaderU16<'a>(&'a [u8]);

impl<'a> CompressedKvReaderU16<'a> {
/// Decompresses the KvReader into the buffer using the provided dictionary.
pub fn decompress_with<'b>(
&self,
buffer: &'b mut Vec<u8>,
dictionary: &DecoderDictionary,
) -> io::Result<&'b KvReaderU16> {
const TWO_GIGABYTES: usize = 2 * 1024 * 1024 * 1024;

let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?;
let mut max_size = self.0.len() * 4;
let size = loop {
buffer.resize(max_size, 0);
match decompressor.decompress_to_buffer(self.0, &mut buffer[..max_size]) {
Ok(size) => break size,
// TODO don't do that !!! But what should I do?
Err(e) if e.kind() == ErrorKind::Other && max_size <= TWO_GIGABYTES => {
max_size *= 2
}
Err(e) => return Err(e),
}
};
Ok(KvReaderU16::from_slice(&buffer[..size]))
}

pub fn decompress_into_bump<'b>(
&self,
bump: &'b Bump,
dictionary: &DecoderDictionary,
) -> io::Result<&'b KvReaderU16> {
let mut buffer = Vec::new();
self.decompress_with(&mut buffer, dictionary)?;
Ok(KvReaderU16::from_slice(bump.alloc_slice_copy(&buffer)))
}

/// Returns the KvReader like it is not compressed.
/// Happens when there is no dictionary yet.
pub fn as_non_compressed(&self) -> &'a KvReaderU16 {
KvReaderU16::from_slice(self.0)
}

/// Decompresses this KvReader if necessary.
pub fn decompress_with_optional_dictionary<'b>(
&self,
buffer: &'b mut Vec<u8>,
dictionary: Option<&DecoderDictionary>,
) -> io::Result<&'b KvReaderU16>
where
'a: 'b,
{
match dictionary {
Some(dict) => self.decompress_with(buffer, dict),
None => Ok(self.as_non_compressed()),
}
}

pub fn into_owned_with_dictionary(
&self,
dictionary: &DecoderDictionary<'_>,
) -> io::Result<Box<KvReaderU16>> {
let mut buffer = Vec::new();
let reader = self.decompress_with(&mut buffer, dictionary)?;
// Make sure the Vec is exactly the size of the reader
let size = reader.as_bytes().len();
buffer.resize(size, 0);
Ok(buffer.into_boxed_slice().into())
}
}

pub struct CompressedObkvU16(Vec<u8>);

impl CompressedObkvU16 {
pub fn with_dictionary(
input: &KvReaderU16,
dictionary: &EncoderDictionary,
) -> io::Result<Self> {
let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
Self::with_compressor(input, &mut compressor)
}

pub fn with_compressor(input: &KvReaderU16, compressor: &mut Compressor) -> io::Result<Self> {
compressor.compress(input.as_bytes()).map(CompressedObkvU16)
}

pub fn as_bytes(&self) -> &[u8] {
&self.0
}
}
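Aside, not part of the diff: the codec above relies on zstd's bulk API with prepared dictionaries. A minimal, self-contained sketch of the same round trip, assuming the zstd 0.13 crate and using plain byte slices in place of the obkv readers:

use std::io;

use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{DecoderDictionary, EncoderDictionary};

fn roundtrip(raw_document: &[u8], dict_bytes: &[u8]) -> io::Result<Vec<u8>> {
    // Prepared dictionaries amortise the cost of loading the dictionary
    // across many documents, which is what the codec counts on.
    let enc_dict = EncoderDictionary::copy(dict_bytes, 19);
    let dec_dict = DecoderDictionary::copy(dict_bytes);

    let mut compressor = Compressor::with_prepared_dictionary(&enc_dict)?;
    let compressed = compressor.compress(raw_document)?;

    let mut decompressor = Decompressor::with_prepared_dictionary(&dec_dict)?;
    // The bulk API needs a capacity hint; grow it on failure, mirroring the
    // retry loop of decompress_with above, capped here at 2 GiB.
    let mut capacity = raw_document.len().max(64);
    loop {
        match decompressor.decompress(&compressed, capacity) {
            Ok(bytes) => return Ok(bytes),
            Err(e) if e.kind() == io::ErrorKind::Other && capacity < 2 * 1024 * 1024 * 1024 => {
                capacity *= 2
            }
            Err(e) => return Err(e),
        }
    }
}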
@@ -1,6 +1,7 @@
mod beu16_str_codec;
mod beu32_str_codec;
mod byte_slice_ref;
mod compressed_obkv_codec;
pub mod facet;
mod field_id_word_count_codec;
mod fst_set_codec;
@@ -18,6 +19,9 @@ use thiserror::Error;

pub use self::beu16_str_codec::BEU16StrCodec;
pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::compressed_obkv_codec::{
CompressedKvReaderU16, CompressedObkvCodec, CompressedObkvU16,
};
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;
pub use self::obkv_codec::ObkvCodec;
@@ -9,6 +9,7 @@ use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use roaring::RoaringBitmap;
use rstar::RTree;
use serde::{Deserialize, Serialize};
use zstd::dict::{DecoderDictionary, EncoderDictionary};

use crate::constants::RESERVED_VECTORS_FIELD_NAME;
use crate::documents::PrimaryKey;
@@ -18,14 +19,17 @@ use crate::heed_codec::facet::{
FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec,
FieldIdCodec, OrderedF64Codec,
};
use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec};
use crate::heed_codec::{
BEU16StrCodec, CompressedKvReaderU16, CompressedObkvCodec, FstSetCodec, StrBEU16Codec,
StrRefCodec,
};
use crate::order_by_map::OrderByMap;
use crate::proximity::ProximityPrecision;
use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfig};
use crate::{
default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec,
FieldidsWeightsMap, GeoPoint, LocalizedAttributesRule, ObkvCodec, Result, RoaringBitmapCodec,
FieldidsWeightsMap, GeoPoint, LocalizedAttributesRule, Result, RoaringBitmapCodec,
RoaringBitmapLenCodec, Search, U8StrStrCodec, Weight, BEU16, BEU32, BEU64,
};

@@ -69,6 +73,7 @@ pub mod main_key {
pub const PROXIMITY_PRECISION: &str = "proximity-precision";
pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
pub const SEARCH_CUTOFF: &str = "search_cutoff";
pub const DOCUMENT_COMPRESSION_DICTIONARY: &str = "document-compression-dictionary";
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
pub const FACET_SEARCH: &str = "facet_search";
pub const PREFIX_SEARCH: &str = "prefix_search";
@@ -167,7 +172,7 @@ pub struct Index {
pub vector_arroy: arroy::Database<Unspecified>,

/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,
pub(crate) documents: Database<BEU32, CompressedObkvCodec>,
}

impl Index {
@@ -331,6 +336,50 @@ impl Index {
self.env.prepare_for_closing()
}

/* document compression dictionary */

/// Writes the dictionary that will further be used to compress the documents.
pub fn put_document_compression_dictionary(
&self,
wtxn: &mut RwTxn,
dictionary: &[u8],
) -> heed::Result<()> {
self.main.remap_types::<Str, Bytes>().put(
wtxn,
main_key::DOCUMENT_COMPRESSION_DICTIONARY,
dictionary,
)
}

/// Deletes the document compression dictionary.
pub fn delete_document_compression_dictionary(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
self.main.remap_key_type::<Str>().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}

/// Returns the optional raw bytes dictionary to be used when reading or writing the OBKV documents.
pub fn document_compression_raw_dictionary<'t>(
&self,
rtxn: &'t RoTxn,
) -> heed::Result<Option<&'t [u8]>> {
self.main.remap_types::<Str, Bytes>().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}

pub fn document_decompression_dictionary<'t>(
&self,
rtxn: &'t RoTxn,
) -> heed::Result<Option<DecoderDictionary<'t>>> {
self.document_compression_raw_dictionary(rtxn).map(|opt| opt.map(DecoderDictionary::new))
}

pub fn document_compression_dictionary(
&self,
rtxn: &RoTxn,
) -> heed::Result<Option<EncoderDictionary<'static>>> {
const COMPRESSION_LEVEL: i32 = 19;
self.document_compression_raw_dictionary(rtxn)
.map(|opt| opt.map(|bytes| EncoderDictionary::copy(bytes, COMPRESSION_LEVEL)))
}

/* documents ids */

/// Writes the documents ids that correspond to the user-ids-documents-ids FST.
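Aside, not part of the diff: the raw dictionary bytes stored under DOCUMENT_COMPRESSION_DICTIONARY are turned into prepared zstd dictionaries before being used. A small sketch of that conversion, assuming the zstd 0.13 crate, where raw stands in for the blob read back from the main database:

use zstd::dict::{DecoderDictionary, EncoderDictionary};

// Same compression level the accessors above hardcode.
const COMPRESSION_LEVEL: i32 = 19;

fn prepared_dictionaries(raw: &[u8]) -> (EncoderDictionary<'static>, DecoderDictionary<'_>) {
    // EncoderDictionary::copy owns the bytes (hence the 'static lifetime),
    // while DecoderDictionary::new borrows them for the read transaction.
    (EncoderDictionary::copy(raw, COMPRESSION_LEVEL), DecoderDictionary::new(raw))
}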
@@ -1258,43 +1307,42 @@ impl Index {
/* documents */

/// Returns a document by using the document id.
pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result<&'t obkv::KvReaderU16> {
self.documents
.get(rtxn, &id)?
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })
.map_err(Into::into)
pub fn compressed_document<'t>(
&self,
rtxn: &'t RoTxn,
id: DocumentId,
) -> Result<Option<CompressedKvReaderU16<'t>>> {
self.documents.get(rtxn, &id).map_err(Into::into)
}

/// Returns an iterator over the requested documents. The next item will be an error if a document is missing.
pub fn iter_documents<'a, 't: 'a>(
/// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing.
pub fn iter_compressed_documents<'a, 't: 'a>(
&'a self,
rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId> + 'a,
) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
Ok(ids.into_iter().map(move |id| {
let kv = self
.documents
.get(rtxn, &id)?
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })?;
Ok((id, kv))
) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
Ok(ids.into_iter().flat_map(move |id| {
self.compressed_document(rtxn, id)
.map(|opt| opt.map(|compressed| (id, compressed)))
.transpose()
}))
}

/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<'t>(
pub fn compressed_documents<'t>(
&self,
rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId>,
) -> Result<Vec<(DocumentId, &'t obkv::KvReaderU16)>> {
self.iter_documents(rtxn, ids)?.collect()
) -> Result<Vec<(DocumentId, CompressedKvReaderU16<'t>)>> {
self.iter_compressed_documents(rtxn, ids)?.collect()
}

/// Returns an iterator over all the documents in the index.
pub fn all_documents<'a, 't: 'a>(
pub fn all_compressed_documents<'a, 't: 'a>(
&'a self,
rtxn: &'t RoTxn<'t>,
) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
self.iter_documents(rtxn, self.documents_ids(rtxn)?)
) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
self.iter_compressed_documents(rtxn, self.documents_ids(rtxn)?)
}

pub fn external_id_of<'a, 't: 'a>(
@@ -1315,8 +1363,13 @@ impl Index {
process: "external_id_of",
})
})?;
Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, obkv) = entry?;
let dictionary =
self.document_compression_raw_dictionary(rtxn)?.map(DecoderDictionary::copy);
let mut buffer = Vec::new();
Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, compressed_obkv) = entry?;
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
match primary_key.document_id(obkv, &fields)? {
Ok(document_id) => Ok(document_id),
Err(_) => Err(InternalError::DocumentsError(
@@ -2625,7 +2678,12 @@ pub(crate) mod tests {
"###);

let rtxn = index.read_txn().unwrap();
let (_docid, obkv) = index.documents(&rtxn, [0]).unwrap()[0];
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap().remove(0);
let mut buffer = Vec::new();
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{
@@ -2634,7 +2692,10 @@ pub(crate) mod tests {
"###);

// Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
let (_docid, obkv) = index.documents(&rtxn, [2]).unwrap()[0];
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap().remove(0);
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{
@@ -2643,6 +2704,7 @@ pub(crate) mod tests {
}
"###);

drop(dictionary);
drop(rtxn);

// Add new documents again
@@ -2841,11 +2903,16 @@ pub(crate) mod tests {
} = search.execute().unwrap();
let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap();
documents_ids.sort_unstable();
let docs = index.documents(&rtxn, documents_ids).unwrap();
let compressed_docs = index.compressed_documents(&rtxn, documents_ids).unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let mut buffer = Vec::new();
let mut all_ids = HashSet::new();
for (_docid, obkv) in docs {
let id = obkv.get(primary_key_id).unwrap();
assert!(all_ids.insert(id));
for (_docid, compressed) in compressed_docs {
let doc = compressed
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let id = doc.get(primary_key_id).unwrap();
assert!(all_ids.insert(id.to_vec()));
}
}
@@ -48,7 +48,7 @@ pub use search::new::{
};
use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub use {charabia as tokenizer, heed, rhai};
pub use {charabia as tokenizer, heed, rhai, zstd};

pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
pub use self::criterion::{default_criteria, Criterion, CriterionError};
@@ -25,8 +25,13 @@ fn collect_field_values(
) -> Vec<String> {
let mut values = vec![];
let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap();
for doc in index.documents(txn, docids.iter().copied()).unwrap() {
if let Some(v) = doc.1.get(fid) {
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(txn).unwrap();
for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
if let Some(v) = doc.get(fid) {
let v: serde_json::Value = serde_json::from_slice(v).unwrap();
let v = v.to_string();
values.push(v);
@@ -407,9 +407,15 @@ pub fn snap_documents(index: &Index) -> String {
let rtxn = index.read_txn().unwrap();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let display = fields_ids_map.ids().collect::<Vec<_>>();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let mut buffer = Vec::new();

for document in index.all_documents(&rtxn).unwrap() {
let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap();
for result in index.all_compressed_documents(&rtxn).unwrap() {
let (_id, compressed_document) = result.unwrap();
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
snap.push_str(&serde_json::to_string(&doc).unwrap());
snap.push('\n');
}
@@ -62,6 +62,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?;
self.index.delete_geo_rtree(self.wtxn)?;
self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
self.index.delete_document_compression_dictionary(self.wtxn)?;

// Remove all user-provided bits from the configs
let mut configs = self.index.embedding_configs(self.wtxn)?;
@@ -4,8 +4,8 @@ mod helpers;
mod transform;
mod typed_chunk;

use std::collections::HashSet;
use std::io::{Read, Seek};
use std::collections::{HashMap, HashSet};
use std::io::{BufWriter, Read, Seek, Write};
use std::iter;
use std::num::NonZeroU32;
use std::sync::Arc;
@@ -13,9 +13,8 @@ use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender};
use enrich::enrich_documents_batch;
use grenad::{Merger, MergerBuilder};
use hashbrown::HashMap;
use heed::types::Str;
use heed::Database;
use heed::types::{Bytes, Str};
use heed::{Database, PutFlags};
use rand::SeedableRng as _;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
@@ -28,7 +27,8 @@ pub use self::helpers::*;
pub use self::transform::{Transform, TransformOutput};
use super::new::StdResult;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::error::{Error, InternalError, UserError};
use crate::heed_codec::{CompressedObkvCodec, CompressedObkvU16};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
@@ -36,7 +36,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{CboRoaringBitmapCodec, Index, Result, UserError};
use crate::{CboRoaringBitmapCodec, Index, Result, BEU32};

static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4;
@@ -201,7 +201,7 @@ where
target = "indexing::details",
name = "index_documents_raw"
)]
pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
pub fn execute_raw(mut self, output: TransformOutput) -> Result<u64>
where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
@@ -523,6 +523,10 @@ where
word_fid_docids.map(MergerBuilder::build),
)?;

// This call contains an internal condition to ensure we do not always
// generate compression dictionaries and always compress documents.
self.manage_compression_dictionary()?;

Ok(number_of_documents)
}

@@ -533,7 +537,7 @@ where
name = "index_documents_prefix_databases"
)]
pub fn execute_prefix_databases(
self,
&mut self,
word_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeDeladdCboRoaringBitmaps>>,
@@ -723,6 +727,64 @@ where

Ok(())
}

/// Computes a new dictionary and compresses the documents with it in the database.
///
/// Documents still need to be compressed directly when they are written to the database and a dictionary exists.
#[tracing::instrument(
level = "trace",
skip_all,
target = "indexing::compression",
name = "compress_documents_database"
)]
pub fn manage_compression_dictionary(&mut self) -> Result<()> {
/// The size of the dictionary generated from a sample of the documents already
/// in the database. It will be used when compressing and decompressing documents.
const COMPRESSION_DICTIONARY_SIZE: usize = 64_000;
/// The minimum number of documents to trigger the generation of the compression dictionary.
const COMPRESSION_ON_NUMBER_OF_DOCUMENTS: usize = 10_000;

if self.index.number_of_documents(self.wtxn)? < COMPRESSION_ON_NUMBER_OF_DOCUMENTS as u64
|| self.index.document_compression_dictionary(self.wtxn)?.is_some()
{
return Ok(());
}

let mut sample_file = tempfile::tempfile().map(BufWriter::new)?;
let mut sample_sizes = Vec::new();
// TODO make this 1_000 be 10k and const
let documents = self.index.documents.remap_types::<BEU32, Bytes>();
for result in documents.iter(self.wtxn)?.take(COMPRESSION_ON_NUMBER_OF_DOCUMENTS) {
let (_id, bytes) = result?;
sample_file.write_all(bytes)?;
sample_sizes.push(bytes.len());
}

let sample_file = sample_file.into_inner().map_err(|ie| ie.into_error())?;
let sample_data = unsafe { memmap2::Mmap::map(&sample_file)? };
let dictionary =
zstd::dict::from_continuous(&sample_data, &sample_sizes, COMPRESSION_DICTIONARY_SIZE)?;
self.index.put_document_compression_dictionary(self.wtxn, &dictionary)?;
// safety: We just set the dictionary above. It must be there when we get it back.
let dictionary = self.index.document_compression_dictionary(self.wtxn)?.unwrap();

let mut iter = self.index.documents.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (docid, document) = result?;
let document = document.as_non_compressed();
let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?;
// safety: the compressed document is entirely owned
unsafe {
iter.put_current_with_options::<CompressedObkvCodec>(
PutFlags::empty(),
&docid,
&compressed,
)?;
}
}

Ok(())
}
}

/// Run the word prefix docids update operation.
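Aside, not part of the diff: manage_compression_dictionary trains a zstd dictionary from a continuous buffer of document samples before recompressing the documents already stored. A minimal sketch of that training step, assuming the zstd 0.13 crate built with the zdict_builder feature, where samples stands in for the raw obkv blobs read from the database:

use std::io;

fn train_dictionary(samples: &[Vec<u8>]) -> io::Result<Vec<u8>> {
    // Same upper bound the diff uses for the stored dictionary.
    const COMPRESSION_DICTIONARY_SIZE: usize = 64_000;

    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for bytes in samples {
        sample_data.extend_from_slice(bytes);
        sample_sizes.push(bytes.len());
    }

    // from_continuous takes the concatenated samples plus their individual sizes.
    zstd::dict::from_continuous(&sample_data, &sample_sizes, COMPRESSION_DICTIONARY_SIZE)
}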
@@ -814,7 +876,7 @@ mod tests {
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
let count = index.all_documents(&rtxn).unwrap().count();
let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 3);

drop(rtxn);
@@ -823,6 +885,7 @@ mod tests {
#[test]
fn simple_document_merge() {
let mut index = TempIndex::new();
let mut buffer = Vec::new();
index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;

// First we send 3 documents with duplicate ids and
@@ -841,16 +904,21 @@ mod tests {
assert_eq!(count, 1);

// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(compressed_docs.len(), 1);
let (id, compressed_doc) = compressed_docs.remove(0);
assert_eq!(id, 0);
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();

// Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter();
assert_eq!(doc_iter.next(), Some((0, &b"1"[..])));
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), None);
drop(dictionary);
drop(rtxn);

// Second we send 1 document with id 1, to force it to be merged with the previous one.
@@ -862,10 +930,14 @@ mod tests {
assert_eq!(count, 1);

// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(compressed_docs.len(), 1);
let (id, compressed_doc) = compressed_docs.remove(0);
assert_eq!(id, 0);
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();

// Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter();
@@ -873,6 +945,7 @@ mod tests {
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), Some((2, &b"25"[..])));
assert_eq!(doc_iter.next(), None);
drop(dictionary);
drop(rtxn);
}

@@ -974,7 +1047,7 @@ mod tests {
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 6);
let count = index.all_documents(&rtxn).unwrap().count();
let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 6);

db_snap!(index, word_docids, "updated");
@@ -1392,7 +1465,7 @@ mod tests {
index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();

let rtxn = index.read_txn().unwrap();
let all_documents_count = index.all_documents(&rtxn).unwrap().count();
let all_documents_count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(all_documents_count, 1);
let external_documents_ids = index.external_documents_ids();
assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
@@ -2844,7 +2917,7 @@ mod tests {
// Ensuring all the returned IDs actually exist
let rtxn = index.read_txn().unwrap();
let res = index.search(&rtxn).execute().unwrap();
index.documents(&rtxn, res.documents_ids).unwrap();
index.compressed_documents(&rtxn, res.documents_ids).unwrap();
}

fn delete_documents<'t>(
@@ -3223,7 +3296,7 @@ mod tests {

let rtxn = index.read_txn().unwrap();
// list all documents
let results = index.all_documents(&rtxn).unwrap();
let results = index.all_compressed_documents(&rtxn).unwrap();
for result in results {
let (id, _) = result.unwrap();
assert!(
@@ -174,10 +174,12 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;

let dictionary = self.index.document_decompression_dictionary(wtxn)?;
let primary_key = cursor.primary_key().to_string();
let primary_key_id =
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;

let mut decompression_buffer = Vec::new();
let mut obkv_buffer = Vec::new();
let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
@@ -253,18 +255,17 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut skip_insertion = false;
if let Some(original_docid) = original_docid {
let original_key = original_docid;
let base_obkv = self
.index
.documents
.remap_data_type::<heed::types::Bytes>()
.get(wtxn, &original_key)?
.ok_or(InternalError::DatabaseMissingEntry {
db_name: db_name::DOCUMENTS,
key: None,
})?;
let base_compressed_obkv = self.index.documents.get(wtxn, &original_key)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;

let base_obkv = base_compressed_obkv.decompress_with_optional_dictionary(
&mut decompression_buffer,
dictionary.as_ref(),
)?;

// we check if the two documents are exactly equal. If it's the case we can skip this document entirely
if base_obkv == obkv_buffer {
if base_obkv.as_bytes() == obkv_buffer {
// we're not replacing anything
self.replaced_documents_ids.remove(original_docid);
// and we need to put back the original id as it was before
@@ -284,13 +285,12 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv(
KvReaderU16::from_slice(base_obkv),
base_obkv,
deladd_operation,
&mut document_sorter_value_buffer,
)?;
self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
let base_obkv = KvReader::from_slice(base_obkv);
if let Some(flattened_obkv) =
Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)?
{
@@ -354,9 +354,12 @@ impl<'a, 'i> Transform<'a, 'i> {
documents_seen: documents_count,
});

drop(dictionary);

self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
self.index.put_primary_key(wtxn, &primary_key)?;
self.documents_count += documents_count;

// Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput.
Ok(documents_count)
@@ -857,15 +860,21 @@ impl<'a, 'i> Transform<'a, 'i> {

if original_sorter.is_some() || flattened_sorter.is_some() {
let modified_faceted_fields = settings_diff.modified_faceted_fields();
let dictionary = self.index.document_decompression_dictionary(wtxn)?;

let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
let mut buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? {
let (external_id, docid) = result?;
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
let old_compressed_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;

let old_obkv = old_compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;

let injected_vectors: std::result::Result<
serde_json::Map<String, serde_json::Value>,
arroy::Error,
@@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec;
use grenad::{MergeFunction, Merger, MergerBuilder};
use heed::types::Bytes;
use heed::{BytesDecode, RwTxn};
use obkv::{KvReader, KvWriter};
use obkv::{KvReader, KvReaderU16, KvWriter};
use roaring::RoaringBitmap;

use super::helpers::{
@@ -17,6 +17,7 @@ use super::helpers::{
};
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType;
use crate::heed_codec::CompressedObkvU16;
use crate::index::db_name::DOCUMENTS;
use crate::index::IndexEmbeddingConfig;
use crate::proximity::MAX_DISTANCE;
@@ -158,6 +159,7 @@ pub(crate) fn write_typed_chunk_into_index(
.into_iter()
.map(|IndexEmbeddingConfig { name, .. }| name)
.collect();
let dictionary = index.document_compression_dictionary(wtxn)?;
let mut vectors_buffer = Vec::new();
while let Some((key, reader)) = iter.next()? {
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@@ -207,7 +209,15 @@ pub(crate) fn write_typed_chunk_into_index(
let db = index.documents.remap_data_type::<Bytes>();

if !writer.is_empty() {
db.put(wtxn, &docid, &writer.into_inner().unwrap())?;
let uncompressed_document_bytes = writer.into_inner().unwrap();
match dictionary.as_ref() {
Some(dictionary) => {
let doc = KvReaderU16::from_slice(&uncompressed_document_bytes);
let compressed = CompressedObkvU16::with_dictionary(doc, dictionary)?;
db.put(wtxn, &docid, compressed.as_bytes())?
}
None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,
}
operations.push(DocumentOperation {
external_id: external_id.to_string(),
internal_id: docid,
@@ -21,6 +21,7 @@ use super::ref_cell_ext::RefCellExt;
use super::thread_local::{FullySend, ThreadLocal};
use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
use crate::heed_codec::CompressedObkvU16;
use crate::index::db_name;
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::update::new::KvReaderFieldId;
@@ -825,14 +826,31 @@ impl FieldIdDocidFacetSender<'_, '_> {
pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);

impl DocumentsSender<'_, '_> {
/// TODO do that efficiently
pub fn uncompressed(
pub fn write_uncompressed(
&self,
docid: DocumentId,
external_id: String,
document: &KvReaderFieldId,
) -> crate::Result<()> {
self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), document.as_bytes())?;
self.write_raw(docid, external_id, document.as_bytes())
}

pub fn write_compressed(
&self,
docid: DocumentId,
external_id: String,
document: &CompressedObkvU16,
) -> crate::Result<()> {
self.write_raw(docid, external_id, document.as_bytes())
}

fn write_raw(
&self,
docid: DocumentId,
external_id: String,
raw_document_bytes: &[u8],
) -> crate::Result<()> {
self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), raw_document_bytes)?;
self.0.write_key_value(
Database::ExternalDocumentsIds,
external_id.as_bytes(),
@@ -1,9 +1,11 @@
use std::collections::{BTreeMap, BTreeSet};

use bumpalo::Bump;
use bumparaw_collections::RawMap;
use heed::RoTxn;
use rustc_hash::FxBuildHasher;
use serde_json::value::RawValue;
use zstd::dict::DecoderDictionary;

use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
@@ -62,6 +64,7 @@ impl<'t, Mapper: FieldIdMapper> Clone for DocumentFromDb<'t, Mapper> {
*self
}
}

impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {}

impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
@@ -128,10 +131,19 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
db_document_decompression_dictionary: Option<&DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<Option<Self>> {
index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| {
reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader })
})
match index.compressed_document(rtxn, docid)? {
Some(compressed) => {
let content = match db_document_decompression_dictionary {
Some(dictionary) => compressed.decompress_into_bump(doc_alloc, dictionary)?,
None => compressed.as_non_compressed(),
};
Ok(Some(Self { fields_ids_map: db_fields_ids_map, content }))
}
None => Ok(None),
}
}

pub fn field(&self, name: &str) -> Result<Option<&'t RawValue>> {
@@ -195,9 +207,18 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
db_document_decompression_dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
new_doc: DocumentFromVersions<'a, 'doc>,
) -> Result<Self> {
let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
let db = DocumentFromDb::new(
docid,
rtxn,
index,
db_fields_ids_map,
db_document_decompression_dictionary,
doc_alloc,
)?;
Ok(Self { new_doc, db })
}

@@ -240,9 +261,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
return Ok(Some(vectors));
}

let Some(db) = self.db else { return Ok(None) };

db.vectors_field()
match &self.db {
Some(db) => db.vectors_field(),
None => Ok(None),
}
}

fn geo_field(&self) -> Result<Option<&'d RawValue>> {
@@ -250,9 +272,10 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
return Ok(Some(geo));
}

let Some(db) = self.db else { return Ok(None) };

db.geo_field()
match &self.db {
Some(db) => db.geo_field(),
None => Ok(None),
}
}

fn top_level_fields_count(&self) -> usize {
@@ -263,7 +286,7 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
if let Some(f) = self.new_doc.top_level_field(k)? {
return Ok(Some(f));
}
if let Some(db) = self.db {
if let Some(db) = &self.db {
return db.field(k);
}
Ok(None)
@@ -1,5 +1,6 @@
use bumpalo::Bump;
use heed::RoTxn;
use zstd::dict::DecoderDictionary;

use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
@@ -72,8 +73,10 @@ impl<'doc> Deletion<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, dictionary, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@@ -91,6 +94,7 @@ impl<'doc> Insertion<'doc> {
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}

pub fn inserted(&self) -> DocumentFromVersions<'_, 'doc> {
DocumentFromVersions::new(&self.new)
}
@@ -126,8 +130,10 @@ impl<'doc> Update<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper, dictionary, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
@@ -137,11 +143,13 @@ impl<'doc> Update<'doc> {
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
dictionary: Option<&'a DecoderDictionary<'static>>,
doc_alloc: &'a Bump,
) -> Result<VectorDocumentFromDb<'a>> {
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, dictionary, doc_alloc)?
.ok_or(crate::error::UserError::UnknownInternalDocumentId {
document_id: self.docid,
})?)
}

pub fn updated(&self) -> DocumentFromVersions<'_, 'doc> {
@@ -153,6 +161,8 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<MergedDocument<'_, 'doc, 't, Mapper>> {
if self.has_deletion {
Ok(MergedDocument::without_db(DocumentFromVersions::new(&self.new)))
@@ -162,6 +172,8 @@ impl<'doc> Update<'doc> {
rtxn,
index,
mapper,
dictionary,
doc_alloc,
DocumentFromVersions::new(&self.new),
)
}
@@ -177,6 +189,8 @@ impl<'doc> Update<'doc> {
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
dictionary: Option<&'t DecoderDictionary<'static>>,
doc_alloc: &'t Bump,
) -> Result<bool> {
let mut changed = false;
let mut cached_current = None;
@@ -192,7 +206,7 @@ impl<'doc> Update<'doc> {
updated_selected_field_count += 1;
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
None => self.current(rtxn, index, mapper, dictionary, doc_alloc)?,
};
let current_value = current.top_level_field(key)?;
let Some(current_value) = current_value else {
@@ -222,7 +236,7 @@ impl<'doc> Update<'doc> {
let has_deleted_fields = {
let current = match cached_current {
Some(current) => current,
None => self.current(rtxn, index, mapper)?,
None => self.current(rtxn, index, mapper, dictionary, doc_alloc)?,
};

let mut current_selected_field_count = 0;
@@ -254,6 +268,7 @@ impl<'doc> Update<'doc> {
rtxn: &'doc RoTxn,
index: &'doc Index,
mapper: &'doc Mapper,
dictionary: Option<&'doc DecoderDictionary<'static>>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<MergedVectorDocument<'doc>>> {
@@ -271,6 +286,7 @@ impl<'doc> Update<'doc> {
index,
rtxn,
mapper,
dictionary,
&self.new,
doc_alloc,
embedders,
crates/milli/src/update/new/extract/documents/compression.rs (new file, 252 lines)
@@ -0,0 +1,252 @@
use std::cell::RefCell;
use std::sync::atomic::{self, AtomicUsize};

use bumpalo::Bump;
use heed::{RoTxn, RwTxn};
use rayon::iter::{ParallelBridge, ParallelIterator as _};
use roaring::RoaringBitmap;
use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary};

use crate::heed_codec::CompressedObkvU16;
use crate::update::new::document::Document as _;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
};
use crate::update::new::indexer::extract;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::update::new::DocumentChange;
use crate::{Index, Result};

/// The compression level to use when compressing documents.
const COMPRESSION_LEVEL: i32 = 19;
/// The number of documents required as a sample for generating
/// the compression dictionary.
const SAMPLE_SIZE: usize = 10_000;
/// The maximum size the document compression dictionary can be.
const DICTIONARY_MAX_SIZE: usize = 64_000;
/// The maximum number of documents we accept to compress if they
/// have not already been compressed in the database. If this threshold
/// is reached, we do not generate a dictionary and continue as is.
const COMPRESS_LIMIT: usize = 5_000_000;
/// This is 10KiB.
const TEN_KIB: usize = 10 * 1024;

/// A function dedicated to use the existing or generate an appropriate
/// document compression dictionary based on the documents available in
/// the database and the ones in the payload.
///
/// If it has to compute a new compression dictionary it immediately
/// writes the dictionary in the database and compresses the documents
/// that are not part of the current update with it.
///
/// If there are too many documents already in the database and no
/// compression dictionary we prefer not to generate a dictionary to avoid
/// compressing all of the documents and potentially blow up disk space.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn retrieve_or_compute_document_compression_dictionary<'pl, 'extractor, DC, MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,
document_changes: &DC,
indexing_context: IndexingContext<MSP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Option<EncoderDictionary<'static>>>
where
DC: DocumentChanges<'pl>,
MSP: Fn() -> bool + Sync,
{
let number_of_documents = index.number_of_documents(wtxn)? as usize;
match index.document_compression_raw_dictionary(wtxn)? {
Some(dict) => Ok(Some(EncoderDictionary::copy(dict, COMPRESSION_LEVEL))),
None if !indexing_context.allow_creating_compression_dictionary => Ok(None),
None if number_of_documents >= COMPRESS_LIMIT => Ok(None),
None if number_of_documents + document_changes.len() < SAMPLE_SIZE => Ok(None),
None => {
let mut sample_data = Vec::new();
let mut sample_sizes = Vec::new();
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let extractor = CompressorExtractor {
total_documents_to_extract: SAMPLE_SIZE,
extracted_documents_count: AtomicUsize::new(0),
};

// We first collect all the documents for the database into a buffer.
for result in index.all_compressed_documents(wtxn)? {
let (_docid, compressed_document) = result?;
// The documents are not compressed with any dictionary at this point.
let document = compressed_document.as_non_compressed();
sample_data.extend_from_slice(document.as_bytes());
sample_sizes.push(document.as_bytes().len());
}

// This extraction only takes care about documents replacements
// and not updates (merges). The merged documents are ignored as
// we will only use the previous version of them in the database,
// just above.
extract(
document_changes,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
IndexingStep::PreparingCompressionDictionary,
)?;

let mut all_documents_seen = RoaringBitmap::new();
for data in datastore {
let CompressorExtractorData { documents_seen, fields, fields_count, must_stop: _ } =
data.into_inner();

all_documents_seen |= documents_seen;

let mut fields_iter = fields.into_iter();
for field_count in fields_count {
let mut document_fields_size = 0;
for field in fields_iter.by_ref().take(field_count) {
sample_data.extend_from_slice(field);
document_fields_size += field.len();
}
sample_sizes.push(document_fields_size);
}

debug_assert_eq!(
fields_iter.count(),
0,
"We must have consumed all the documents' \
fields but there were some remaining ones"
);
}

// We avoid generating a dictionary if most (> 1/3) of the sample sizes are
// smaller than 8 bytes, or if the sample data size is smaller than 10KiB.
//
// <https://github.com/facebook/zstd/blob/0218c8de0fa77bbd87e75f2ea70ba00b93460e15/lib/zdict.h#L190-L209>
if sample_sizes.iter().filter(|s| **s < 8).count() > sample_sizes.len() / 3
|| sample_data.len() < TEN_KIB
{
return Ok(None);
}

let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?;
index.put_document_compression_dictionary(wtxn, &dictionary)?;
let encoder_dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);

let all_documents = index.documents_ids(wtxn)?;
let documents_to_compress = all_documents - all_documents_seen;
let datastore = ThreadLocal::with_capacity(rayon::max_num_threads());
let pi = documents_to_compress.into_iter().par_bridge().map(|docid| {
let data = datastore.get_or_try(|| {
crate::Result::Ok(RefCell::new(ParallelCompressionData {
rtxn: index.read_txn()?,
compressor: Compressor::with_dictionary(COMPRESSION_LEVEL, &dictionary)?,
}))
})?;

let mut data = data.borrow_mut_or_yield();
let ParallelCompressionData { rtxn, compressor } = &mut *data;

let compressed_document = index.compressed_document(rtxn, docid)?.unwrap();
// The documents are not compressed with any dictionary at this point.
let document = compressed_document.as_non_compressed();
let compressed = CompressedObkvU16::with_compressor(document, compressor)?;
Ok((docid, compressed)) as crate::Result<_>
});

// We compress in parallel and sequentially write the documents
// in the database using the above parallel iterator.
rayon_par_bridge::par_bridge(100, pi, |seq_iter| {
for result in seq_iter {
let (docid, compressed_document) = result?;
index.documents.put(wtxn, &docid, &compressed_document)?;
}
Ok(()) as crate::Result<_>
})?;

Ok(Some(encoder_dictionary))
}
}
}

/// Used when we are compressing documents in parallel.
struct ParallelCompressionData<'extractor> {
rtxn: RoTxn<'extractor>,
compressor: Compressor<'extractor>,
}

unsafe impl<'extractor> MostlySend for RefCell<ParallelCompressionData<'extractor>> {}

struct CompressorExtractor {
/// The total number of documents we must extract from all threads.
total_documents_to_extract: usize,
/// The combined, shared, number of extracted documents.
extracted_documents_count: AtomicUsize,
}

#[derive(Default)]
struct CompressorExtractorData<'extractor> {
/// The set of documents impacted by this update: deleted, modified, or updated.
documents_seen: RoaringBitmap,
/// The field content in JSON but as bytes.
fields: Vec<&'extractor [u8]>,
/// The number of fields associated to single documents.
/// It is used to provide good sample to the dictionary generator.
fields_count: Vec<usize>,
/// We extracted the expected count of documents, we can skip everything now.
must_stop: bool,
}

unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}

impl<'extractor> Extractor<'extractor> for CompressorExtractor {
type Data = RefCell<CompressorExtractorData<'extractor>>;

fn init_data<'doc>(
&'doc self,
_extractor_alloc: &'extractor bumpalo::Bump,
) -> crate::Result<Self::Data> {
Ok(RefCell::new(CompressorExtractorData::default()))
}

fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
) -> crate::Result<()> {
let mut data = context.data.borrow_mut_or_yield();

for change in changes {
if data.must_stop {
return Ok(());
}

let change = change?;
let docid = match change {
DocumentChange::Deletion(deletion) => deletion.docid(),
DocumentChange::Update(update) => update.docid(),
DocumentChange::Insertion(insertion) => {
let mut fields_count = 0;
for result in insertion.inserted().iter_top_level_fields() {
let (_field_name, raw_value) = result?;
let bytes = raw_value.get().as_bytes();
data.fields.push(context.extractor_alloc.alloc_slice_copy(bytes));
fields_count += 1;
}

let previous_count =
self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
data.must_stop = previous_count >= self.total_documents_to_extract;
data.fields_count.push(fields_count);

insertion.docid()
}
};

let is_new = data.documents_seen.insert(docid);
debug_assert!(is_new, "We must not see the same documents multiple times");
}

Ok(())
}
}
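The new `retrieve_or_compute_document_compression_dictionary` above boils down to three zstd primitives: train a raw dictionary from concatenated samples with `from_continuous`, digest it once into an `EncoderDictionary`, and hand per-thread `Compressor`s that dictionary. Below is a minimal, self-contained sketch of that round trip using only the `zstd` crate; the sample documents, the 10_000 and 64_000 figures (mirroring the constants above), and the compression level are illustrative, and this is not the Meilisearch code path itself.

```rust
use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{from_continuous, DecoderDictionary, EncoderDictionary};

fn main() -> std::io::Result<()> {
    // Invented JSON documents standing in for the sampled obkv documents.
    let docs: Vec<String> = (0..10_000)
        .map(|i| format!(r#"{{"id":{i},"name":"document number {i}","kind":"sample"}}"#))
        .collect();

    // Concatenate the samples and record each sample length,
    // which is exactly the shape `from_continuous` expects.
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for doc in &docs {
        sample_data.extend_from_slice(doc.as_bytes());
        sample_sizes.push(doc.len());
    }

    // Train a raw dictionary capped at 64_000 bytes, then digest it once
    // so it can be reused by many compressors without re-parsing.
    let raw_dictionary = from_continuous(&sample_data, &sample_sizes, 64_000)?;
    let encoder_dictionary = EncoderDictionary::copy(&raw_dictionary, 19);
    let decoder_dictionary = DecoderDictionary::copy(&raw_dictionary);

    // Round-trip one document with the shared dictionary.
    let mut compressor = Compressor::with_prepared_dictionary(&encoder_dictionary)?;
    let compressed = compressor.compress(docs[0].as_bytes())?;

    let mut decompressor = Decompressor::with_prepared_dictionary(&decoder_dictionary)?;
    let decompressed = decompressor.decompress(&compressed, docs[0].len())?;
    assert_eq!(decompressed, docs[0].as_bytes());
    Ok(())
}
```

Dictionary training can fail or produce a useless dictionary when the samples are too few, too small, or too uniform, which is what the `SAMPLE_SIZE`, `TEN_KIB`, and sub-8-byte guards above protect against.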
@ -1,10 +1,14 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use bumpalo::Bump;
|
||||
pub use compression::retrieve_or_compute_document_compression_dictionary;
|
||||
use hashbrown::HashMap;
|
||||
use zstd::bulk::Compressor;
|
||||
use zstd::dict::EncoderDictionary;
|
||||
|
||||
use super::DelAddRoaringBitmap;
|
||||
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
||||
use crate::heed_codec::CompressedObkvU16;
|
||||
use crate::update::new::channel::DocumentsSender;
|
||||
use crate::update::new::document::{write_to_obkv, Document as _};
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||
@ -14,28 +18,44 @@ use crate::update::new::DocumentChange;
|
||||
use crate::vector::EmbeddingConfigs;
|
||||
use crate::Result;
|
||||
|
||||
mod compression;
|
||||
|
||||
pub struct DocumentsExtractor<'a, 'b> {
|
||||
document_sender: DocumentsSender<'a, 'b>,
|
||||
documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>,
|
||||
embedders: &'a EmbeddingConfigs,
|
||||
}
|
||||
|
||||
impl<'a, 'b> DocumentsExtractor<'a, 'b> {
|
||||
pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
|
||||
Self { document_sender, embedders }
|
||||
pub fn new(
|
||||
document_sender: DocumentsSender<'a, 'b>,
|
||||
documents_compression_dictionary: Option<&'a EncoderDictionary<'a>>,
|
||||
embedders: &'a EmbeddingConfigs,
|
||||
) -> Self {
|
||||
Self { document_sender, documents_compression_dictionary, embedders }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct DocumentExtractorData {
|
||||
pub struct DocumentExtractorData<'a> {
|
||||
pub docids_delta: DelAddRoaringBitmap,
|
||||
pub field_distribution_delta: HashMap<String, i64>,
|
||||
pub documents_compressor: Option<Compressor<'a>>,
|
||||
}
|
||||
|
||||
impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
type Data = FullySend<RefCell<DocumentExtractorData>>;
|
||||
type Data = FullySend<RefCell<DocumentExtractorData<'a>>>;
|
||||
|
||||
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||
Ok(FullySend(Default::default()))
|
||||
let documents_compressor = match self.documents_compression_dictionary {
|
||||
Some(dictionary) => Some(Compressor::with_prepared_dictionary(dictionary)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(FullySend(RefCell::new(DocumentExtractorData {
|
||||
docids_delta: Default::default(),
|
||||
field_distribution_delta: Default::default(),
|
||||
documents_compressor,
|
||||
})))
|
||||
}
|
||||
|
||||
fn process<'doc>(
|
||||
@ -48,10 +68,11 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
|
||||
for change in changes {
|
||||
let change = change?;
|
||||
// **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
|
||||
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
|
||||
// **WARNING**: The exclusive borrow on `new_fields_ids_map` needs to be taken
|
||||
// **inside** of the `for change in changes` loop. Otherwise,
|
||||
// `BorrowMutError` will occur for document changes that also need
|
||||
// the new_fields_ids_map (e.g.: UpdateByFunction).
|
||||
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||
|
||||
let external_docid = change.external_docid().to_owned();
|
||||
|
||||
// document but we need to create a function that collects and compresses documents.
|
||||
@ -62,6 +83,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
let geo_iter = content
|
||||
.geo_field()
|
||||
@ -80,8 +103,13 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
}
|
||||
DocumentChange::Update(update) => {
|
||||
let docid = update.docid();
|
||||
let content =
|
||||
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||
let content = update.current(
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
let geo_iter = content
|
||||
.geo_field()
|
||||
.transpose()
|
||||
@ -94,8 +122,13 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
.or_default();
|
||||
*entry -= 1;
|
||||
}
|
||||
let content =
|
||||
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||
let content = update.merged(
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
let geo_iter = content
|
||||
.geo_field()
|
||||
.transpose()
|
||||
@ -109,12 +142,18 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
let content =
|
||||
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||
let content = update.merged(
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
let vector_content = update.merged_vectors(
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
self.embedders,
|
||||
)?;
|
||||
@ -124,7 +163,19 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
&mut new_fields_ids_map,
|
||||
&mut document_buffer,
|
||||
)?;
|
||||
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
||||
|
||||
match document_extractor_data.documents_compressor.as_mut() {
|
||||
Some(compressor) => {
|
||||
let doc = CompressedObkvU16::with_compressor(content, compressor)?;
|
||||
self.document_sender
|
||||
.write_compressed(docid, external_docid, &doc)
|
||||
.unwrap();
|
||||
}
|
||||
None => self
|
||||
.document_sender
|
||||
.write_uncompressed(docid, external_docid, content)
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
DocumentChange::Insertion(insertion) => {
|
||||
let docid = insertion.docid();
|
||||
@ -150,7 +201,18 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
||||
&mut document_buffer,
|
||||
)?;
|
||||
document_extractor_data.docids_delta.insert_add_u32(docid);
|
||||
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
||||
match document_extractor_data.documents_compressor.as_mut() {
|
||||
Some(compressor) => {
|
||||
let doc = CompressedObkvU16::with_compressor(content, compressor)?;
|
||||
self.document_sender
|
||||
.write_compressed(docid, external_docid, &doc)
|
||||
.unwrap();
|
||||
}
|
||||
None => self
|
||||
.document_sender
|
||||
.write_uncompressed(docid, external_docid, content)
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
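The extractor change above keeps one shared `EncoderDictionary` and builds a `Compressor` per worker in `init_data`, falling back to writing the document uncompressed when no dictionary exists yet. A rough, hedged equivalent of that pattern with plain `rayon` and `zstd` is sketched below; the `documents.dict` file name and the fake documents are invented for the sketch, and `map_init` stands in for the thread-local `DocumentExtractorData`.

```rust
use rayon::prelude::*;
use zstd::bulk::Compressor;
use zstd::dict::EncoderDictionary;

// Stand-in for a batch of serialized documents (invented data).
fn fake_documents() -> Vec<Vec<u8>> {
    (0..64).map(|i| format!(r#"{{"id":{i},"body":"lorem ipsum"}}"#).into_bytes()).collect()
}

fn main() -> std::io::Result<()> {
    // `documents.dict` is an invented file name for a previously trained dictionary.
    let raw_dictionary = std::fs::read("documents.dict").unwrap_or_default();
    // The digested dictionary is immutable and can be shared across threads...
    let dictionary: Option<EncoderDictionary<'static>> =
        (!raw_dictionary.is_empty()).then(|| EncoderDictionary::copy(&raw_dictionary, 19));

    let documents = fake_documents();
    let compressed: std::io::Result<Vec<Vec<u8>>> = documents
        .par_iter()
        .map_init(
            // ...but every worker builds its own `Compressor` from it, once per thread.
            || dictionary.as_ref().map(Compressor::with_prepared_dictionary).transpose(),
            |compressor, document| match compressor {
                Ok(Some(compressor)) => compressor.compress(document),
                // No dictionary trained yet: store the document uncompressed.
                Ok(None) => Ok(document.clone()),
                Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
            },
        )
        .collect();

    println!("compressed {} documents", compressed?.len());
    Ok(())
}
```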
@ -79,7 +79,13 @@ impl FacetedDocidsExtractor {
|
||||
let res = match document_change {
|
||||
DocumentChange::Deletion(inner) => extract_document_facets(
|
||||
attributes_to_extract,
|
||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
inner.external_document_id(),
|
||||
new_fields_ids_map.deref_mut(),
|
||||
&mut |fid, depth, value| {
|
||||
@ -102,13 +108,21 @@ impl FacetedDocidsExtractor {
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
extract_document_facets(
|
||||
attributes_to_extract,
|
||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
inner.external_document_id(),
|
||||
new_fields_ids_map.deref_mut(),
|
||||
&mut |fid, depth, value| {
|
||||
@ -128,7 +142,13 @@ impl FacetedDocidsExtractor {
|
||||
|
||||
extract_document_facets(
|
||||
attributes_to_extract,
|
||||
inner.merged(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.merged(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
inner.external_document_id(),
|
||||
new_fields_ids_map.deref_mut(),
|
||||
&mut |fid, depth, value| {
|
||||
|
@ -159,6 +159,8 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||
let index = context.index;
|
||||
let max_memory = self.grenad_parameters.max_memory_by_thread();
|
||||
let db_fields_ids_map = context.db_fields_ids_map;
|
||||
let db_document_decompression_dictionary = context.db_document_decompression_dictionary;
|
||||
let doc_alloc = &context.doc_alloc;
|
||||
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||
|
||||
for change in changes {
|
||||
@ -174,7 +176,13 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||
DocumentChange::Deletion(deletion) => {
|
||||
let docid = deletion.docid();
|
||||
let external_id = deletion.external_document_id();
|
||||
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
|
||||
let current = deletion.current(
|
||||
rtxn,
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
doc_alloc,
|
||||
)?;
|
||||
let current_geo = current
|
||||
.geo_field()?
|
||||
.map(|geo| extract_geo_coordinates(external_id, geo))
|
||||
@ -189,7 +197,13 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||
}
|
||||
}
|
||||
DocumentChange::Update(update) => {
|
||||
let current = update.current(rtxn, index, db_fields_ids_map)?;
|
||||
let current = update.current(
|
||||
rtxn,
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
doc_alloc,
|
||||
)?;
|
||||
let external_id = update.external_document_id();
|
||||
let docid = update.docid();
|
||||
|
||||
|
@ -6,9 +6,7 @@ mod searchable;
|
||||
mod vectors;
|
||||
|
||||
use bumpalo::Bump;
|
||||
pub use cache::{
|
||||
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
|
||||
};
|
||||
pub use cache::*;
|
||||
pub use documents::*;
|
||||
pub use faceted::*;
|
||||
pub use geo::*;
|
||||
|
@ -338,7 +338,13 @@ impl WordDocidsExtractors {
|
||||
)
|
||||
};
|
||||
document_tokenizer.tokenize_document(
|
||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
new_fields_ids_map,
|
||||
&mut token_fn,
|
||||
)?;
|
||||
@ -349,6 +355,8 @@ impl WordDocidsExtractors {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
@ -364,7 +372,13 @@ impl WordDocidsExtractors {
|
||||
)
|
||||
};
|
||||
document_tokenizer.tokenize_document(
|
||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
new_fields_ids_map,
|
||||
&mut token_fn,
|
||||
)?;
|
||||
@ -380,7 +394,13 @@ impl WordDocidsExtractors {
|
||||
)
|
||||
};
|
||||
document_tokenizer.tokenize_document(
|
||||
inner.merged(rtxn, index, context.db_fields_ids_map)?,
|
||||
inner.merged(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
new_fields_ids_map,
|
||||
&mut token_fn,
|
||||
)?;
|
||||
|
@ -58,7 +58,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
|
||||
let docid = document_change.docid();
|
||||
match document_change {
|
||||
DocumentChange::Deletion(inner) => {
|
||||
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
|
||||
let document = inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
process_document_tokens(
|
||||
document,
|
||||
document_tokenizer,
|
||||
@ -75,11 +81,19 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let document = inner.current(rtxn, index, context.db_fields_ids_map)?;
|
||||
let document = inner.current(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
process_document_tokens(
|
||||
document,
|
||||
document_tokenizer,
|
||||
@ -89,7 +103,13 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
|
||||
del_word_pair_proximity.push(((w1, w2), prox));
|
||||
},
|
||||
)?;
|
||||
let document = inner.merged(rtxn, index, context.db_fields_ids_map)?;
|
||||
let document = inner.merged(
|
||||
rtxn,
|
||||
index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
process_document_tokens(
|
||||
document,
|
||||
document_tokenizer,
|
||||
|
@ -1,7 +1,3 @@
|
||||
mod extract_word_docids;
|
||||
mod extract_word_pair_proximity_docids;
|
||||
mod tokenize_document;
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
@ -22,6 +18,10 @@ use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||
|
||||
mod extract_word_docids;
|
||||
mod extract_word_pair_proximity_docids;
|
||||
mod tokenize_document;
|
||||
|
||||
pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
|
||||
tokenizer: &'a DocumentTokenizer<'a>,
|
||||
grenad_parameters: &'a GrenadParameters,
|
||||
|
@ -97,6 +97,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?;
|
||||
let new_vectors = update.updated_vectors(&context.doc_alloc, self.embedders)?;
|
||||
@ -135,6 +136,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
context.new_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
@ -145,6 +148,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
context.new_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
@ -165,6 +170,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
context.new_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
@ -175,6 +182,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
context.db_document_decompression_dictionary,
|
||||
&context.doc_alloc,
|
||||
)?,
|
||||
context.new_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
|
@ -5,6 +5,7 @@ use std::sync::{Arc, RwLock};
|
||||
use bumpalo::Bump;
|
||||
use heed::RoTxn;
|
||||
use rayon::iter::IndexedParallelIterator;
|
||||
use zstd::dict::DecoderDictionary;
|
||||
|
||||
use super::super::document_change::DocumentChange;
|
||||
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||
@ -27,6 +28,8 @@ pub struct DocumentChangeContext<
|
||||
/// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
|
||||
/// inside of the DB.
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
/// The dictionary used to decompress the documents in the database.
|
||||
pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
|
||||
/// A transaction providing data from the DB before all indexing operations
|
||||
pub rtxn: RoTxn<'indexer>,
|
||||
|
||||
@ -62,6 +65,7 @@ impl<
|
||||
pub fn new<F>(
|
||||
index: &'indexer Index,
|
||||
db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
|
||||
new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
|
||||
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
|
||||
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
@ -80,14 +84,13 @@ impl<
|
||||
|
||||
let fields_ids_map = &fields_ids_map.0;
|
||||
let extractor_alloc = extractor_allocs.get_or_default();
|
||||
|
||||
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
|
||||
|
||||
let txn = index.read_txn()?;
|
||||
Ok(DocumentChangeContext {
|
||||
index,
|
||||
rtxn: txn,
|
||||
rtxn: index.read_txn()?,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
new_fields_ids_map: fields_ids_map,
|
||||
doc_alloc,
|
||||
extractor_alloc: &extractor_alloc.0,
|
||||
@ -106,7 +109,7 @@ pub trait Extractor<'extractor>: Sync {
|
||||
fn process<'doc>(
|
||||
&'doc self,
|
||||
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
|
||||
context: &'doc DocumentChangeContext<Self::Data>,
|
||||
context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
|
||||
) -> Result<()>;
|
||||
}
|
||||
|
||||
@ -122,8 +125,10 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
fn item_to_document_change<'doc, // lifetime of a single `process` call
|
||||
T: MostlySend>(
|
||||
fn item_to_document_change<
|
||||
'doc, // lifetime of a single `process` call
|
||||
T: MostlySend,
|
||||
>(
|
||||
&'doc self,
|
||||
context: &'doc DocumentChangeContext<T>,
|
||||
item: &'doc Self::Item,
|
||||
@ -141,6 +146,8 @@ pub struct IndexingContext<
|
||||
{
|
||||
pub index: &'index Index,
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
pub allow_creating_compression_dictionary: bool,
|
||||
pub db_document_decompression_dictionary: Option<&'indexer DecoderDictionary<'static>>,
|
||||
pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
|
||||
pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
|
||||
@ -204,6 +211,8 @@ pub fn extract<
|
||||
IndexingContext {
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
allow_creating_compression_dictionary,
|
||||
db_document_decompression_dictionary,
|
||||
new_fields_ids_map,
|
||||
doc_allocs,
|
||||
fields_ids_map_store,
|
||||
@ -237,6 +246,7 @@ where
|
||||
DocumentChangeContext::new(
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
new_fields_ids_map,
|
||||
extractor_allocs,
|
||||
doc_allocs,
|
||||
|
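`DocumentChangeContext` now carries `db_document_decompression_dictionary` so every reader can tell whether stored documents are zstd-compressed (a dictionary is present) or still raw obkv bytes (no dictionary yet), which is what the `decompress_into_bump` / `as_non_compressed` pair encodes. Below is a hedged sketch of that read path using only the `zstd` crate; `read_document`, the scratch buffer, and the 1 MiB bound are assumptions for illustration, not Meilisearch APIs.

```rust
use zstd::bulk::Decompressor;
use zstd::dict::DecoderDictionary;

/// Illustration only: when a dictionary is present the stored bytes are
/// zstd-compressed with it, otherwise they are the raw document bytes.
fn read_document<'a>(
    stored: &'a [u8],
    dictionary: Option<&DecoderDictionary<'static>>,
    scratch: &'a mut Vec<u8>,
) -> std::io::Result<&'a [u8]> {
    match dictionary {
        Some(dict) => {
            let mut decompressor = Decompressor::with_prepared_dictionary(dict)?;
            // Assumed upper bound for a decompressed document; a real caller
            // would track the exact size or grow the buffer on failure.
            scratch.resize(1024 * 1024, 0);
            let written = decompressor.decompress_to_buffer(stored, scratch.as_mut_slice())?;
            scratch.truncate(written);
            Ok(scratch.as_slice())
        }
        // No dictionary in the index yet: the bytes were stored uncompressed.
        None => Ok(stored),
    }
}

fn main() -> std::io::Result<()> {
    let mut scratch = Vec::new();
    // With no dictionary the bytes come back untouched.
    let doc = read_document(br#"{"id":1}"#, None, &mut scratch)?;
    assert_eq!(doc, br#"{"id":1}"#);
    Ok(())
}
```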
@ -64,7 +64,11 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
|
||||
where
|
||||
'pl: 'doc, // the payload must survive the process calls
|
||||
{
|
||||
let current = context.index.document(&context.rtxn, *docid)?;
|
||||
let compressed = context.index.compressed_document(&context.rtxn, *docid)?.unwrap();
|
||||
let current = match context.db_document_decompression_dictionary {
|
||||
Some(dict) => compressed.decompress_into_bump(&context.doc_alloc, dict)?,
|
||||
None => compressed.as_non_compressed(),
|
||||
};
|
||||
|
||||
let external_document_id = self.primary_key.extract_docid_from_db(
|
||||
current,
|
||||
@ -140,7 +144,6 @@ mod test {
|
||||
let indexer = Bump::new();
|
||||
|
||||
let index = TempIndex::new();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
|
||||
let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
@ -148,8 +151,12 @@ mod test {
|
||||
let fields_ids_map =
|
||||
RwLock::new(FieldIdMapWithMetadata::new(db_fields_ids_map.clone(), metadata_builder));
|
||||
|
||||
let fields_ids_map_store = ThreadLocal::new();
|
||||
let db_document_decompression_dictionary = index
|
||||
.document_compression_raw_dictionary(&rtxn)
|
||||
.unwrap()
|
||||
.map(zstd::dict::DecoderDictionary::copy);
|
||||
|
||||
let fields_ids_map_store = ThreadLocal::new();
|
||||
let mut extractor_allocs = ThreadLocal::new();
|
||||
let doc_allocs = ThreadLocal::new();
|
||||
|
||||
@ -161,6 +168,7 @@ mod test {
|
||||
let context = IndexingContext {
|
||||
index: &index,
|
||||
db_fields_ids_map: &db_fields_ids_map,
|
||||
db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(),
|
||||
new_fields_ids_map: &fields_ids_map,
|
||||
doc_allocs: &doc_allocs,
|
||||
fields_ids_map_store: &fields_ids_map_store,
|
||||
|
@ -5,6 +5,7 @@ use std::sync::OnceLock;
|
||||
use bumpalo::Bump;
|
||||
use roaring::RoaringBitmap;
|
||||
use tracing::Span;
|
||||
use zstd::dict::EncoderDictionary;
|
||||
|
||||
use super::super::channel::*;
|
||||
use super::super::extract::*;
|
||||
@ -26,6 +27,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
|
||||
indexing_context: IndexingContext<MSP>,
|
||||
indexer_span: Span,
|
||||
extractor_sender: ExtractorBbqueueSender,
|
||||
document_compression_dictionary: Option<&'_ EncoderDictionary<'_>>,
|
||||
embedders: &EmbeddingConfigs,
|
||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||
finished_extraction: &AtomicBool,
|
||||
@ -46,7 +48,8 @@ where
|
||||
|
||||
// document but we need to create a function that collects and compresses documents.
|
||||
let document_sender = extractor_sender.documents();
|
||||
let document_extractor = DocumentsExtractor::new(document_sender, embedders);
|
||||
let document_extractor =
|
||||
DocumentsExtractor::new(document_sender, document_compression_dictionary, embedders);
|
||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||
{
|
||||
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
|
||||
|
@ -3,7 +3,8 @@ use std::sync::RwLock;
|
||||
use std::thread::{self, Builder};
|
||||
|
||||
use big_s::S;
|
||||
use document_changes::{DocumentChanges, IndexingContext};
|
||||
|
||||
pub use document_changes::{extract, DocumentChanges, IndexingContext};
|
||||
pub use document_deletion::DocumentDeletion;
|
||||
pub use document_operation::{DocumentOperation, PayloadStats};
|
||||
use hashbrown::HashMap;
|
||||
@ -11,13 +12,18 @@ use heed::RwTxn;
|
||||
pub use partial_dump::PartialDump;
|
||||
pub use update_by_function::UpdateByFunction;
|
||||
use write::{build_vectors, update_index, write_to_db};
|
||||
use zstd::dict::DecoderDictionary;
|
||||
|
||||
use super::channel::*;
|
||||
use super::extract::*;
|
||||
use super::steps::IndexingStep;
|
||||
use super::thread_local::ThreadLocal;
|
||||
|
||||
use super::channel::*;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
||||
|
||||
use crate::progress::Progress;
|
||||
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
|
||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
|
||||
@ -89,6 +95,9 @@ where
|
||||
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
||||
.unwrap();
|
||||
|
||||
let db_document_decompression_dictionary = index
|
||||
.document_compression_raw_dictionary(wtxn)
|
||||
.map(|opt| opt.map(DecoderDictionary::copy))?;
|
||||
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
||||
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
|
||||
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
|
||||
@ -99,6 +108,8 @@ where
|
||||
let indexing_context = IndexingContext {
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
allow_creating_compression_dictionary,
|
||||
db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(),
|
||||
new_fields_ids_map: &new_fields_ids_map,
|
||||
doc_allocs: &doc_allocs,
|
||||
fields_ids_map_store: &fields_ids_map_store,
|
||||
@ -107,6 +118,18 @@ where
|
||||
grenad_parameters: &grenad_parameters,
|
||||
};
|
||||
|
||||
let document_compression_dictionary = pool
|
||||
.install(|| {
|
||||
retrieve_or_compute_document_compression_dictionary(
|
||||
index,
|
||||
wtxn,
|
||||
document_changes,
|
||||
indexing_context,
|
||||
&mut extractor_allocs,
|
||||
)
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
let index_embeddings = index.embedding_configs(wtxn)?;
|
||||
let mut field_distribution = index.field_distribution(wtxn)?;
|
||||
let mut document_ids = index.documents_ids(wtxn)?;
|
||||
@ -126,6 +149,7 @@ where
|
||||
indexing_context,
|
||||
indexer_span,
|
||||
extractor_sender,
|
||||
document_compression_dictionary.as_ref(),
|
||||
embedders,
|
||||
&mut extractor_allocs,
|
||||
finished_extraction,
|
||||
|
@ -95,6 +95,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
|
||||
let DocumentChangeContext {
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
rtxn: txn,
|
||||
new_fields_ids_map,
|
||||
doc_alloc,
|
||||
@ -105,7 +106,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
|
||||
|
||||
// safety: Both documents *must* exists in the database as
|
||||
// their IDs comes from the list of documents ids.
|
||||
let document = index.document(txn, docid)?;
|
||||
let compressed_document = index.compressed_document(txn, docid)?.unwrap();
|
||||
let document = match db_document_decompression_dictionary {
|
||||
Some(dictionary) => compressed_document.decompress_into_bump(doc_alloc, dictionary)?,
|
||||
None => compressed_document.as_non_compressed(),
|
||||
};
|
||||
let rhai_document = obkv_to_rhaimap(document, db_fields_ids_map)?;
|
||||
let json_document = all_obkv_to_json(document, db_fields_ids_map)?;
|
||||
|
||||
|
@ -3,9 +3,9 @@ use std::sync::Arc;
|
||||
use rayon::iter::ParallelIterator;
|
||||
|
||||
pub trait ParallelIteratorExt: ParallelIterator {
|
||||
/// A method to run a closure of all the items and return an owned error.
|
||||
/// A method to run a closure on all the items and return an owned error.
|
||||
///
|
||||
/// The init function is ran only as necessary which is basically once by thread.
|
||||
/// The init function is ran only as necessary which is basically once per thread.
|
||||
fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
|
||||
where
|
||||
E: Send + Sync,
|
||||
|
@ -1,4 +1,5 @@
|
||||
use heed::RwTxn;
|
||||
use zstd::dict::DecoderDictionary;
|
||||
|
||||
use super::document::{Document, DocumentFromDb};
|
||||
use crate::progress::{self, AtomicSubStep, Progress};
|
||||
@ -15,11 +16,23 @@ pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progre
|
||||
progress.update_progress(sub_step);
|
||||
|
||||
let docids = index.documents_ids(wtxn)?;
|
||||
let mut doc_alloc = bumpalo::Bump::new();
|
||||
|
||||
let db_document_decompression_dictionary =
|
||||
index.document_compression_raw_dictionary(wtxn)?.map(|raw| DecoderDictionary::copy(raw));
|
||||
|
||||
for docid in docids {
|
||||
update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
|
||||
let Some(document) = DocumentFromDb::new(
|
||||
docid,
|
||||
wtxn,
|
||||
index,
|
||||
&field_id_map,
|
||||
db_document_decompression_dictionary.as_ref(),
|
||||
&doc_alloc,
|
||||
)?
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||
@ -31,8 +44,11 @@ pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progre
|
||||
distribution.insert(field_name.to_owned(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
doc_alloc.reset();
|
||||
}
|
||||
|
||||
index.put_field_distribution(wtxn, &distribution)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ use crate::progress::Step;
|
||||
#[repr(u8)]
|
||||
pub enum IndexingStep {
|
||||
PreparingPayloads,
|
||||
PreparingCompressionDictionary,
|
||||
ExtractingDocuments,
|
||||
ExtractingFacets,
|
||||
ExtractingWords,
|
||||
@ -26,6 +27,9 @@ impl Step for IndexingStep {
|
||||
fn name(&self) -> Cow<'static, str> {
|
||||
match self {
|
||||
IndexingStep::PreparingPayloads => "preparing update file",
|
||||
IndexingStep::PreparingCompressionDictionary => {
|
||||
"preparing documents compression dictionary"
|
||||
}
|
||||
IndexingStep::ExtractingDocuments => "extracting documents",
|
||||
IndexingStep::ExtractingFacets => "extracting facets",
|
||||
IndexingStep::ExtractingWords => "extracting words",
|
||||
|
@ -7,6 +7,7 @@ use heed::RoTxn;
|
||||
use rustc_hash::FxBuildHasher;
|
||||
use serde::Serialize;
|
||||
use serde_json::value::RawValue;
|
||||
use zstd::dict::DecoderDictionary;
|
||||
|
||||
use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
|
||||
use super::indexer::de::DeserrRawValue;
|
||||
@ -95,9 +96,18 @@ impl<'t> VectorDocumentFromDb<'t> {
|
||||
index: &'t Index,
|
||||
rtxn: &'t RoTxn,
|
||||
db_fields_ids_map: &'t Mapper,
|
||||
db_document_decompression_dictionary: Option<&'t DecoderDictionary<'static>>,
|
||||
doc_alloc: &'t Bump,
|
||||
) -> Result<Option<Self>> {
|
||||
let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else {
|
||||
let Some(document) = DocumentFromDb::new(
|
||||
docid,
|
||||
rtxn,
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
doc_alloc,
|
||||
)?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let vectors = document.vectors_field()?;
|
||||
@ -281,11 +291,19 @@ impl<'doc> MergedVectorDocument<'doc> {
|
||||
index: &'doc Index,
|
||||
rtxn: &'doc RoTxn,
|
||||
db_fields_ids_map: &'doc Mapper,
|
||||
db_document_decompression_dictionary: Option<&'doc DecoderDictionary<'static>>,
|
||||
versions: &Versions<'doc>,
|
||||
doc_alloc: &'doc Bump,
|
||||
embedders: &'doc EmbeddingConfigs,
|
||||
) -> Result<Option<Self>> {
|
||||
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
|
||||
let db = VectorDocumentFromDb::new(
|
||||
docid,
|
||||
index,
|
||||
rtxn,
|
||||
db_fields_ids_map,
|
||||
db_document_decompression_dictionary,
|
||||
doc_alloc,
|
||||
)?;
|
||||
let new_doc =
|
||||
VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
|
||||
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
|
||||
|
@ -1944,6 +1944,8 @@ mod tests {
|
||||
|
||||
// Check that the searchable field is correctly set to "name" only.
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut buffer = Vec::new();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
// When we search for something that is not in
|
||||
// the searchable fields it must not return any document.
|
||||
let result = index.search(&rtxn).query("23").execute().unwrap();
|
||||
@ -1952,10 +1954,17 @@ mod tests {
|
||||
// When we search for something that is in the searchable fields
|
||||
// we must find the appropriate document.
|
||||
let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap();
|
||||
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
|
||||
let mut compressed_documents =
|
||||
index.compressed_documents(&rtxn, result.documents_ids).unwrap();
|
||||
let fid_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
assert_eq!(documents.len(), 1);
|
||||
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
|
||||
assert_eq!(compressed_documents.len(), 1);
|
||||
let (_id, compressed_document) = compressed_documents.remove(0);
|
||||
let document = compressed_document
|
||||
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
|
||||
drop(dictionary);
|
||||
drop(rtxn);
|
||||
|
||||
// We change the searchable fields to be the "name" field only.
|
||||
@ -1980,6 +1989,7 @@ mod tests {
|
||||
|
||||
// Check that the searchable field have been reset and documents are found now.
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
let fid_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap();
|
||||
snapshot!(format!("{user_defined_searchable_fields:?}"), @"None");
|
||||
@ -1988,8 +1998,13 @@ mod tests {
|
||||
snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###);
|
||||
let result = index.search(&rtxn).query("23").execute().unwrap();
|
||||
assert_eq!(result.documents_ids.len(), 1);
|
||||
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
|
||||
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
|
||||
let mut compressed_documents =
|
||||
index.compressed_documents(&rtxn, result.documents_ids).unwrap();
|
||||
let (_id, compressed_document) = compressed_documents.remove(0);
|
||||
let document = compressed_document
|
||||
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -2120,15 +2135,20 @@ mod tests {
|
||||
|
||||
// Check that the displayed fields are correctly set.
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut buffer = Vec::new();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
let fields_ids = index.filterable_fields(&rtxn).unwrap();
|
||||
assert_eq!(fields_ids, hashset! { S("age") });
|
||||
// Only count the field_id 0 and level 0 facet values.
|
||||
// TODO we must support typed CSVs for numbers to be understood.
|
||||
let fidmap = index.fields_ids_map(&rtxn).unwrap();
|
||||
for document in index.all_documents(&rtxn).unwrap() {
|
||||
let document = document.unwrap();
|
||||
let json = crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document.1)
|
||||
for result in index.all_compressed_documents(&rtxn).unwrap() {
|
||||
let (_id, compressed_document) = result.unwrap();
|
||||
let document = compressed_document
|
||||
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
let json =
|
||||
crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document).unwrap();
|
||||
println!("json: {:?}", json);
|
||||
}
|
||||
let count = index
|
||||
@ -2139,6 +2159,7 @@ mod tests {
|
||||
.unwrap()
|
||||
.count();
|
||||
assert_eq!(count, 3);
|
||||
drop(dictionary);
|
||||
drop(rtxn);
|
||||
|
||||
// Index a little more documents with new and current facets values.
|
||||
@ -2228,6 +2249,7 @@ mod tests {
|
||||
#[test]
|
||||
fn set_asc_desc_field() {
|
||||
let index = TempIndex::new();
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
// Set the filterable fields to be the age.
|
||||
index
|
||||
@ -2248,12 +2270,16 @@ mod tests {
|
||||
|
||||
// Run an empty query just to ensure that the search results are ordered.
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap();
|
||||
let documents = index.documents(&rtxn, documents_ids).unwrap();
|
||||
let compressed_documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
|
||||
|
||||
// Fetch the documents "age" field in the ordre in which the documents appear.
|
||||
let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap();
|
||||
let iter = documents.into_iter().map(|(_, doc)| {
|
||||
let iter = compressed_documents.into_iter().map(|(_, compressed_doc)| {
|
||||
let doc = compressed_doc
|
||||
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
let bytes = doc.get(age_field_id).unwrap();
|
||||
let string = std::str::from_utf8(bytes).unwrap();
|
||||
string.parse::<u32>().unwrap()
|
||||
@ -2645,6 +2671,7 @@ mod tests {
|
||||
#[test]
|
||||
fn setting_impact_relevancy() {
|
||||
let index = TempIndex::new();
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
// Set the genres setting
|
||||
index
|
||||
@ -2676,8 +2703,12 @@ mod tests {
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap();
|
||||
let first_id = documents_ids[0];
|
||||
let documents = index.documents(&rtxn, documents_ids).unwrap();
|
||||
let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
|
||||
let documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
|
||||
let (_, compressed_content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
let content = compressed_content
|
||||
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
|
||||
let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap();
|
||||
let line = std::str::from_utf8(content.get(fid).unwrap()).unwrap();
|
||||
@ -2851,7 +2882,7 @@ mod tests {
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
let rtxn = index.write_txn().unwrap();
|
||||
let docs: StdResult<Vec<_>, _> = index.all_documents(&rtxn).unwrap().collect();
|
||||
let docs: StdResult<Vec<_>, _> = index.all_compressed_documents(&rtxn).unwrap().collect();
|
||||
let docs = docs.unwrap();
|
||||
assert_eq!(docs.len(), 5);
|
||||
}
|
||||
|
@ -349,7 +349,20 @@ fn criteria_ascdesc() {
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let documents = index.all_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::<Vec<_>>();
|
||||
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
|
||||
let mut buffers = vec![Vec::new(); index.number_of_documents(&rtxn).unwrap() as usize];
|
||||
let documents = index
|
||||
.all_compressed_documents(&rtxn)
|
||||
.unwrap()
|
||||
.zip(buffers.iter_mut())
|
||||
.map(|(compressed, buffer)| {
|
||||
let (id, compressed) = compressed.unwrap();
|
||||
let doc = compressed
|
||||
.decompress_with_optional_dictionary(buffer, dictionary.as_ref())
|
||||
.unwrap();
|
||||
(id, doc)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] {
|
||||
eprintln!("Testing with criterion: {:?}", &criterion);
|
||||