Only run word pair proximity docids extraction if proximity_precision enables it

Author: ManyTheFish
Date: 2024-10-09 11:35:45 +02:00
Committed by: Clément Renault
Parent: 3d29226a7f
Commit: ad52c950ba
26 changed files with 1977 additions and 2504 deletions
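As context for the diffs below, here is a minimal, hedged sketch of the gating the commit title describes; the enum and function names are illustrative and not the actual milli call site:

// Hypothetical sketch: word pair proximity docids are only extracted when the
// proximity precision asks for word-level precision. The assumption here is that
// the ByAttribute mode resolves proximity per attribute and does not need that database.
enum ProximityPrecision {
    ByWord,
    ByAttribute,
}

fn must_extract_word_pair_proximity(precision: ProximityPrecision) -> bool {
    matches!(precision, ProximityPrecision::ByWord)
}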

Cargo.lock (generated): 1470 changed lines. File diff suppressed because it is too large.

@ -33,13 +33,11 @@ use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::{
self, retrieve_or_guess_primary_key, UpdateByFunction,
};
use meilisearch_types::milli::update::{
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
};
use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings};
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use meilisearch_types::milli::{self, Filter, Object};
use meilisearch_types::milli::{self, Filter};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -50,7 +48,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///


@ -11,6 +11,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
insta = { version = "^1.39.0", features = ["json", "redactions"] }
insta = { version = "=1.39.0", features = ["json", "redactions"] }
md5 = "0.7.0"
once_cell = "1.19"


@ -94,10 +94,12 @@ tracing = "0.1.40"
ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2"
rayon-par-bridge = "0.1.0"
hashbrown = "0.14.5"
hashbrown = "0.15.0"
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
bumpalo = "3.16.0"
thread_local = "1.1.8"
allocator-api2 = "0.2.18"
rustc-hash = "2.0.0"
[dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false }


@ -83,6 +83,8 @@ pub fn writer_into_reader(
grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
}
/// # Safety
/// We use memory mapping inside. So, according to the Rust community, it's unsafe.
pub unsafe fn as_cloneable_grenad(
reader: &grenad::Reader<BufReader<File>>,
) -> Result<grenad::Reader<CursorClonableMmap>> {


@ -1,42 +1,33 @@
use std::fs::File;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use grenad::Merger;
use heed::types::Bytes;
use memmap2::Mmap;
use roaring::RoaringBitmap;
use super::extract::FacetKind;
use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::index::main_key::DOCUMENTS_IDS_KEY;
use crate::update::new::KvReaderFieldId;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages.
pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) {
pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap);
(
MergerSender {
ExtractorSender {
sender,
send_count: Default::default(),
writer_contentious_count: Default::default(),
merger_contentious_count: Default::default(),
extractor_contentious_count: Default::default(),
},
WriterReceiver(receiver),
)
}
/// The capacity of the channel is currently in number of messages.
pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap);
(ExtractorSender(sender), MergerReceiver(receiver))
}
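A hedged usage sketch of the renamed channel above (the import path mirrors the one used elsewhere in this commit; the capacity value and function name are made up for illustration): extractors push WriterOperations through the ExtractorSender while a writer loop drains the WriterReceiver, and dropping the sender closes the channel and triggers the contention stats printed by the Drop impl further down.

use crate::update::new::channel::extractor_writer_channel;

fn writer_loop_sketch() {
    // Illustrative capacity; the real indexer chooses its own bound.
    let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
    // Extraction code would send WriterOperations here before dropping the sender.
    drop(extractor_sender);
    for _operation in writer_receiver {
        // A real writer applies each WriterOperation to its target database.
    }
}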
pub enum KeyValueEntry {
SmallInMemory { key_length: usize, data: Box<[u8]> },
LargeOnDisk { key: Box<[u8]>, value: Mmap },
pub struct KeyValueEntry {
pub key_length: usize,
pub data: Box<[u8]>,
}
impl KeyValueEntry {
@ -44,32 +35,22 @@ impl KeyValueEntry {
let mut data = Vec::with_capacity(key.len() + value.len());
data.extend_from_slice(key);
data.extend_from_slice(value);
KeyValueEntry::SmallInMemory { key_length: key.len(), data: data.into_boxed_slice() }
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
}
pub fn from_small_key_bitmap(key: &[u8], bitmap: RoaringBitmap) -> Self {
let mut data = Vec::with_capacity(key.len() + bitmap.serialized_size());
data.extend_from_slice(key);
bitmap.serialize_into(&mut data).unwrap();
KeyValueEntry::SmallInMemory { key_length: key.len(), data: data.into_boxed_slice() }
}
pub fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
KeyValueEntry::LargeOnDisk { key: key.to_vec().into_boxed_slice(), value }
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
}
pub fn key(&self) -> &[u8] {
match self {
KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[..*key_length],
KeyValueEntry::LargeOnDisk { key, value: _ } => key.as_ref(),
}
&self.data[..self.key_length]
}
pub fn value(&self) -> &[u8] {
match self {
KeyValueEntry::SmallInMemory { key_length, data } => &data.as_ref()[*key_length..],
KeyValueEntry::LargeOnDisk { key: _, value } => value.as_ref(),
}
&self.data[self.key_length..]
}
}
@ -92,37 +73,6 @@ pub enum EntryOperation {
Write(KeyValueEntry),
}
pub struct DocumentEntry {
docid: DocumentId,
content: Box<[u8]>,
}
impl DocumentEntry {
pub fn new_uncompressed(docid: DocumentId, content: Box<KvReaderFieldId>) -> Self {
DocumentEntry { docid, content: content.into() }
}
pub fn new_compressed(docid: DocumentId, content: Box<[u8]>) -> Self {
DocumentEntry { docid, content }
}
pub fn key(&self) -> [u8; 4] {
self.docid.to_be_bytes()
}
pub fn content(&self) -> &[u8] {
&self.content
}
}
pub struct DocumentDeletionEntry(DocumentId);
impl DocumentDeletionEntry {
pub fn key(&self) -> [u8; 4] {
self.0.to_be_bytes()
}
}
pub struct WriterOperation {
database: Database,
entry: EntryOperation,
@ -206,34 +156,32 @@ impl IntoIterator for WriterReceiver {
}
}
pub struct MergerSender {
pub struct ExtractorSender {
sender: Sender<WriterOperation>,
/// The number of message we send in total in the channel.
send_count: std::cell::Cell<usize>,
/// The number of messages we sent in total in the channel.
send_count: AtomicUsize,
/// The number of times we sent something in a channel that was full.
writer_contentious_count: std::cell::Cell<usize>,
writer_contentious_count: AtomicUsize,
/// The number of times we sent something in a channel that was empty.
merger_contentious_count: std::cell::Cell<usize>,
extractor_contentious_count: AtomicUsize,
}
impl Drop for MergerSender {
impl Drop for ExtractorSender {
fn drop(&mut self) {
let send_count = *self.send_count.get_mut();
let writer_contentious_count = *self.writer_contentious_count.get_mut();
let extractor_contentious_count = *self.extractor_contentious_count.get_mut();
eprintln!(
"Merger channel stats: {} sends, {} writer contentions ({}%), {} merger contentions ({}%)",
self.send_count.get(),
self.writer_contentious_count.get(),
(self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
self.merger_contentious_count.get(),
(self.merger_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
"Extractor channel stats: {send_count} sends, \
{writer_contentious_count} writer contentions ({}%), \
{extractor_contentious_count} extractor contentions ({}%)",
(writer_contentious_count as f32 / send_count as f32) * 100.0,
(extractor_contentious_count as f32 / send_count as f32) * 100.0
)
}
}
impl MergerSender {
pub fn main(&self) -> MainSender<'_> {
MainSender(self)
}
impl ExtractorSender {
pub fn docids<D: DatabaseType>(&self) -> WordDocidsSender<'_, D> {
WordDocidsSender { sender: self, _marker: PhantomData }
}
@ -263,12 +211,12 @@ impl MergerSender {
fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
if self.sender.is_full() {
self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
}
if self.sender.is_empty() {
self.merger_contentious_count.set(self.merger_contentious_count.get() + 1);
self.extractor_contentious_count.fetch_add(1, Ordering::SeqCst);
}
self.send_count.set(self.send_count.get() + 1);
self.send_count.fetch_add(1, Ordering::SeqCst);
match self.sender.send(op) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
@ -276,129 +224,48 @@ impl MergerSender {
}
}
pub struct MainSender<'a>(&'a MergerSender);
impl MainSender<'_> {
pub fn write_words_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(
WORDS_FST_KEY.as_bytes(),
value,
));
match self.0.send(WriterOperation { database: Database::Main, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn write_words_prefixes_fst(&self, value: Mmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(
WORDS_PREFIXES_FST_KEY.as_bytes(),
value,
));
match self.0.send(WriterOperation { database: Database::Main, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.0.send(WriterOperation { database: Database::Main, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
pub enum ExactWordDocids {}
pub enum FidWordCountDocids {}
pub enum WordDocids {}
pub enum WordFidDocids {}
pub enum WordPairProximityDocids {}
pub enum WordPositionDocids {}
pub enum FacetDocids {}
pub trait DatabaseType {
const DATABASE: Database;
}
pub trait MergerOperationType {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation;
}
impl DatabaseType for ExactWordDocids {
const DATABASE: Database = Database::ExactWordDocids;
}
impl MergerOperationType for ExactWordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::ExactWordDocidsMerger(merger)
}
}
impl DatabaseType for FidWordCountDocids {
const DATABASE: Database = Database::FidWordCountDocids;
}
impl MergerOperationType for FidWordCountDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::FidWordCountDocidsMerger(merger)
}
}
impl DatabaseType for WordDocids {
const DATABASE: Database = Database::WordDocids;
}
impl MergerOperationType for WordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordDocidsMerger(merger)
}
}
impl DatabaseType for WordFidDocids {
const DATABASE: Database = Database::WordFidDocids;
}
impl MergerOperationType for WordFidDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordFidDocidsMerger(merger)
}
}
impl DatabaseType for WordPairProximityDocids {
const DATABASE: Database = Database::WordPairProximityDocids;
}
impl MergerOperationType for WordPairProximityDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordPairProximityDocidsMerger(merger)
}
}
impl DatabaseType for WordPositionDocids {
const DATABASE: Database = Database::WordPositionDocids;
}
impl MergerOperationType for WordPositionDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::WordPositionDocidsMerger(merger)
}
}
impl MergerOperationType for FacetDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
MergerOperation::FacetDocidsMerger(merger)
}
}
pub trait DocidsSender {
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>;
fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>;
}
pub struct WordDocidsSender<'a, D> {
sender: &'a MergerSender,
sender: &'a ExtractorSender,
_marker: PhantomData<D>,
}
@ -421,7 +288,7 @@ impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> {
}
pub struct FacetDocidsSender<'a> {
sender: &'a MergerSender,
sender: &'a ExtractorSender,
}
impl DocidsSender for FacetDocidsSender<'_> {
@ -456,7 +323,7 @@ impl DocidsSender for FacetDocidsSender<'_> {
}
pub struct FacetSearchableSender<'a> {
sender: &'a MergerSender,
sender: &'a ExtractorSender,
}
impl FacetSearchableSender<'_> {
@ -481,25 +348,9 @@ impl FacetSearchableSender<'_> {
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn write_fst(&self, key: &[u8], value: Mmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(key, value));
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete_fst(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
pub struct DocumentsSender<'a>(&'a MergerSender);
pub struct DocumentsSender<'a>(&'a ExtractorSender);
impl DocumentsSender<'_> {
/// TODO do that efficiently
@ -542,86 +393,3 @@ impl DocumentsSender<'_> {
}
}
}
pub enum MergerOperation {
ExactWordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
FidWordCountDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordFidDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPairProximityDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
FacetDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
DeleteDocument { docid: DocumentId, external_id: String },
InsertDocument { docid: DocumentId, external_id: String, document: Box<KvReaderFieldId> },
FinishedDocument,
}
pub struct MergerReceiver(Receiver<MergerOperation>);
impl IntoIterator for MergerReceiver {
type Item = MergerOperation;
type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
pub struct ExtractorSender(Sender<MergerOperation>);
impl ExtractorSender {
pub fn document_sender(&self) -> DocumentSender<'_> {
DocumentSender(Some(&self.0))
}
pub fn send_searchable<D: MergerOperationType>(
&self,
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
) -> StdResult<(), SendError<()>> {
match self.0.send(D::new_merger_operation(merger)) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
pub struct DocumentSender<'a>(Option<&'a Sender<MergerOperation>>);
impl DocumentSender<'_> {
pub fn insert(
&self,
docid: DocumentId,
external_id: String,
document: Box<KvReaderFieldId>,
) -> StdResult<(), SendError<()>> {
let sender = self.0.unwrap();
match sender.send(MergerOperation::InsertDocument { docid, external_id, document }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> {
let sender = self.0.unwrap();
match sender.send(MergerOperation::DeleteDocument { docid, external_id }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn finish(mut self) -> StdResult<(), SendError<()>> {
let sender = self.0.take().unwrap();
match sender.send(MergerOperation::FinishedDocument) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
impl Drop for DocumentSender<'_> {
fn drop(&mut self) {
if let Some(sender) = self.0.take() {
let _ = sender.send(MergerOperation::FinishedDocument);
}
}
}

View File

@ -1,299 +1,611 @@
use std::fmt::Write as _;
use std::mem;
use std::num::NonZeroUsize;
//! # How the Merge Algorithm works
//!
//! Each extractor creates #Threads caches and balances the entries
//! based on the hash of the keys. To do that we can use the
//! hashbrown::hash_map::RawEntryBuilderMut::from_key_hashed_nocheck.
//! This way we can compute the hash on our own, decide on the cache to
//! target, and insert it into the right HashMap.
//!
//! #Thread -> caches
//! t1 -> [t1c1, t1c2, t1c3]
//! t2 -> [t2c1, t2c2, t2c3]
//! t3 -> [t3c1, t3c2, t3c3]
//!
//! When the extractors are done filling the caches, we want to merge
//! the content of all the caches. We do a transpose and each thread is
//! assigned the associated cache. By doing that we know that every key
//! is put in a known cache and will collide with keys in the other
//! caches of the other threads.
//!
//! #Thread -> caches
//! t1 -> [t1c1, t2c1, t3c1]
//! t2 -> [t1c2, t2c2, t3c2]
//! t3 -> [t1c3, t2c3, t3c3]
//!
//! When we encounter a miss in the other caches we must still try
//! to find it in the spilled entries. This is the reason why we use
//! a grenad sorter/reader so that we can seek "efficiently" for a key.
//!
//! ## More Detailed Algorithm
//!
//! Each sub-cache has an in-memory HashMap and some spilled
//! lexicographically ordered entries on disk (grenad). We first iterate
//! over the spilled entries of all the caches at once by using a merge
//! join algorithm. This algorithm will merge the entries by using its
//! merge function.
//!
//! Every time a merged entry is emitted by the merge join algorithm we also
//! fetch the value from the other in-memory caches (HashMaps) to finish
//! the merge. Every time we retrieve an entry from the in-memory caches
//! we mark them with a tombstone for later.
//!
//! Once we are done with the spilled entries we iterate over the in-memory
//! HashMaps. We iterate over the first one, retrieve the content from the
//! other ones and mark them with a tombstone again. We also make sure
//! to ignore the dead (tombstoned) ones.
//!
//! ## Memory Control
//!
//! We can detect that no more memory is available when the
//! bump allocator reaches a threshold. When this is the case we
//! freeze the cache. There is one bump allocator per thread and the
//! memory must be well balanced as we manage one type of extraction
//! at a time with well-balanced documents.
//!
//! It means that the unknown new keys added to the
//! cache are directly spilled to disk: basically a key followed by a
//! del/add bitmap. For the known keys we can keep modifying them in
//! the materialized version in the cache: update the del/add bitmaps.
//!
//! For now we can use a grenad sorter for spilling even though I think
//! it's not the most efficient way (too many files open, sorting entries).
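A minimal sketch of the bucketing idea described above, using only standard-library types (the actual BalancedCaches below additionally reuses the computed hash via hashbrown's raw entry API and allocates keys in a bump arena): hash the key once, derive the target cache from that hash, and touch only that cache.

use std::collections::HashMap;
use std::hash::BuildHasher;

fn insert_into_bucket<S: BuildHasher>(
    caches: &mut [HashMap<Vec<u8>, u64>],
    hasher: &S,
    key: &[u8],
) {
    // Hash once, pick the bucket from the hash, then update that bucket only.
    let hash = hasher.hash_one(key);
    let bucket = hash as usize % caches.len();
    *caches[bucket].entry(key.to_vec()).or_insert(0) += 1;
}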
use grenad::{MergeFunction, Sorter};
use roaring::bitmap::Statistics;
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::fs::File;
use std::hash::BuildHasher;
use std::io::BufReader;
use std::{io, iter, mem};
use bumpalo::Bump;
use grenad::ReaderCursor;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use raw_collections::map::FrozenMap;
use roaring::RoaringBitmap;
use smallvec::SmallVec;
use rustc_hash::FxBuildHasher;
use super::lru::Lru;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::CboRoaringBitmapCodec;
use crate::update::new::indexer::document_changes::MostlySend;
use crate::update::new::KvReaderDelAdd;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Result};
const KEY_SIZE: usize = 12;
/// A cache that stores byte keys associated with CboDelAddRoaringBitmaps.
///
/// Internally balances the content over `N` buckets for future merging.
pub struct BalancedCaches<'extractor> {
hasher: FxBuildHasher,
alloc: &'extractor Bump,
max_memory: Option<usize>,
caches: InnerCaches<'extractor>,
}
#[derive(Debug)]
pub struct CboCachedSorter<MF> {
cache: Lru<SmallVec<[u8; KEY_SIZE]>, DelAddRoaringBitmap>,
sorter: Sorter<MF>,
enum InnerCaches<'extractor> {
Normal(NormalCaches<'extractor>),
Spilling(SpillingCaches<'extractor>),
}
impl<'extractor> BalancedCaches<'extractor> {
pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
Self {
hasher: FxBuildHasher,
max_memory,
caches: InnerCaches::Normal(NormalCaches {
caches: iter::repeat_with(|| HashMap::with_hasher_in(FxBuildHasher, alloc))
.take(buckets)
.collect(),
}),
alloc,
}
}
fn buckets(&self) -> usize {
match &self.caches {
InnerCaches::Normal(caches) => caches.caches.len(),
InnerCaches::Spilling(caches) => caches.caches.len(),
}
}
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
self.start_spilling()?;
}
let buckets = self.buckets();
match &mut self.caches {
InnerCaches::Normal(normal) => {
normal.insert_del_u32(&self.hasher, self.alloc, buckets, key, n);
Ok(())
}
InnerCaches::Spilling(spilling) => {
spilling.insert_del_u32(&self.hasher, buckets, key, n)
}
}
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<()> {
if self.max_memory.map_or(false, |mm| self.alloc.allocated_bytes() >= mm) {
self.start_spilling()?;
}
let buckets = self.buckets();
match &mut self.caches {
InnerCaches::Normal(normal) => {
normal.insert_add_u32(&self.hasher, self.alloc, buckets, key, n);
Ok(())
}
InnerCaches::Spilling(spilling) => {
spilling.insert_add_u32(&self.hasher, buckets, key, n)
}
}
}
/// Make sure the cache is no longer allocating data
/// and writes every new and unknown entry to disk.
fn start_spilling(&mut self) -> Result<()> {
let BalancedCaches { hasher: _, alloc, max_memory: _, caches } = self;
if let InnerCaches::Normal(normal_caches) = caches {
eprintln!(
"We are spilling after we allocated {} bytes on thread #{}",
alloc.allocated_bytes(),
rayon::current_thread_index().unwrap_or(0)
);
let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
eprintln!("The last allocated HasMap took {allocated} bytes");
let dummy = NormalCaches { caches: Vec::new() };
let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
*caches = InnerCaches::Spilling(SpillingCaches::from_cache_maps(cache_maps));
}
Ok(())
}
pub fn freeze(&mut self) -> Result<Vec<FrozenCache<'_, 'extractor>>> {
match &mut self.caches {
InnerCaches::Normal(NormalCaches { caches }) => caches
.iter_mut()
.enumerate()
.map(|(bucket, map)| {
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
})
.collect(),
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
.iter_mut()
.zip(mem::take(spilled_entries))
.enumerate()
.map(|(bucket, (map, sorter))| {
let spilled = sorter
.into_reader_cursors()?
.into_iter()
.map(ReaderCursor::into_inner)
.map(BufReader::new)
.map(|bufreader| grenad::Reader::new(bufreader).map_err(Into::into))
.collect::<Result<_>>()?;
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
})
.collect(),
}
}
}
unsafe impl MostlySend for BalancedCaches<'_> {}
struct NormalCaches<'extractor> {
caches: Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>>,
}
impl<'extractor> NormalCaches<'extractor> {
pub fn insert_del_u32(
&mut self,
hasher: &FxBuildHasher,
alloc: &'extractor Bump,
buckets: usize,
key: &[u8],
n: u32,
) {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
}
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
hash,
alloc.alloc_slice_copy(key),
DelAddRoaringBitmap::new_del_u32(n),
);
}
}
}
pub fn insert_add_u32(
&mut self,
hasher: &FxBuildHasher,
alloc: &'extractor Bump,
buckets: usize,
key: &[u8],
n: u32,
) {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
}
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(
hash,
alloc.alloc_slice_copy(key),
DelAddRoaringBitmap::new_add_u32(n),
);
}
}
}
}
struct SpillingCaches<'extractor> {
caches: Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>>,
spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
total_insertions: usize,
fitted_in_key: usize,
}
impl<MF> CboCachedSorter<MF> {
pub fn new(cap: NonZeroUsize, sorter: Sorter<MF>) -> Self {
CboCachedSorter {
cache: Lru::new(cap),
sorter,
impl<'extractor> SpillingCaches<'extractor> {
fn from_cache_maps(
caches: Vec<
HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>,
>,
) -> SpillingCaches<'extractor> {
SpillingCaches {
spilled_entries: iter::repeat_with(|| {
let mut builder = grenad::SorterBuilder::new(MergeDeladdCboRoaringBitmaps);
builder.dump_threshold(0);
builder.allow_realloc(false);
builder.build()
})
.take(caches.len())
.collect(),
caches,
deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(),
total_insertions: 0,
fitted_in_key: 0,
}
}
}
impl<MF: MergeFunction> CboCachedSorter<MF> {
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => {
del.get_or_insert_with(RoaringBitmap::default).insert(n);
}
None => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry(key, deladd)?;
}
}
}
Ok(())
}
pub fn insert_del(
pub fn insert_del_u32(
&mut self,
hasher: &FxBuildHasher,
buckets: usize,
key: &[u8],
bitmap: RoaringBitmap,
) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => {
*del.get_or_insert_with(RoaringBitmap::default) |= bitmap;
}
None => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry(key, deladd)?;
}
}
}
n: u32,
) -> Result<()> {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
Ok(())
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del: _, add }) => {
add.get_or_insert_with(RoaringBitmap::default).insert(n);
}
None => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry(key, deladd)?;
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_del_u32(n);
spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
key,
deladd,
)
}
}
}
Ok(())
}
pub fn insert_add(
pub fn insert_add_u32(
&mut self,
hasher: &FxBuildHasher,
buckets: usize,
key: &[u8],
bitmap: RoaringBitmap,
) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del: _, add }) => {
*add.get_or_insert_with(RoaringBitmap::default) |= bitmap;
}
None => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry(key, deladd)?;
}
}
}
n: u32,
) -> Result<()> {
let hash = hasher.hash_one(key);
let bucket = compute_bucket_from_hash(buckets, hash);
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
RawEntryMut::Occupied(mut entry) => {
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
Ok(())
}
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add }) => {
del.get_or_insert_with(RoaringBitmap::default).insert(n);
add.get_or_insert_with(RoaringBitmap::default).insert(n);
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_add_u32(n);
spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
key,
deladd,
)
}
None => {
self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry(key, deladd)?;
}
}
}
Ok(())
#[inline]
fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize {
hash as usize % buckets
}
fn write_entry<A: AsRef<[u8]>>(
&mut self,
key: A,
fn spill_entry_to_sorter(
spilled_entries: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
deladd_buffer: &mut Vec<u8>,
cbo_buffer: &mut Vec<u8>,
key: &[u8],
deladd: DelAddRoaringBitmap,
) -> grenad::Result<(), MF::Error> {
/// TODO we must create a serialization trait to correctly serialize bitmaps
self.deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
) -> Result<()> {
deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
let bytes = value_writer.into_inner().unwrap();
self.sorter.insert(key, bytes)
spilled_entries.insert(key, bytes).map_err(Into::into)
}
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> {
self.sorter.insert(key, val)
pub struct FrozenCache<'a, 'extractor> {
bucket: usize,
cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, FxBuildHasher>,
spilled: Vec<grenad::Reader<BufReader<File>>>,
}
pub fn into_sorter(mut self) -> grenad::Result<Sorter<MF>, MF::Error> {
let mut all_n_containers = Vec::new();
let mut all_n_array_containers = Vec::new();
let mut all_n_bitset_containers = Vec::new();
let mut all_n_values_array_containers = Vec::new();
let mut all_n_values_bitset_containers = Vec::new();
let mut all_cardinality = Vec::new();
pub fn transpose_and_freeze_caches<'a, 'extractor>(
caches: &'a mut [BalancedCaches<'extractor>],
) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> {
let width = caches.first().map(BalancedCaches::buckets).unwrap_or(0);
let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
let default_arc = Lru::new(NonZeroUsize::MIN);
for (key, deladd) in mem::replace(&mut self.cache, default_arc) {
for bitmap in [&deladd.del, &deladd.add].into_iter().flatten() {
let Statistics {
n_containers,
n_array_containers,
n_bitset_containers,
n_values_array_containers,
n_values_bitset_containers,
cardinality,
..
} = bitmap.statistics();
all_n_containers.push(n_containers);
all_n_array_containers.push(n_array_containers);
all_n_bitset_containers.push(n_bitset_containers);
all_n_values_array_containers.push(n_values_array_containers);
all_n_values_bitset_containers.push(n_values_bitset_containers);
all_cardinality.push(cardinality as u32);
}
self.write_entry(key, deladd)?;
}
let mut output = String::new();
for (name, mut slice) in [
("n_containers", all_n_containers),
("n_array_containers", all_n_array_containers),
("n_bitset_containers", all_n_bitset_containers),
("n_values_array_containers", all_n_values_array_containers),
("n_values_bitset_containers", all_n_values_bitset_containers),
("cardinality", all_cardinality),
] {
let _ = writeln!(&mut output, "{name} (p100) {:?}", Stats::from_slice(&mut slice));
// let _ = writeln!(&mut output, "{name} (p99) {:?}", Stats::from_slice_p99(&mut slice));
}
let _ = writeln!(
&mut output,
"LruCache stats: {} <= {KEY_SIZE} bytes ({}%) on a total of {} insertions",
self.fitted_in_key,
(self.fitted_in_key as f32 / self.total_insertions as f32) * 100.0,
self.total_insertions,
);
eprintln!("{output}");
Ok(self.sorter)
for thread_cache in caches {
for frozen in thread_cache.freeze()? {
bucket_caches[frozen.bucket].push(frozen);
}
}
#[derive(Default, Debug)]
struct Stats {
pub len: usize,
pub average: f32,
pub mean: u32,
pub min: u32,
pub max: u32,
Ok(bucket_caches)
}
impl Stats {
fn from_slice(slice: &mut [u32]) -> Stats {
slice.sort_unstable();
Self::from_sorted_slice(slice)
/// Merges the caches, which must all be associated with the same bucket.
///
/// # Panics
///
/// - If the bucket IDs in these frozen caches are not exactly the same.
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{
let mut maps = Vec::new();
let mut readers = Vec::new();
let mut current_bucket = None;
for FrozenCache { bucket, cache, ref mut spilled } in frozen {
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
maps.push(cache);
readers.append(spilled);
}
fn from_slice_p99(slice: &mut [u32]) -> Stats {
slice.sort_unstable();
let new_len = slice.len() - (slice.len() as f32 / 100.0) as usize;
match slice.get(..new_len) {
Some(slice) => Self::from_sorted_slice(slice),
None => Stats::default(),
// First manage the spilled entries by looking into the HashMaps,
// merge them and mark them as dummy.
let mut heap = BinaryHeap::new();
for (source_index, source) in readers.into_iter().enumerate() {
let mut cursor = source.into_cursor()?;
if cursor.move_on_next()?.is_some() {
heap.push(Entry { cursor, source_index });
}
}
fn from_sorted_slice(slice: &[u32]) -> Stats {
let sum: f64 = slice.iter().map(|i| *i as f64).sum();
let average = (sum / slice.len() as f64) as f32;
let mean = *slice.len().checked_div(2).and_then(|middle| slice.get(middle)).unwrap_or(&0);
let min = *slice.first().unwrap_or(&0);
let max = *slice.last().unwrap_or(&0);
Stats { len: slice.len(), average, mean, min, max }
loop {
let mut first_entry = match heap.pop() {
Some(entry) => entry,
None => break,
};
let (first_key, first_value) = match first_entry.cursor.current() {
Some((key, value)) => (key, value),
None => break,
};
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
while let Some(mut entry) = heap.peek_mut() {
if let Some((key, value)) = entry.cursor.current() {
if first_key == key {
let new = DelAddRoaringBitmap::from_bytes(value)?;
output = output.merge(new);
// When we are done with the current value of this entry, move the
// cursor forward and let the heap reorganize itself (on drop).
if entry.cursor.move_on_next()?.is_none() {
PeekMut::pop(entry);
}
} else {
break;
}
}
}
#[derive(Debug, Clone)]
// Once we have merged all of the spilled bitmaps we must also
// fetch the entries from the non-spilled caches (the HashMaps).
for (map_index, map) in maps.iter_mut().enumerate() {
if first_entry.source_index != map_index {
if let Some(new) = map.get_mut(first_key) {
output = output.merge(mem::take(new));
}
}
}
// We send the merged entry outside.
(f)(first_key, output)?;
// Don't forget to put the first entry back into the heap.
if first_entry.cursor.move_on_next()?.is_some() {
heap.push(first_entry)
}
}
// Then handle the in-memory HashMap entries that weren't already taken (via mem::take).
while let Some(mut map) = maps.pop() {
for (key, output) in map.iter_mut() {
let mut output = mem::take(output);
// Make sure we don't try to work with entries already handled by the spilled entries above.
if !output.is_empty() {
for rhs in maps.iter_mut() {
if let Some(new) = rhs.get_mut(key) {
output = output.merge(mem::take(new));
}
}
// We send the merged entry outside.
(f)(key, output)?;
}
}
}
Ok(())
}
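A hedged usage sketch of the two functions above, assuming it lives in the same module (the closure body is illustrative only): each thread's caches are transposed into per-bucket groups, and each group is merged so that every key of that bucket is emitted exactly once.

fn merge_all_buckets_sketch(caches: &mut [BalancedCaches<'_>]) -> Result<()> {
    for bucket in transpose_and_freeze_caches(caches)? {
        merge_caches(bucket, |key, deladd| {
            // A real caller would serialize `deladd` here and send the merged
            // del/add bitmaps for `key` to the writer channel.
            let _ = (key, deladd);
            Ok(())
        })?;
    }
    Ok(())
}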
struct Entry<R> {
cursor: ReaderCursor<R>,
source_index: usize,
}
impl<R> Ord for Entry<R> {
fn cmp(&self, other: &Entry<R>) -> Ordering {
let skey = self.cursor.current().map(|(k, _)| k);
let okey = other.cursor.current().map(|(k, _)| k);
skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse()
}
}
impl<R> Eq for Entry<R> {}
impl<R> PartialEq for Entry<R> {
fn eq(&self, other: &Entry<R>) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<R> PartialOrd for Entry<R> {
fn partial_cmp(&self, other: &Entry<R>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
#[derive(Debug, Default, Clone)]
pub struct DelAddRoaringBitmap {
pub(crate) del: Option<RoaringBitmap>,
pub(crate) add: Option<RoaringBitmap>,
pub del: Option<RoaringBitmap>,
pub add: Option<RoaringBitmap>,
}
impl DelAddRoaringBitmap {
fn new_del_add_u32(n: u32) -> Self {
DelAddRoaringBitmap {
del: Some(RoaringBitmap::from([n])),
add: Some(RoaringBitmap::from([n])),
}
fn from_bytes(bytes: &[u8]) -> io::Result<DelAddRoaringBitmap> {
let reader = KvReaderDelAdd::from_slice(bytes);
let del = match reader.get(DelAdd::Deletion) {
Some(bytes) => CboRoaringBitmapCodec::deserialize_from(bytes).map(Some)?,
None => None,
};
let add = match reader.get(DelAdd::Addition) {
Some(bytes) => CboRoaringBitmapCodec::deserialize_from(bytes).map(Some)?,
None => None,
};
Ok(DelAddRoaringBitmap { del, add })
}
fn new_del(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: Some(bitmap), add: None }
pub fn empty() -> DelAddRoaringBitmap {
DelAddRoaringBitmap { del: None, add: None }
}
fn new_del_u32(n: u32) -> Self {
pub fn is_empty(&self) -> bool {
let DelAddRoaringBitmap { del, add } = self;
del.is_none() && add.is_none()
}
pub fn insert_del_u32(&mut self, n: u32) {
self.del.get_or_insert_with(RoaringBitmap::new).insert(n);
}
pub fn insert_add_u32(&mut self, n: u32) {
self.add.get_or_insert_with(RoaringBitmap::new).insert(n);
}
pub fn new_del_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
}
fn new_add(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: None, add: Some(bitmap) }
}
fn new_add_u32(n: u32) -> Self {
pub fn new_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
}
pub fn merge(self, rhs: DelAddRoaringBitmap) -> DelAddRoaringBitmap {
let DelAddRoaringBitmap { del, add } = self;
let DelAddRoaringBitmap { del: ndel, add: nadd } = rhs;
let del = match (del, ndel) {
(None, None) => None,
(None, Some(del)) | (Some(del), None) => Some(del),
(Some(del), Some(ndel)) => Some(del | ndel),
};
let add = match (add, nadd) {
(None, None) => None,
(None, Some(add)) | (Some(add), None) => Some(add),
(Some(add), Some(nadd)) => Some(add | nadd),
};
DelAddRoaringBitmap { del, add }
}
pub fn apply_to(&self, documents_ids: &mut RoaringBitmap) {
let DelAddRoaringBitmap { del, add } = self;
if let Some(del) = del {
*documents_ids -= del;
}
if let Some(add) = add {
*documents_ids |= add;
}
}
}


@ -0,0 +1,73 @@
use std::cell::RefCell;
use bumpalo::Bump;
use super::DelAddRoaringBitmap;
use crate::update::new::channel::DocumentsSender;
use crate::update::new::document::write_to_obkv;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, Extractor, FullySend, RefCellExt as _,
};
use crate::update::new::DocumentChange;
use crate::Result;
pub struct DocumentsExtractor<'a> {
documents_sender: &'a DocumentsSender<'a>,
}
impl<'a> DocumentsExtractor<'a> {
pub fn new(documents_sender: &'a DocumentsSender<'a>) -> Self {
Self { documents_sender }
}
}
impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
type Data = FullySend<RefCell<DelAddRoaringBitmap>>;
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(FullySend(RefCell::new(DelAddRoaringBitmap::empty())))
}
fn process(
&self,
change: DocumentChange,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
let mut document_buffer = Vec::new();
let mut delta_documents_ids = context.data.0.borrow_mut_or_yield();
let new_fields_ids_map = context.new_fields_ids_map.borrow_or_yield();
let new_fields_ids_map = &*new_fields_ids_map;
let new_fields_ids_map = new_fields_ids_map.local_map();
let external_docid = change.external_docid().to_owned();
// document but we need to create a function that collects and compresses documents.
match change {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
self.documents_sender.delete(docid, external_docid).unwrap();
delta_documents_ids.insert_del_u32(docid);
}
// TODO: change None to Some(vector) when implemented
DocumentChange::Update(update) => {
let docid = update.docid();
let content =
update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
let content =
write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
self.documents_sender.uncompressed(docid, external_docid, content).unwrap();
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let content = insertion.new();
let content =
write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
self.documents_sender.uncompressed(docid, external_docid, content).unwrap();
delta_documents_ids.insert_add_u32(docid);
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
}
}
Ok(())
}
}


@ -1,16 +1,12 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fs::File;
use std::ops::DerefMut as _;
use bumpalo::Bump;
use grenad::{MergeFunction, Merger};
use heed::RoTxn;
use rayon::iter::{ParallelBridge as _, ParallelIterator as _};
use serde_json::Value;
use super::super::cache::CboCachedSorter;
use super::super::cache::BalancedCaches;
use super::facet_document::extract_document_facets;
use super::FacetKind;
use crate::facet::value_encoding::f64_into_bytes;
@ -20,44 +16,30 @@ use crate::update::new::indexer::document_changes::{
IndexingContext, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'extractor> {
attributes_to_extract: &'extractor [&'extractor str],
pub struct FacetedExtractorData<'a> {
attributes_to_extract: &'a [&'a str],
grenad_parameters: GrenadParameters,
max_memory: Option<usize>,
buckets: usize,
}
impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
type Data = FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>;
impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
type Data = RefCell<BalancedCaches<'extractor>>;
fn init_data(
&self,
_extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> Result<Self::Data> {
Ok(FullySend(RefCell::new(CboCachedSorter::new(
// TODO use a better value
1_000_000.try_into().unwrap(),
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
self.grenad_parameters.chunk_compression_type,
self.grenad_parameters.chunk_compression_level,
self.grenad_parameters.max_nb_chunks,
self.max_memory,
// *NOTE*: this must not be set to true:
// 1. we're already using max parallelism in the pool, so it wouldn't help
// 2. it creates correctness issues if it causes to yield a borrow-mut wielding task
false,
),
))))
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(BalancedCaches::new_in(
self.buckets,
self.grenad_parameters.max_memory,
extractor_alloc,
)))
}
fn process(
&self,
change: DocumentChange,
context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change)
}
@ -67,16 +49,14 @@ pub struct FacetedDocidsExtractor;
impl FacetedDocidsExtractor {
fn extract_document_change(
context: &DocumentChangeContext<
FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
>,
context: &DocumentChangeContext<RefCell<BalancedCaches>>,
attributes_to_extract: &[&str],
document_change: DocumentChange,
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield();
match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,
@ -86,7 +66,7 @@ impl FacetedDocidsExtractor {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
CboCachedSorter::insert_del_u32,
BalancedCaches::insert_del_u32,
inner.docid(),
fid,
value,
@ -102,7 +82,7 @@ impl FacetedDocidsExtractor {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
CboCachedSorter::insert_del_u32,
BalancedCaches::insert_del_u32,
inner.docid(),
fid,
value,
@ -118,7 +98,7 @@ impl FacetedDocidsExtractor {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
CboCachedSorter::insert_add_u32,
BalancedCaches::insert_add_u32,
inner.docid(),
fid,
value,
@ -134,7 +114,7 @@ impl FacetedDocidsExtractor {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
CboCachedSorter::insert_add_u32,
BalancedCaches::insert_add_u32,
inner.docid(),
fid,
value,
@ -144,25 +124,20 @@ impl FacetedDocidsExtractor {
}
}
fn facet_fn_with_options<MF>(
fn facet_fn_with_options<'extractor>(
doc_alloc: &Bump,
cached_sorter: &mut CboCachedSorter<MF>,
cache_fn: impl Fn(&mut CboCachedSorter<MF>, &[u8], u32) -> grenad::Result<(), MF::Error>,
cached_sorter: &mut BalancedCaches<'extractor>,
cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>,
docid: DocumentId,
fid: FieldId,
value: &Value,
) -> Result<()>
where
MF: MergeFunction,
MF::Error: Debug,
grenad::Error<MF::Error>: Into<crate::Error>,
{
) -> Result<()> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
// Exists
// key: fid
buffer.push(FacetKind::Exists as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)?;
cache_fn(cached_sorter, &buffer, docid)?;
match value {
// Number
@ -177,8 +152,7 @@ impl FacetedDocidsExtractor {
buffer.push(0); // level 0
buffer.extend_from_slice(&ordered);
buffer.extend_from_slice(&n.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
cache_fn(cached_sorter, &buffer, docid)
} else {
Ok(())
}
@ -193,7 +167,7 @@ impl FacetedDocidsExtractor {
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(0); // level 0
buffer.extend_from_slice(truncated.as_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
cache_fn(cached_sorter, &buffer, docid)
}
// Null
// key: fid
@ -201,7 +175,7 @@ impl FacetedDocidsExtractor {
buffer.clear();
buffer.push(FacetKind::Null as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
cache_fn(cached_sorter, &buffer, docid)
}
// Empty
// key: fid
@ -209,13 +183,13 @@ impl FacetedDocidsExtractor {
buffer.clear();
buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
cache_fn(cached_sorter, &buffer, docid)
}
Value::Object(o) if o.is_empty() => {
buffer.clear();
buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)
cache_fn(cached_sorter, &buffer, docid)
}
// Otherwise, do nothing
/// TODO: What about Value::Bool?
@ -242,16 +216,13 @@ fn truncate_str(s: &str) -> &str {
impl DocidsExtractor for FacetedDocidsExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
let max_memory = grenad_parameters.max_memory_by_thread();
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<BalancedCaches<'extractor>>> {
let index = indexing_context.index;
let rtxn = index.read_txn()?;
let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
let attributes_to_extract: Vec<_> =
@ -266,7 +237,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
let extractor = FacetedExtractorData {
attributes_to_extract: &attributes_to_extract,
grenad_parameters,
max_memory,
buckets: rayon::current_num_threads(),
};
for_each_document_change(
document_changes,
@ -276,26 +247,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
&datastore,
)?;
}
{
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let readers: Vec<_> = datastore
.into_iter()
.par_bridge()
.map(|cached_sorter| {
let cached_sorter = cached_sorter.0.into_inner();
let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors()
})
.collect();
for reader in readers {
builder.extend(reader?);
}
Ok(builder.build())
}
Ok(datastore.into_iter().map(RefCell::into_inner).collect())
}
}


@ -1,234 +0,0 @@
use std::borrow::Borrow;
use std::hash::{BuildHasher, Hash};
use std::iter::repeat_with;
use std::mem;
use std::num::NonZeroUsize;
use hashbrown::hash_map::{DefaultHashBuilder, Entry};
use hashbrown::HashMap;
#[derive(Debug)]
pub struct Lru<K, V, S = DefaultHashBuilder> {
lookup: HashMap<K, usize, S>,
storage: FixedSizeList<LruNode<K, V>>,
}
impl<K: Eq + Hash, V> Lru<K, V> {
/// Creates a new LRU cache that holds at most `capacity` elements.
pub fn new(capacity: NonZeroUsize) -> Self {
Self { lookup: HashMap::new(), storage: FixedSizeList::new(capacity.get()) }
}
}
impl<K: Eq + Hash, V, S: BuildHasher> Lru<K, V, S> {
/// Creates a new LRU cache that holds at most `capacity` elements
/// and uses the provided hash builder to hash keys.
pub fn with_hasher(capacity: NonZeroUsize, hash_builder: S) -> Lru<K, V, S> {
Self {
lookup: HashMap::with_hasher(hash_builder),
storage: FixedSizeList::new(capacity.get()),
}
}
}
impl<K: Eq + Hash, V, S: BuildHasher> Lru<K, V, S> {
/// Returns a mutable reference to the value of the key in the cache or `None` if it is not present in the cache.
///
/// Moves the key to the head of the LRU list if it exists.
pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let idx = *self.lookup.get(key)?;
self.storage.move_front(idx).map(|node| &mut node.value)
}
}
impl<K: Clone + Eq + Hash, V, S: BuildHasher> Lru<K, V, S> {
pub fn push(&mut self, key: K, value: V) -> Option<(K, V)> {
match self.lookup.entry(key) {
Entry::Occupied(occ) => {
// It's fine to unwrap here because:
// * the entry already exists
let node = self.storage.move_front(*occ.get()).unwrap();
let old_value = mem::replace(&mut node.value, value);
let old_key = occ.replace_key();
Some((old_key, old_value))
}
Entry::Vacant(vac) => {
let key = vac.key().clone();
if self.storage.is_full() {
// It's fine to unwrap here because:
// * the cache capacity is non zero
// * the cache is full
let idx = self.storage.back_idx();
let node = self.storage.move_front(idx).unwrap();
let LruNode { key, value } = mem::replace(node, LruNode { key, value });
vac.insert(idx);
self.lookup.remove(&key);
Some((key, value))
} else {
// It's fine to unwrap here because:
// * the cache capacity is non zero
// * the cache is not full
let (idx, _) = self.storage.push_front(LruNode { key, value }).unwrap();
vac.insert(idx);
None
}
}
}
}
}
impl<K, V, S> IntoIterator for Lru<K, V, S> {
type Item = (K, V);
type IntoIter = IntoIter<K, V>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { lookup_iter: self.lookup.into_iter(), nodes: self.storage.nodes }
}
}
pub struct IntoIter<K, V> {
lookup_iter: hashbrown::hash_map::IntoIter<K, usize>,
nodes: Box<[Option<FixedSizeListNode<LruNode<K, V>>>]>,
}
impl<K, V> Iterator for IntoIter<K, V> {
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
let (_key, idx) = self.lookup_iter.next()?;
let LruNode { key, value } = self.nodes.get_mut(idx)?.take()?.data;
Some((key, value))
}
}
#[derive(Debug)]
struct LruNode<K, V> {
key: K,
value: V,
}
#[derive(Debug)]
struct FixedSizeListNode<T> {
prev: usize,
next: usize,
data: T,
}
#[derive(Debug)]
struct FixedSizeList<T> {
nodes: Box<[Option<FixedSizeListNode<T>>]>,
/// Also corresponds to the first `None` in the nodes.
length: usize,
// TODO Also, we probably do not need one of the front and back cursors.
front: usize,
back: usize,
}
impl<T> FixedSizeList<T> {
fn new(capacity: usize) -> Self {
Self {
nodes: repeat_with(|| None).take(capacity).collect::<Vec<_>>().into_boxed_slice(),
length: 0,
front: usize::MAX,
back: usize::MAX,
}
}
#[inline]
fn capacity(&self) -> usize {
self.nodes.len()
}
#[inline]
fn len(&self) -> usize {
self.length
}
#[inline]
fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
fn is_full(&self) -> bool {
self.len() == self.capacity()
}
#[inline]
fn back_idx(&self) -> usize {
self.back
}
#[inline]
fn next(&mut self) -> Option<usize> {
if self.is_full() {
None
} else {
let current_free = self.length;
self.length += 1;
Some(current_free)
}
}
#[inline]
fn node_mut(&mut self, idx: usize) -> Option<&mut FixedSizeListNode<T>> {
self.nodes.get_mut(idx).and_then(|node| node.as_mut())
}
#[inline]
fn node_ref(&self, idx: usize) -> Option<&FixedSizeListNode<T>> {
self.nodes.get(idx).and_then(|node| node.as_ref())
}
#[inline]
fn move_front(&mut self, idx: usize) -> Option<&mut T> {
let node = self.nodes.get_mut(idx)?.take()?;
if let Some(prev) = self.node_mut(node.prev) {
prev.next = node.next;
} else {
self.front = node.next;
}
if let Some(next) = self.node_mut(node.next) {
next.prev = node.prev;
} else {
self.back = node.prev;
}
if let Some(front) = self.node_mut(self.front) {
front.prev = idx;
}
if self.node_ref(self.back).is_none() {
self.back = idx;
}
let node = self.nodes.get_mut(idx).unwrap().insert(FixedSizeListNode {
prev: usize::MAX,
next: self.front,
data: node.data,
});
self.front = idx;
Some(&mut node.data)
}
#[inline]
fn push_front(&mut self, data: T) -> Option<(usize, &mut T)> {
let idx = self.next()?;
if let Some(front) = self.node_mut(self.front) {
front.prev = idx;
}
if self.node_ref(self.back).is_none() {
self.back = idx;
}
let node = self.nodes.get_mut(idx).unwrap().insert(FixedSizeListNode {
prev: usize::MAX,
next: self.front,
data,
});
self.front = idx;
Some((idx, &mut node.data))
}
}


@ -1,27 +1,25 @@
mod cache;
mod documents;
mod faceted;
mod lru;
mod searchable;
use std::cell::RefCell;
use std::fs::File;
use bumpalo::Bump;
pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
pub use documents::*;
pub use faceted::*;
use grenad::Merger;
pub use searchable::*;
use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal};
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::Result;
pub trait DocidsExtractor {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<BalancedCaches<'extractor>>>;
}
/// TODO move in permissive json pointer

View File

@ -1,113 +1,46 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::File;
use std::mem::size_of;
use std::num::NonZero;
use std::ops::DerefMut as _;
use bumpalo::collections::vec::Vec as BumpVec;
use bumpalo::Bump;
use grenad::{Merger, MergerBuilder};
use heed::RoTxn;
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::extract::cache::BalancedCaches;
use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, RefCellExt, ThreadLocal,
IndexingContext, MostlySend, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
const MAX_COUNTED_WORDS: usize = 30;
pub struct WordDocidsCachedSorters {
word_fid_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
exact_word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
word_position_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
fid_word_count_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
pub struct WordDocidsBalancedCaches<'extractor> {
word_fid_docids: BalancedCaches<'extractor>,
word_docids: BalancedCaches<'extractor>,
exact_word_docids: BalancedCaches<'extractor>,
word_position_docids: BalancedCaches<'extractor>,
fid_word_count_docids: BalancedCaches<'extractor>,
fid_word_count: HashMap<FieldId, (usize, usize)>,
current_docid: Option<DocumentId>,
}
impl WordDocidsCachedSorters {
pub fn new(
indexer: GrenadParameters,
max_memory: Option<usize>,
capacity: NonZero<usize>,
) -> Self {
let max_memory = max_memory.map(|max_memory| max_memory / 4);
let word_fid_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
);
let word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
);
let exact_word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
);
let word_position_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
);
let fid_word_count_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
false,
),
);
unsafe impl<'extractor> MostlySend for WordDocidsBalancedCaches<'extractor> {}
impl<'extractor> WordDocidsBalancedCaches<'extractor> {
/// TODO Make sure to give the same max_memory to all of them, without splitting it
pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
Self {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
word_fid_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
exact_word_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
word_position_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
fid_word_count_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
fid_word_count: HashMap::new(),
current_docid: None,
}
@ -198,6 +131,7 @@ impl WordDocidsCachedSorters {
.entry(field_id)
.and_modify(|(current_count, _new_count)| *current_count += 1)
.or_insert((1, 0));
self.current_docid = Some(docid);
Ok(())
@ -227,37 +161,29 @@ impl WordDocidsCachedSorters {
}
}
struct WordDocidsMergerBuilders {
word_fid_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
exact_word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
word_position_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
pub struct WordDocidsCaches<'extractor> {
pub word_docids: Vec<BalancedCaches<'extractor>>,
pub word_fid_docids: Vec<BalancedCaches<'extractor>>,
pub exact_word_docids: Vec<BalancedCaches<'extractor>>,
pub word_position_docids: Vec<BalancedCaches<'extractor>>,
pub fid_word_count_docids: Vec<BalancedCaches<'extractor>>,
}
pub struct WordDocidsMergers {
pub word_fid_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub exact_word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_position_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub fid_word_count_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
}
impl WordDocidsMergerBuilders {
impl<'extractor> WordDocidsCaches<'extractor> {
fn new() -> Self {
Self {
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_docids: Vec::new(),
word_fid_docids: Vec::new(),
exact_word_docids: Vec::new(),
word_position_docids: Vec::new(),
fid_word_count_docids: Vec::new(),
}
}
fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> {
let WordDocidsCachedSorters {
word_fid_docids,
fn push(&mut self, other: WordDocidsBalancedCaches<'extractor>) -> Result<()> {
let WordDocidsBalancedCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
@ -265,78 +191,37 @@ impl WordDocidsMergerBuilders {
current_docid: _,
} = other;
let mut word_fid_docids_readers = Ok(vec![]);
let mut word_docids_readers = Ok(vec![]);
let mut exact_word_docids_readers = Ok(vec![]);
let mut word_position_docids_readers = Ok(vec![]);
let mut fid_word_count_docids_readers = Ok(vec![]);
rayon::scope(|s| {
s.spawn(|_| {
word_fid_docids_readers =
word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors());
});
s.spawn(|_| {
word_docids_readers =
word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
});
s.spawn(|_| {
exact_word_docids_readers =
exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
});
s.spawn(|_| {
word_position_docids_readers =
word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors());
});
s.spawn(|_| {
fid_word_count_docids_readers =
fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors());
});
});
self.word_fid_docids.extend(word_fid_docids_readers?);
self.word_docids.extend(word_docids_readers?);
self.exact_word_docids.extend(exact_word_docids_readers?);
self.word_position_docids.extend(word_position_docids_readers?);
self.fid_word_count_docids.extend(fid_word_count_docids_readers?);
self.word_docids.push(word_docids);
self.word_fid_docids.push(word_fid_docids);
self.exact_word_docids.push(exact_word_docids);
self.word_position_docids.push(word_position_docids);
self.fid_word_count_docids.push(fid_word_count_docids);
Ok(())
}
fn build(self) -> WordDocidsMergers {
WordDocidsMergers {
word_fid_docids: self.word_fid_docids.build(),
word_docids: self.word_docids.build(),
exact_word_docids: self.exact_word_docids.build(),
word_position_docids: self.word_position_docids.build(),
fid_word_count_docids: self.fid_word_count_docids.build(),
}
}
}
pub struct WordDocidsExtractorData<'extractor> {
tokenizer: &'extractor DocumentTokenizer<'extractor>,
pub struct WordDocidsExtractorData<'a> {
tokenizer: &'a DocumentTokenizer<'a>,
grenad_parameters: GrenadParameters,
max_memory: Option<usize>,
buckets: usize,
}
impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
type Data = FullySend<RefCell<WordDocidsCachedSorters>>;
impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
type Data = RefCell<Option<WordDocidsBalancedCaches<'extractor>>>;
fn init_data(
&self,
_extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> Result<Self::Data> {
Ok(FullySend(RefCell::new(WordDocidsCachedSorters::new(
self.grenad_parameters,
self.max_memory,
// TODO use a better value
200_000.try_into().unwrap(),
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
self.buckets,
self.grenad_parameters.max_memory,
extractor_alloc,
))))
}
fn process(
&self,
change: DocumentChange,
context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
}
@ -345,16 +230,15 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
pub struct WordDocidsExtractors;
impl WordDocidsExtractors {
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<WordDocidsMergers> {
let max_memory = grenad_parameters.max_memory_by_thread();
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<WordDocidsCaches<'extractor>> {
let index = indexing_context.index;
let rtxn = index.read_txn()?;
let stop_words = index.stop_words(&rtxn)?;
let allowed_separators = index.allowed_separators(&rtxn)?;
let allowed_separators: Option<Vec<_>> =
@ -392,7 +276,7 @@ impl WordDocidsExtractors {
let extractor = WordDocidsExtractorData {
tokenizer: &document_tokenizer,
grenad_parameters,
max_memory,
buckets: rayon::current_num_threads(),
};
for_each_document_change(
@ -404,28 +288,23 @@ impl WordDocidsExtractors {
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let mut builder = WordDocidsMergerBuilders::new();
for cache in datastore.into_iter().map(|cache| cache.0.into_inner()) {
builder.add_sorters(cache)?;
let mut merger = WordDocidsCaches::new();
for cache in datastore.into_iter().flat_map(RefCell::into_inner) {
merger.push(cache)?;
}
Ok(builder.build())
}
Ok(merger)
}
fn extract_document_change(
context: &DocumentChangeContext<FullySend<RefCell<WordDocidsCachedSorters>>>,
context: &DocumentChangeContext<RefCell<Option<WordDocidsBalancedCaches>>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let cached_sorter = cached_sorter.deref_mut();
let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
let cached_sorter = cached_sorter_ref.as_mut().unwrap();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = new_fields_ids_map.deref_mut();
let doc_alloc = &context.doc_alloc;
@ -436,8 +315,7 @@ impl WordDocidsExtractors {
match document_change {
DocumentChange::Deletion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
cached_sorter.insert_del_u32(
fid,
pos,
word,
@ -445,7 +323,6 @@ impl WordDocidsExtractors {
inner.docid(),
doc_alloc,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
@ -455,8 +332,7 @@ impl WordDocidsExtractors {
}
DocumentChange::Update(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
cached_sorter.insert_del_u32(
fid,
pos,
word,
@ -464,7 +340,6 @@ impl WordDocidsExtractors {
inner.docid(),
doc_alloc,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index, context.db_fields_ids_map)?,
@ -473,8 +348,7 @@ impl WordDocidsExtractors {
)?;
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
cached_sorter.insert_add_u32(
fid,
pos,
word,
@ -482,7 +356,6 @@ impl WordDocidsExtractors {
inner.docid(),
doc_alloc,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.new(rtxn, index, context.db_fields_ids_map)?,
@ -492,8 +365,7 @@ impl WordDocidsExtractors {
}
DocumentChange::Insertion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
cached_sorter.insert_add_u32(
fid,
pos,
word,
@ -501,7 +373,6 @@ impl WordDocidsExtractors {
inner.docid(),
doc_alloc,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.new(),

View File

@ -8,13 +8,13 @@ use super::tokenize_document::DocumentTokenizer;
use super::SearchableExtractor;
use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::new::document::Document;
use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend, RefCellExt};
use crate::update::new::extract::cache::BalancedCaches;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, RefCellExt};
use crate::update::new::DocumentChange;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{FieldId, GlobalFieldsIdsMap, Index, Result};
pub struct WordPairProximityDocidsExtractor;
impl SearchableExtractor for WordPairProximityDocidsExtractor {
fn attributes_to_extract<'a>(
rtxn: &'a RoTxn,
@ -28,11 +28,10 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
}
// This method is reimplemented to count the number of words in the document in each field
// and to store the docids of the documents that have a number of words in a given field equal to or under than MAX_COUNTED_WORDS.
// and to store the docids of the documents that have a number of words in a given field
// equal to or under MAX_COUNTED_WORDS.
fn extract_document_change(
context: &DocumentChangeContext<
FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
>,
context: &DocumentChangeContext<RefCell<BalancedCaches>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()> {
@ -48,7 +47,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = &mut *new_fields_ids_map;
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield();
let cached_sorter = &mut *cached_sorter;
// this is a VecDeque and will stay small, so it can stay on the heap for now
@ -139,7 +138,7 @@ fn build_key<'a>(
fn word_positions_into_word_pair_proximity(
word_positions: &mut VecDeque<(Rc<str>, u16)>,
word_pair_proximity: &mut impl FnMut((Rc<str>, Rc<str>), u8),
) -> Result<()> {
) {
let (head_word, head_position) = word_positions.pop_front().unwrap();
for (word, position) in word_positions.iter() {
let prox = index_proximity(head_position as u32, *position as u32) as u8;
@ -147,7 +146,6 @@ fn word_positions_into_word_pair_proximity(
word_pair_proximity((head_word.clone(), word.clone()), prox);
}
}
Ok(())
}
fn process_document_tokens<'doc>(
@ -163,7 +161,7 @@ fn process_document_tokens<'doc>(
.front()
.map_or(false, |(_w, p)| index_proximity(*p as u32, pos as u32) >= MAX_DISTANCE)
{
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity);
}
// insert the new word.
@ -173,7 +171,7 @@ fn process_document_tokens<'doc>(
document_tokenizer.tokenize_document(document, fields_ids_map, &mut token_fn)?;
while !word_positions.is_empty() {
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity)?;
word_positions_into_word_pair_proximity(word_positions, word_pair_proximity);
}
Ok(())
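
To make the sliding window above concrete, here is a standalone sketch. It simplifies on purpose: MAX_DISTANCE is assumed to be 8 (its value in this crate), the proximity is a plain positional difference instead of index_proximity, and the pair filtering that the real word_positions_into_word_pair_proximity applies is omitted.

use std::collections::VecDeque;

// Assumed to match crate::proximity::MAX_DISTANCE.
const MAX_DISTANCE: u32 = 8;

fn main() {
    let tokens: [(&str, u16); 4] = [("new", 0), ("york", 1), ("pizza", 2), ("delivery", 15)];
    let mut window: VecDeque<(&str, u16)> = VecDeque::new();
    let emit = |a: &str, b: &str, prox: u32| println!("({a}, {b}) -> {prox}");

    for (word, pos) in tokens {
        // Flush the head while it is too far from the incoming word to ever pair with it.
        while window.front().map_or(false, |(_, p)| pos as u32 - *p as u32 >= MAX_DISTANCE) {
            let (head, head_pos) = window.pop_front().unwrap();
            for (other, other_pos) in &window {
                emit(head, other, *other_pos as u32 - head_pos as u32);
            }
        }
        window.push_back((word, pos));
    }
    // Flush whatever remains, like the trailing `while !word_positions.is_empty()` loop above.
    while let Some((head, head_pos)) = window.pop_front() {
        for (other, other_pos) in &window {
            emit(head, other, *other_pos as u32 - head_pos as u32);
        }
    }
}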

View File

@ -3,76 +3,60 @@ mod extract_word_pair_proximity_docids;
mod tokenize_document;
use std::cell::RefCell;
use std::fs::File;
use std::marker::PhantomData;
use bumpalo::Bump;
pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers};
pub use extract_word_docids::{WordDocidsCaches, WordDocidsExtractors};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger;
use heed::RoTxn;
use rayon::iter::{ParallelBridge, ParallelIterator};
use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter;
use super::cache::BalancedCaches;
use super::DocidsExtractor;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::update::GrenadParameters;
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
tokenizer: &'extractor DocumentTokenizer<'extractor>,
pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
tokenizer: &'a DocumentTokenizer<'a>,
grenad_parameters: GrenadParameters,
max_memory: Option<usize>,
buckets: usize,
_ex: PhantomData<EX>,
}
impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'extractor, EX>
impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
for SearchableExtractorData<'a, EX>
{
type Data = FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>;
type Data = RefCell<BalancedCaches<'extractor>>;
fn init_data(
&self,
_extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> Result<Self::Data> {
Ok(FullySend(RefCell::new(CboCachedSorter::new(
// TODO use a better value
1_000_000.try_into().unwrap(),
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
self.grenad_parameters.chunk_compression_type,
self.grenad_parameters.chunk_compression_level,
self.grenad_parameters.max_nb_chunks,
self.max_memory,
false,
),
))))
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(BalancedCaches::new_in(
self.buckets,
self.grenad_parameters.max_memory,
extractor_alloc,
)))
}
fn process(
&self,
change: DocumentChange,
context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
EX::extract_document_change(context, self.tokenizer, change)
}
}
pub trait SearchableExtractor: Sized + Sync {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
let max_memory = grenad_parameters.max_memory_by_thread();
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<BalancedCaches<'extractor>>> {
let rtxn = indexing_context.index.read_txn()?;
let stop_words = indexing_context.index.stop_words(&rtxn)?;
let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?;
@ -104,7 +88,7 @@ pub trait SearchableExtractor: Sized + Sync {
let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
tokenizer: &document_tokenizer,
grenad_parameters,
max_memory,
buckets: rayon::current_num_threads(),
_ex: PhantomData,
};
@ -122,37 +106,12 @@ pub trait SearchableExtractor: Sized + Sync {
&datastore,
)?;
}
{
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter();
let readers: Vec<_> = datastore
.into_iter()
.par_bridge()
.map(|cache_entry| {
let cached_sorter: FullySend<
RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>,
> = cache_entry;
let cached_sorter = cached_sorter.0.into_inner();
let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors()
})
.collect();
for reader in readers {
builder.extend(reader?);
}
Ok(builder.build())
}
Ok(datastore.into_iter().map(RefCell::into_inner).collect())
}
fn extract_document_change(
context: &DocumentChangeContext<
FullySend<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
>,
context: &DocumentChangeContext<RefCell<BalancedCaches>>,
document_tokenizer: &DocumentTokenizer,
document_change: DocumentChange,
) -> Result<()>;
@ -164,12 +123,12 @@ pub trait SearchableExtractor: Sized + Sync {
}
impl<T: SearchableExtractor> DocidsExtractor for T {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<Vec<BalancedCaches<'extractor>>> {
Self::run_extraction(
grenad_parameters,
document_changes,

View File

@ -171,7 +171,6 @@ mod test {
use bumpalo::Bump;
use charabia::TokenizerBuilder;
use meili_snap::snapshot;
use raw_collections::RawMap;
use serde_json::json;
use serde_json::value::RawValue;

View File

@ -1,31 +1,24 @@
use std::collections::{BTreeSet, HashMap};
use charabia::{normalizer::NormalizerOption, Language, Normalize, StrDetection, Token};
use charabia::normalizer::NormalizerOption;
use charabia::{Language, Normalize, StrDetection, Token};
use grenad::Sorter;
use heed::{
types::{Bytes, SerdeJson},
BytesDecode, BytesEncode, RoTxn,
};
use heed::types::{Bytes, SerdeJson};
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
use super::channel::FacetSearchableSender;
use super::extract::FacetKind;
use super::fst_merger_builder::FstMergerBuilder;
use super::KvReaderDelAdd;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::StrRefCodec;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::{create_sorter, MergeDeladdBtreesetString};
use crate::{
heed_codec::{
facet::{FacetGroupKey, FacetGroupKeyCodec},
StrRefCodec,
},
update::{
create_sorter,
del_add::{DelAdd, KvWriterDelAdd},
MergeDeladdBtreesetString,
},
BEU16StrCodec, FieldId, GlobalFieldsIdsMap, Index, LocalizedAttributesRule, Result,
MAX_FACET_VALUE_LENGTH,
};
use super::{
channel::FacetSearchableSender, extract::FacetKind, fst_merger_builder::FstMergerBuilder,
KvReaderDelAdd,
};
pub struct FacetSearchBuilder<'indexer> {
registered_facets: HashMap<FieldId, usize>,
normalized_facet_string_docids_sorter: Sorter<MergeDeladdBtreesetString>,
@ -49,6 +42,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
None,
None,
Some(0),
false,
);
Self {
@ -84,7 +78,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
}
let locales = self.locales(field_id);
let hyper_normalized_value = normalize_facet_string(left_bound, locales.as_deref());
let hyper_normalized_value = normalize_facet_string(left_bound, locales);
let set = BTreeSet::from_iter(std::iter::once(left_bound));
@ -103,7 +97,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
}
fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
if self.localized_field_ids.get(&field_id).is_none() {
if !self.localized_field_ids.contains_key(&field_id) {
let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
unreachable!("Field id {} not found in the global fields ids map", field_id);
};
@ -124,7 +118,8 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
pub fn merge_and_send(
self,
index: &Index,
rtxn: &RoTxn<'_>,
wtxn: &mut RwTxn,
rtxn: &RoTxn,
sender: FacetSearchableSender,
) -> Result<()> {
let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
@ -139,13 +134,14 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
let mut fst_merger_builder: Option<FstMergerBuilder> = None;
while let Some((key, deladd)) = merger_iter.next()? {
let (field_id, normalized_facet_string) =
BEU16StrCodec::bytes_decode(&key).map_err(heed::Error::Encoding)?;
BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?;
if current_field_id != Some(field_id) {
if let Some(fst_merger_builder) = fst_merger_builder {
// send the previous fst to the channel
let mmap = fst_merger_builder.build(&mut callback)?;
sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
// sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
todo!("What to do");
}
println!("getting fst for field_id: {}", field_id);
@ -198,7 +194,8 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
let mmap = fst_merger_builder.build(&mut callback)?;
sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
// sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
todo!("What to do");
}
Ok(())
@ -209,7 +206,7 @@ fn callback(_bytes: &[u8], _deladd: DelAdd, _is_modified: bool) -> Result<()> {
Ok(())
}
fn merge_btreesets<'a>(
fn merge_btreesets(
current: Option<&[u8]>,
del: Option<&[u8]>,
add: Option<&[u8]>,

View File

@ -49,7 +49,7 @@ impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> Visitor<'de>
visitor: MutFieldIdMapVisitor(self.fields_ids_map),
})?
{
let Some(fid) = fid else {
let Some(_fid) = fid else {
return Ok(Err(crate::UserError::AttributeLimitReached));
};
self.fields_ids_map = fields_ids_map;

View File

@ -3,7 +3,6 @@ use std::sync::{Arc, RwLock};
use bumpalo::Bump;
use heed::RoTxn;
use raw_collections::alloc::RefBump;
use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange;
@ -104,6 +103,10 @@ pub struct FullySend<T>(pub T);
// SAFETY: a type **fully** send is always mostly send as well.
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
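// SAFETY (assumed rationale, not stated in this diff): `RefCell<T>` and `Option<T>`
// add no thread affinity of their own, so wrapping a mostly-send `T` keeps it mostly send.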
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
impl<T> FullySend<T> {
pub fn into(self) -> T {
self.0
@ -256,7 +259,7 @@ pub struct DocumentChangeContext<
pub doc_alloc: Bump,
/// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
pub extractor_alloc: RefBump<'extractor>,
pub extractor_alloc: &'extractor Bump,
/// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
@ -279,14 +282,14 @@ impl<
index: &'indexer Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
extractor_allocs: &'extractor ThreadLocal<FullySend<RefCell<Bump>>>,
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
datastore: &'data ThreadLocal<T>,
fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
init_data: F,
) -> Result<Self>
where
F: FnOnce(RefBump<'extractor>) -> Result<T>,
F: FnOnce(&'extractor Bump) -> Result<T>,
{
let doc_alloc =
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024))));
@ -297,9 +300,7 @@ impl<
let fields_ids_map = &fields_ids_map.0;
let extractor_alloc = extractor_allocs.get_or_default();
let extractor_alloc = RefBump::new(extractor_alloc.0.borrow_or_yield());
let data = datastore.get_or_try(|| init_data(RefBump::clone(&extractor_alloc)))?;
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
let txn = index.read_txn()?;
Ok(DocumentChangeContext {
@ -308,7 +309,7 @@ impl<
db_fields_ids_map,
new_fields_ids_map: fields_ids_map,
doc_alloc,
extractor_alloc,
extractor_alloc: &extractor_alloc.0,
data,
doc_allocs,
})
@ -319,7 +320,7 @@ impl<
pub trait Extractor<'extractor>: Sync {
type Data: MostlySend;
fn init_data<'doc>(&'doc self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
fn process<'doc>(
&'doc self,
@ -375,15 +376,17 @@ pub fn for_each_document_change<
doc_allocs,
fields_ids_map_store,
}: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<RefCell<Bump>>>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>,
) -> Result<()>
where
EX: Extractor<'extractor>,
{
eprintln!("We are resetting the extractor allocators");
// Clean up and reuse the extractor allocs
for extractor_alloc in extractor_allocs.iter_mut() {
extractor_alloc.0.get_mut().reset();
eprintln!("\tWith {} bytes resetted", extractor_alloc.0.allocated_bytes());
extractor_alloc.0.reset();
}
let pi = document_changes.iter();

View File

@ -80,7 +80,6 @@ mod test {
use std::sync::RwLock;
use bumpalo::Bump;
use raw_collections::alloc::RefBump;
use crate::index::tests::TempIndex;
use crate::update::new::indexer::document_changes::{
@ -95,11 +94,7 @@ mod test {
fn test_deletions() {
struct DeletionWithData<'extractor> {
deleted: RefCell<
hashbrown::HashSet<
DocumentId,
hashbrown::hash_map::DefaultHashBuilder,
RefBump<'extractor>,
>,
hashbrown::HashSet<DocumentId, hashbrown::DefaultHashBuilder, &'extractor Bump>,
>,
}
@ -110,10 +105,7 @@ mod test {
impl<'extractor> Extractor<'extractor> for TrackDeletion<'extractor> {
type Data = DeletionWithData<'extractor>;
fn init_data(
&self,
extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> crate::Result<Self::Data> {
fn init_data(&self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
let deleted = RefCell::new(hashbrown::HashSet::new_in(extractor_alloc));
Ok(DeletionWithData { deleted })
}
@ -173,8 +165,7 @@ mod test {
println!("deleted by {index}: {:?}", data.deleted.borrow());
}
for alloc in extractor_allocs.iter_mut() {
let alloc = &mut alloc.0;
alloc.get_mut().reset();
alloc.0.reset();
}
}
}

View File

@ -1,26 +1,24 @@
use std::cell::RefCell;
use std::cmp::Ordering;
use std::sync::RwLock;
use std::thread::{self, Builder};
use big_s::S;
use bumpalo::Bump;
use document_changes::{
for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, RefCellExt,
ThreadLocal,
for_each_document_change, DocumentChanges, FullySend, IndexingContext, ThreadLocal,
};
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::{RoTxn, RwTxn};
use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump;
use rayon::ThreadPool;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::document::write_to_obkv;
use super::document_change::DocumentChange;
use super::extract::*;
use super::merger::{merge_grenad_entries, FacetFieldIdsDelta};
use super::merger::{FacetDatabases, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
@ -28,75 +26,23 @@ use super::words_prefix_docids::{
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::proximity::ProximityPrecision;
use crate::update::new::channel::ExtractorSender;
use crate::update::del_add::DelAdd;
use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder};
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids};
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
pub(crate) mod de;
pub mod de;
pub mod document_changes;
mod document_deletion;
mod document_operation;
mod partial_dump;
mod update_by_function;
struct DocumentExtractor<'a> {
document_sender: &'a DocumentSender<'a>,
}
impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
type Data = FullySend<()>;
fn init_data(
&self,
_extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
) -> Result<Self::Data> {
Ok(FullySend(()))
}
fn process(
&self,
change: DocumentChange,
context: &document_changes::DocumentChangeContext<Self::Data>,
) -> Result<()> {
let mut document_buffer = Vec::new();
let new_fields_ids_map = context.new_fields_ids_map.borrow_or_yield();
let new_fields_ids_map = &*new_fields_ids_map;
let new_fields_ids_map = new_fields_ids_map.local_map();
let external_docid = change.external_docid().to_owned();
// document but we need to create a function that collects and compresses documents.
match change {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
self.document_sender.delete(docid, external_docid).unwrap();
}
/// TODO: change None to Some(vector) when implemented
DocumentChange::Update(update) => {
let docid = update.docid();
let content =
update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
let content =
write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let content = insertion.new();
let content =
write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
}
}
Ok(())
}
}
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@ -114,12 +60,11 @@ pub fn index<'pl, 'indexer, 'index, DC>(
where
DC: DocumentChanges<'pl>,
{
let (merger_sender, writer_receiver) = merger_writer_channel(10_000);
// This channel acts as a rendezvous point to ensure that we are one task ahead
let (extractor_sender, merger_receiver) = extractors_merger_channels(4);
// TODO find a better channel limit
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@ -132,42 +77,48 @@ where
fields_ids_map_store: &fields_ids_map_store,
};
thread::scope(|s| {
thread::scope(|s| -> crate::Result<_> {
let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.document_sender();
let document_extractor = DocumentExtractor { document_sender: &document_sender};
let rtxn = index.read_txn().unwrap();
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
document_sender.finish().unwrap();
let mut documents_ids = index.documents_ids(&rtxn)?;
let delta_documents_ids = datastore.into_iter().map(|FullySend(d)| d.into_inner()).reduce(DelAddRoaringBitmap::merge).unwrap_or_default();
delta_documents_ids.apply_to(&mut documents_ids);
extractor_sender.send_documents_ids(documents_ids).unwrap();
// document_sender.finish().unwrap();
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let max_memory = TEN_GIB / dbg!(rayon::current_num_threads());
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
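// For illustration (not part of the diff): with the 10 GiB budget above and, say,
// 16 rayon threads, each thread's sorters get 10 * 1024^3 / 16 = 671_088_640 bytes,
// i.e. exactly 640 MiB.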
let facet_field_ids_delta;
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
extract_and_send_docids::<
_,
FacetedDocidsExtractor,
FacetDocids,
>(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
&extractor_sender,
facet_field_ids_delta = merge_and_send_facet_docids(
global_fields_ids_map,
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
)?;
}
@ -175,37 +126,92 @@ where
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let WordDocidsMergers {
word_fid_docids,
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
// TODO Word Docids Merger
// extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
)?;
}
// Word Fid Docids Merging
// extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
index,
extractor_sender.docids::<WordFidDocids>()
)?;
}
// Exact Word Docids Merging
// extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
index,
extractor_sender.docids::<ExactWordDocids>(),
)?;
}
// Word Position Docids Merging
// extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
index,
extractor_sender.docids::<WordPositionDocids>(),
)?;
}
// Fid Word Count Docids Merging
// extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
index,
extractor_sender.docids::<FidWordCountDocids>(),
)?;
}
}
// run the proximity extraction only if the precision is by word
// this works only if the settings didn't change during this transaction.
let rtxn = index.read_txn().unwrap();
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
extract_and_send_docids::<
_,
WordPairProximityDocidsExtractor,
WordPairProximityDocids,
>(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
&extractor_sender,
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
index,
extractor_sender.docids::<WordPairProximityDocids>(),
)?;
}
@ -229,27 +235,13 @@ where
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
Ok(()) as Result<_>
// TODO use None when needed
Result::Ok(facet_field_ids_delta)
})
})?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let span =
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
merge_grenad_entries(
merger_receiver,
merger_sender,
&rtxn,
index,
global_fields_ids_map,
)
})?;
for operation in writer_receiver {
let database = operation.database(index);
@ -264,18 +256,66 @@ where
}
/// TODO handle the panicking threads
handle.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
let prefix_delta = {
let rtxn = index.read_txn()?;
let words_fst = index.words_fst(&rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?;
word_fst_builder.with_prefix_settings(prefix_settings);
let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<DecodeIgnore>();
let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<DecodeIgnore>();
for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
}) {
match eob {
EitherOrBoth::Both(lhs, rhs) => {
if let Some(e) = lhs.err().or(rhs.err()) {
return Err(e.into());
}
}
EitherOrBoth::Left(result) => {
let (word, _) = result?;
word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
}
EitherOrBoth::Right(result) => {
let (word, _) = result?;
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
}
}
}
if let Some(prefix_delta) = merger_result.prefix_delta {
let span = tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
// extractor_sender.main().write_words_fst(word_fst_mmap).unwrap();
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
// extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
index.main.remap_types::<Str, Bytes>().put(
wtxn,
WORDS_PREFIXES_FST_KEY,
&prefixes_fst_mmap,
)?;
Some(prefix_delta)
} else {
None
}
};
// if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
// compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
// }
if let Some(prefix_delta) = prefix_delta {
compute_prefix_database(index, wtxn, prefix_delta)?;
}
Ok(()) as Result<_>
Result::Ok(())
})?;
// required to into_inner the new_fields_ids_map
@ -347,30 +387,6 @@ fn compute_facet_level_database(
Ok(())
}
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type
fn extract_and_send_docids<
'pl,
'fid,
'indexer,
'index,
DC: DocumentChanges<'pl>,
E: DocidsExtractor,
D: MergerOperationType,
>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
sender: &ExtractorSender,
) -> Result<()> {
let merger =
E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?;
sender.send_searchable::<D>(merger).unwrap();
Ok(())
}
/// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring,
/// and whether the primary key changed
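
A side note on the words-FST delta computed above: the merge_join_by loop walks the previous and the freshly written word_docids keys in parallel and registers a deletion or an addition whenever a word exists on only one side. The same shape, reduced to a standalone sketch with made-up words (only the merge_join_by/EitherOrBoth usage mirrors the diff):

use itertools::{merge_join_by, EitherOrBoth};

fn main() {
    // Two sorted snapshots of a word list; detect what disappeared or appeared.
    let previous = ["apple", "banana", "cherry"];
    let current = ["banana", "cherry", "durian"];
    for eob in merge_join_by(previous, current, |l, r| l.cmp(r)) {
        match eob {
            EitherOrBoth::Both(_, _) => {} // present on both sides: nothing to register
            EitherOrBoth::Left(word) => println!("register deletion: {word}"),
            EitherOrBoth::Right(word) => println!("register addition: {word}"),
        }
    }
}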

View File

@ -1,12 +1,10 @@
use std::ops::DerefMut;
use rayon::iter::IndexedParallelIterator;
use serde::Deserializer;
use serde_json::value::RawValue;
use super::de::FieldAndDocidExtractor;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt};
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
use crate::documents::PrimaryKey;
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;

View File

@ -1,5 +1,3 @@
use std::collections::BTreeMap;
use raw_collections::RawMap;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
@ -12,8 +10,8 @@ use crate::documents::PrimaryKey;
use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, KvWriterFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, GlobalFieldsIdsMap, Object, Result, UserError};
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
pub struct UpdateByFunction {
documents: RoaringBitmap,

View File

@ -1,222 +1,20 @@
use std::fs::File;
use std::io::{self};
use bincode::ErrorKind;
use grenad::Merger;
use hashbrown::HashSet;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
use super::channel::*;
use super::extract::FacetKind;
use super::extract::{
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
};
use super::facet_search_builder::FacetSearchBuilder;
use super::word_fst_builder::{PrefixData, PrefixDelta};
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
use super::DocumentChange;
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{
localized_attributes_rules, CboRoaringBitmapCodec, Error, FieldId, GeoPoint,
GlobalFieldsIdsMap, Index, Result,
};
/// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
pub fn merge_grenad_entries(
receiver: MergerReceiver,
sender: MergerSender,
rtxn: &RoTxn,
index: &Index,
global_fields_ids_map: GlobalFieldsIdsMap<'_>,
) -> Result<MergerResult> {
let mut buffer: Vec<u8> = Vec::new();
let mut documents_ids = index.documents_ids(rtxn)?;
let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
let mut merger_result = MergerResult::default();
for merger_operation in receiver {
match merger_operation {
MergerOperation::ExactWordDocidsMerger(merger) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
/// TODO do a MergerOperation::database(&Index) -> Database<Bytes, Bytes>.
index.exact_word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<ExactWordDocids>(),
|_, _key| Ok(()),
)?;
}
MergerOperation::FidWordCountDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.field_id_word_count_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<FidWordCountDocids>(),
|_, _key| Ok(()),
)?;
}
MergerOperation::WordDocidsMerger(merger) => {
let words_fst = index.words_fst(rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(rtxn)?;
word_fst_builder.with_prefix_settings(prefix_settings);
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordDocids>(),
|deladd, key| word_fst_builder.register_word(deladd, key),
)?;
}
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, rtxn)?;
sender.main().write_words_fst(word_fst_mmap).unwrap();
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
merger_result.prefix_delta = Some(prefix_delta);
}
}
}
MergerOperation::WordFidDocidsMerger(merger) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_fid_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordFidDocids>(),
|_, _key| Ok(()),
)?;
}
MergerOperation::WordPairProximityDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_pair_proximity_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordPairProximityDocids>(),
|_, _key| Ok(()),
)?;
}
MergerOperation::WordPositionDocidsMerger(merger) => {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
merge_and_send_docids(
merger,
index.word_position_docids.remap_types(),
rtxn,
&mut buffer,
sender.docids::<WordPositionDocids>(),
|_, _key| Ok(()),
)?;
}
MergerOperation::InsertDocument { docid, external_id, document } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "insert_document");
let _entered = span.enter();
documents_ids.insert(docid);
sender.documents().uncompressed(docid, external_id.clone(), &document).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
let change = match current {
Some(current) => DocumentChange::Update(todo!()),
None => DocumentChange::Insertion(todo!()),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
MergerOperation::DeleteDocument { docid, external_id } => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "delete_document");
let _entered = span.enter();
if !documents_ids.remove(docid) {
unreachable!("Tried deleting a document that we do not know about");
}
sender.documents().delete(docid, external_id.clone()).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let change = DocumentChange::Deletion(Deletion::create(docid, todo!()));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
MergerOperation::FinishedDocument => {
// send the rtree
}
MergerOperation::FacetDocidsMerger(merger) => {
let span =
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
let _entered = span.enter();
let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
let localized_attributes_rules =
index.localized_attributes_rules(rtxn)?.unwrap_or_default();
let mut facet_search_builder = FacetSearchBuilder::new(
global_fields_ids_map.clone(),
localized_attributes_rules,
);
merge_and_send_facet_docids(
merger,
FacetDatabases::new(index),
rtxn,
&mut buffer,
sender.facet_docids(),
&mut facet_field_ids_delta,
&mut facet_search_builder,
)?;
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
// merge and send the facet fst and the searchable facet values
facet_search_builder.merge_and_send(index, rtxn, sender.facet_searchable())?;
}
}
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "documents_ids");
let _entered = span.enter();
// Send the documents ids unioned with the current ones
sender.send_documents_ids(documents_ids).unwrap();
}
// ...
Ok(merger_result)
}
#[derive(Default, Debug)]
pub struct MergerResult {
/// The delta of the prefixes
pub prefix_delta: Option<PrefixDelta>,
/// The field ids that have been modified
pub facet_field_ids_delta: Option<FacetFieldIdsDelta>,
}
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result};
pub struct GeoExtractor {
rtree: Option<rstar::RTree<GeoPoint>>,
@ -267,80 +65,92 @@ impl GeoExtractor {
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub fn merge_and_send_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>,
database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
index: &Index,
docids_sender: impl DocidsSender + Sync,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(key, value).unwrap();
register_key(DelAdd::Addition, key)?;
Ok(())
}
Operation::Delete => {
docids_sender.delete(key).unwrap();
register_key(DelAdd::Deletion, key)?;
}
Operation::Ignore => (),
}
}
Ok(())
}
Operation::Ignore => Ok(()),
}
})
})
}
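
Stripped of the milli types, the function above follows a transpose-then-parallelize shape: every worker produced one row of per-bucket caches, the rows are regrouped per bucket, and each bucket is merged on its own rayon thread. A standalone sketch of that shape only (plain integers stand in for the frozen caches):

use rayon::prelude::*;

fn main() {
    let per_worker = vec![vec![1u32, 2, 3], vec![10, 20, 30]]; // 2 workers, 3 buckets
    let buckets = 3usize;
    // Transpose: group the b-th entry of every worker into one bucket.
    let per_bucket: Vec<Vec<u32>> =
        (0..buckets).map(|b| per_worker.iter().map(|row| row[b]).collect()).collect();
    // Merge each bucket in parallel; an error would short-circuit, like try_for_each above.
    per_bucket
        .into_par_iter()
        .try_for_each(|bucket| -> Result<(), std::io::Error> {
            println!("merging bucket {bucket:?}");
            Ok(())
        })
        .unwrap();
}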
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_facet_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub fn merge_and_send_facet_docids<'indexer, 'extractor>(
global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases,
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
facet_field_ids_delta: &mut FacetFieldIdsDelta,
facet_search_builder: &mut FacetSearchBuilder,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get_cbo_roaring_bytes_value(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
index: &Index,
docids_sender: impl DocidsSender + Sync,
) -> Result<(FacetFieldIdsDelta, FacetSearchBuilder<'indexer>)> {
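// Same parallel layout as merge_and_send_docids, except that every task also builds its own
// FacetFieldIdsDelta and FacetSearchBuilder; the per-task results are folded together in the
// reduce step below.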
transpose_and_freeze_caches(&mut caches)?
.into_par_iter()
.map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
let rtxn = index.read_txn()?;
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
let mut facet_search_builder = FacetSearchBuilder::new(
global_fields_ids_map.clone(),
localized_attributes_rules.unwrap_or_default(),
);
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key);
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(key, value).unwrap();
Ok(())
}
Operation::Delete => {
facet_field_ids_delta.register_from_key(key);
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
docids_sender.delete(key).unwrap();
}
Operation::Ignore => (),
}
}
Ok(())
}
Operation::Ignore => Ok(()),
}
})?;
struct FacetDatabases<'a> {
Ok((facet_field_ids_delta, facet_search_builder))
})
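// Fold the per-task results into a single (delta, builder) pair. Note that the identity
// FacetSearchBuilder and the merge of two builders are still todo!() here.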
.reduce(
|| Ok((FacetFieldIdsDelta::default(), todo!())),
|lhs, rhs| {
let (lhs_ffid, lhs_fsb) = lhs?;
let (rhs_ffid, rhs_fsb) = rhs?;
let ffid_merged = lhs_ffid.merge(rhs_ffid);
let fsb_merged = todo!();
Ok((ffid_merged, fsb_merged))
},
)
}
pub struct FacetDatabases<'a> {
index: &'a Index,
}
impl<'a> FacetDatabases<'a> {
fn new(index: &'a Index) -> Self {
pub fn new(index: &'a Index) -> Self {
Self { index }
}
@ -361,7 +171,7 @@ impl<'a> FacetDatabases<'a> {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct FacetFieldIdsDelta {
/// The field ids that have been modified
modified_facet_string_ids: HashSet<FieldId>,
@ -369,13 +179,6 @@ pub struct FacetFieldIdsDelta {
}
impl FacetFieldIdsDelta {
fn new() -> Self {
Self {
modified_facet_string_ids: HashSet::new(),
modified_facet_number_ids: HashSet::new(),
}
}
fn register_facet_string_id(&mut self, field_id: FieldId) {
self.modified_facet_string_ids.insert(field_id);
}
@ -414,6 +217,17 @@ impl FacetFieldIdsDelta {
Some(self.modified_facet_number_ids.iter().copied().collect())
}
}
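/// Unions the modified facet field ids of `rhs` into `self`.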
pub fn merge(mut self, rhs: Self) -> Self {
let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs;
modified_facet_number_ids.into_iter().for_each(|fid| {
self.modified_facet_number_ids.insert(fid);
});
modified_facet_string_ids.into_iter().for_each(|fid| {
self.modified_facet_string_ids.insert(fid);
});
self
}
}
enum Operation {
@ -425,13 +239,10 @@ enum Operation {
/// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap.
fn merge_cbo_bitmaps(
current: Option<&[u8]>,
del: Option<&[u8]>,
add: Option<&[u8]>,
del: Option<RoaringBitmap>,
add: Option<RoaringBitmap>,
) -> Result<Operation> {
let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
match (current, del, add) {
(None, None, None) => Ok(Operation::Ignore), // but it's strange
(None, None, Some(add)) => Ok(Operation::Write(add)),


@ -1,4 +1,7 @@
pub use document_change::{Deletion, DocumentChange, Insertion, Update};
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,
};
pub use top_level_map::{CowStr, TopLevelMap};
use super::del_add::DelAdd;


@ -1,38 +1,8 @@
use std::sync::Arc;
use rayon::iter::{MapInit, ParallelIterator};
use rayon::iter::ParallelIterator;
pub trait ParallelIteratorExt: ParallelIterator {
/// Maps items based on the init function.
///
/// The init function is run only as necessary, which is basically once per thread.
fn try_map_try_init<F, INIT, T, E, R>(
self,
init: INIT,
map_op: F,
) -> MapInit<
Self,
impl Fn() -> Result<T, Arc<E>> + Sync + Send + Clone,
impl Fn(&mut Result<T, Arc<E>>, Self::Item) -> Result<R, Arc<E>> + Sync + Send + Clone,
>
where
E: Send + Sync,
F: Fn(&mut T, Self::Item) -> Result<R, E> + Sync + Send + Clone,
INIT: Fn() -> Result<T, E> + Sync + Send + Clone,
R: Send,
{
self.map_init(
move || match init() {
Ok(t) => Ok(t),
Err(err) => Err(Arc::new(err)),
},
move |result, item| match result {
Ok(t) => map_op(t, item).map_err(Arc::new),
Err(err) => Err(err.clone()),
},
)
}
/// A method to run a closure over all the items and return an owned error.
///
/// The init function is run only as necessary, which is basically once per thread.
@ -58,17 +28,6 @@ pub trait ParallelIteratorExt: ParallelIterator {
Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
}
}
fn try_arc_for_each<F, E>(self, op: F) -> Result<(), E>
where
E: Send + Sync,
F: Fn(Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
{
match self.try_for_each(op) {
Ok(()) => Ok(()),
Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")),
}
}
}
impl<T: ParallelIterator> ParallelIteratorExt for T {}


@ -1,10 +1,16 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write};
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::{BytesDecode, Database, RwTxn};
use roaring::RoaringBitmap;
use heed::{BytesDecode, Database, RoTxn, RwTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
use roaring::MultiOps;
use tempfile::tempfile;
use thread_local::ThreadLocal;
use super::indexer::document_changes::RefCellExt;
use crate::heed_codec::StrBEU16Codec;
use crate::{CboRoaringBitmapCodec, Index, Prefix, Result};
@ -38,22 +44,103 @@ impl WordPrefixDocids {
prefixes: &HashSet<Prefix>,
) -> Result<()> {
// We fetch the docids associated with the newly added word prefix fst only.
let mut docids = RoaringBitmap::new();
for prefix in prefixes {
docids.clear();
let prefix = prefix.as_bytes();
for result in self.database.prefix_iter(wtxn, prefix)? {
let (_word, data) = result?;
docids |= &data;
}
// And collect the CboRoaringBitmaps pointers in a HashMap.
let frozen = FrozenPrefixBitmaps::from_prefixes(self.database, wtxn, prefixes)?;
self.prefix_database.put(wtxn, prefix, &docids)?;
// We access this HashMap in parallel to compute the *union* of all
// of them and *serialize* them into files. There is one file per CPU.
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
let refcell = local_entries.get_or_try(|| {
tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new())))
})?;
let mut refmut = refcell.borrow_mut_or_yield();
let (ref mut index, ref mut file, ref mut buffer) = *refmut;
let output = frozen
.bitmaps(prefix)
.unwrap()
.iter()
.map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
.union()?;
buffer.clear();
CboRoaringBitmapCodec::serialize_into(&output, buffer);
index.push(PrefixEntry { prefix, serialized_length: buffer.len() });
file.write_all(buffer)
})?;
drop(frozen);
// We iterate over all the collected and serialized bitmaps, reading them back
// from the files using the recorded entries, to eventually put them in the final database.
for refcell in local_entries {
let (index, file, mut buffer) = refcell.into_inner();
let mut file = file.into_inner().map_err(|e| e.into_error())?;
file.rewind()?;
let mut file = BufReader::new(file);
for PrefixEntry { prefix, serialized_length } in index {
buffer.resize(serialized_length, 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
prefix.as_bytes(),
&buffer,
)?;
}
}
Ok(())
}
}
/// Represents a prefix and the length its serialized bitmap takes on disk.
struct PrefixEntry<'a> {
prefix: &'a str,
serialized_length: usize,
}
/// Stores prefixes along with all the pointers to the associated
/// CboRoaringBitmaps.
///
/// They are collected synchronously and stored in a HashMap. The
/// synchronous step does only a small amount of work: it just stores
/// pointers. The map can then be accessed in parallel to fetch the
/// associated bitmap pointers.
struct FrozenPrefixBitmaps<'a, 'rtxn> {
prefixes_bitmaps: HashMap<&'a str, Vec<&'rtxn [u8]>>,
}
impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
pub fn from_prefixes(
database: Database<Bytes, CboRoaringBitmapCodec>,
rtxn: &'rtxn RoTxn,
prefixes: &'a HashSet<Prefix>,
) -> heed::Result<Self> {
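// Only the raw, still-serialized bitmap bytes are collected here; deserializing
// and unioning them happens later, in parallel.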
let database = database.remap_data_type::<Bytes>();
let mut prefixes_bitmaps = HashMap::new();
for prefix in prefixes {
let mut bitmap_bytes = Vec::new();
for result in database.prefix_iter(rtxn, prefix.as_bytes())? {
let (_word, bytes) = result?;
bitmap_bytes.push(bytes);
}
assert!(prefixes_bitmaps.insert(prefix.as_str(), bitmap_bytes).is_none());
}
Ok(Self { prefixes_bitmaps })
}
pub fn bitmaps(&self, key: &str) -> Option<&[&'rtxn [u8]]> {
self.prefixes_bitmaps.get(key).map(AsRef::as_ref)
}
}
unsafe impl<'a, 'rtxn> Sync for FrozenPrefixBitmaps<'a, 'rtxn> {}
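// A minimal sketch, not part of this commit, of the per-prefix union performed in
// `WordPrefixDocids::execute` above: deserialize every stored bitmap slice and union
// them in one go with roaring's `MultiOps`. It assumes plain roaring-serialized slices
// rather than milli's CboRoaringBitmapCodec, and the function name is illustrative only.
fn union_prefix_bitmaps_sketch(serialized: &[&[u8]]) -> std::io::Result<roaring::RoaringBitmap> {
    use roaring::MultiOps;
    serialized
        .iter()
        .map(|bytes| roaring::RoaringBitmap::deserialize_from(*bytes))
        .union()
}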
struct WordPrefixIntegerDocids {
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,