Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-11-04 01:46:28 +00:00)

Commit: Introduce a new update for the facet levels
@@ -2,6 +2,7 @@ use std::borrow::Cow;
 use std::collections::{HashMap, HashSet};
 use std::fs::{File, create_dir_all};
 use std::net::SocketAddr;
+use std::num::NonZeroUsize;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -27,7 +28,7 @@ use warp::{Filter, http::Response};
 
 use milli::tokenizer::{simple_tokenizer, TokenType};
 use milli::update::UpdateIndexingStep::*;
-use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
+use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat, EasingName};
 use milli::{obkv_to_json, Index, UpdateStore, SearchResult, FacetCondition};
 
 static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
@@ -196,6 +197,7 @@ enum UpdateMeta {
     DocumentsAddition { method: String, format: String },
     ClearDocuments,
     Settings(Settings),
+    FacetLevels(FacetLevels),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -231,6 +233,15 @@ struct Settings {
     faceted_attributes: Option<HashMap<String, String>>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+#[serde(rename_all = "camelCase")]
+struct FacetLevels {
+    last_level_size: Option<NonZeroUsize>,
+    number_of_levels: Option<NonZeroUsize>,
+    easing_function: Option<String>,
+}
+
 // Any value that is present is considered Some value, including null.
 fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
 where T: Deserialize<'de>,
@@ -399,6 +410,36 @@ async fn main() -> anyhow::Result<()> {
                         Ok(_count) => wtxn.commit().map_err(Into::into),
                         Err(e) => Err(e.into())
                     }
                 },
+                UpdateMeta::FacetLevels(levels) => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+                    let mut builder = update_builder.facet_levels(&mut wtxn, &index_cloned);
+                    if let Some(value) = levels.last_level_size {
+                        builder.last_level_size(value);
+                    }
+                    if let Some(value) = levels.number_of_levels {
+                        builder.number_of_levels(value);
+                    }
+                    if let Some(value) = levels.easing_function {
+                        let easing_name = if value.eq_ignore_ascii_case("expo") {
+                            EasingName::Expo
+                        } else if value.eq_ignore_ascii_case("quart") {
+                            EasingName::Quart
+                        } else if value.eq_ignore_ascii_case("circ") {
+                            EasingName::Circ
+                        } else if value.eq_ignore_ascii_case("linear") {
+                            EasingName::Linear
+                        } else {
+                            panic!("Invalid easing function name")
+                        };
+                        builder.easing_function(easing_name);
+                    }
+
+                    match builder.execute() {
+                        Ok(()) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into())
+                    }
+                }
             };
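An aside on the handler above: an unrecognized easing name panics inside the update-processing loop. A non-panicking alternative (purely illustrative, not part of this commit; the local EasingName mirrors the enum the commit introduces) could parse the string into a Result instead:

    // Sketch of fallible easing-name parsing; this local enum mirrors the
    // EasingName introduced by this commit in src/update/facet_levels.rs.
    #[derive(Debug, Copy, Clone, PartialEq)]
    enum EasingName { Expo, Quart, Circ, Linear }

    // Accepts the same four names, case-insensitively, and reports bad input
    // as an error the caller can surface instead of panicking.
    fn parse_easing_name(s: &str) -> Result<EasingName, String> {
        match s.to_ascii_lowercase().as_str() {
            "expo" => Ok(EasingName::Expo),
            "quart" => Ok(EasingName::Quart),
            "circ" => Ok(EasingName::Circ),
            "linear" => Ok(EasingName::Linear),
            other => Err(format!("invalid easing function name: {}", other)),
        }
    }

    fn main() {
        assert_eq!(parse_easing_name("Quart"), Ok(EasingName::Quart));
        assert!(parse_easing_name("bounce").is_err());
    }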
 
@@ -759,6 +800,19 @@ async fn main() -> anyhow::Result<()> {
             Ok(warp::reply())
         });
 
+    let update_store_cloned = update_store.clone();
+    let update_status_sender_cloned = update_status_sender.clone();
+    let change_facet_levels_route = warp::filters::method::post()
+        .and(warp::path!("facet-levels"))
+        .and(warp::body::json())
+        .map(move |levels: FacetLevels| {
+            let meta = UpdateMeta::FacetLevels(levels);
+            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
+            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
+            eprintln!("update {} registered", update_id);
+            warp::reply()
+        });
+
     let update_ws_route = warp::ws()
         .and(warp::path!("updates" / "ws"))
         .map(move |ws: warp::ws::Ws| {
@@ -807,6 +861,7 @@ async fn main() -> anyhow::Result<()> {
         .or(indexing_json_stream_route)
         .or(clearing_route)
        .or(change_settings_route)
+        .or(change_facet_levels_route)
         .or(update_ws_route);
 
     let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
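For reference, the new POST /facet-levels route deserializes a JSON body with camelCase keys and every field optional (deny_unknown_fields rejects anything else). A minimal sketch of building such a payload, assuming serde (with the derive feature) and serde_json as dependencies; the struct simply mirrors the http-ui FacetLevels above:

    // Builds and prints the JSON a client could POST to /facet-levels.
    use std::num::NonZeroUsize;
    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct FacetLevels {
        #[serde(skip_serializing_if = "Option::is_none")]
        last_level_size: Option<NonZeroUsize>,
        #[serde(skip_serializing_if = "Option::is_none")]
        number_of_levels: Option<NonZeroUsize>,
        #[serde(skip_serializing_if = "Option::is_none")]
        easing_function: Option<String>,
    }

    fn main() {
        let payload = FacetLevels {
            last_level_size: NonZeroUsize::new(10),
            number_of_levels: NonZeroUsize::new(4),
            easing_function: Some("linear".into()),
        };
        // {"lastLevelSize":10,"numberOfLevels":4,"easingFunction":"linear"}
        println!("{}", serde_json::to_string(&payload).unwrap());
    }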
 
src/update/facet_levels.rs (new file, 247 lines):
@@ -0,0 +1,247 @@
use std::fs::File;
use std::num::NonZeroUsize;

use grenad::{CompressionType, Reader, Writer, FileFuse};
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesEncode, Error};
use itertools::Itertools;
use log::debug;
use roaring::RoaringBitmap;

use crate::facet::FacetType;
use crate::heed_codec::{facet::FacetLevelValueI64Codec, CboRoaringBitmapCodec};
use crate::Index;
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database};

#[derive(Debug, Copy, Clone)]
pub enum EasingName {
    Expo,
    Quart,
    Circ,
    Linear,
}

pub struct FacetLevels<'t, 'u, 'i> {
    wtxn: &'t mut heed::RwTxn<'i, 'u>,
    index: &'i Index,
    pub(crate) chunk_compression_type: CompressionType,
    pub(crate) chunk_compression_level: Option<u32>,
    pub(crate) chunk_fusing_shrink_size: Option<u64>,
    number_of_levels: NonZeroUsize,
    last_level_size: NonZeroUsize,
    easing_function: EasingName,
}

impl<'t, 'u, 'i> FacetLevels<'t, 'u, 'i> {
    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> FacetLevels<'t, 'u, 'i> {
        FacetLevels {
            wtxn,
            index,
            chunk_compression_type: CompressionType::None,
            chunk_compression_level: None,
            chunk_fusing_shrink_size: None,
            number_of_levels: NonZeroUsize::new(5).unwrap(),
            last_level_size: NonZeroUsize::new(5).unwrap(),
            easing_function: EasingName::Expo,
        }
    }

    pub fn number_of_levels(&mut self, value: NonZeroUsize) -> &mut Self {
        self.number_of_levels = value;
        self
    }

    pub fn last_level_size(&mut self, value: NonZeroUsize) -> &mut Self {
        self.last_level_size = value;
        self
    }

    pub fn easing_function(&mut self, value: EasingName) -> &mut Self {
        self.easing_function = value;
        self
    }

    pub fn execute(self) -> anyhow::Result<()> {
        // We get the faceted fields to be able to create the facet levels.
        let faceted_fields = self.index.faceted_fields(self.wtxn)?;

        debug!("Computing and writing the facet values levels docids into LMDB on disk...");
        for (field_id, facet_type) in faceted_fields {
            if facet_type == FacetType::String { continue }

            clear_field_levels(
                self.wtxn,
                self.index.facet_field_id_value_docids,
                field_id,
            )?;

            let content = compute_facet_levels(
                self.wtxn,
                self.index.facet_field_id_value_docids,
                self.chunk_compression_type,
                self.chunk_compression_level,
                self.chunk_fusing_shrink_size,
                self.last_level_size,
                self.number_of_levels,
                self.easing_function,
                field_id,
                facet_type,
            )?;

            write_into_lmdb_database(
                self.wtxn,
                *self.index.facet_field_id_value_docids.as_polymorph(),
                content,
                |_, _| anyhow::bail!("invalid facet level merging"),
                WriteMethod::GetMergePut,
            )?;
        }

        Ok(())
    }
}

fn clear_field_levels(
    wtxn: &mut heed::RwTxn,
    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
    field_id: u8,
) -> heed::Result<()>
{
    let range = (field_id, 1, i64::MIN, i64::MIN)..=(field_id, u8::MAX, i64::MAX, i64::MAX);
    db.remap_key_type::<FacetLevelValueI64Codec>()
        .delete_range(wtxn, &range)
        .map(drop)
}

fn compute_facet_levels(
    rtxn: &heed::RoTxn,
    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
    compression_type: CompressionType,
    compression_level: Option<u32>,
    shrink_size: Option<u64>,
    last_level_size: NonZeroUsize,
    number_of_levels: NonZeroUsize,
    easing_function: EasingName,
    field_id: u8,
    facet_type: FacetType,
) -> anyhow::Result<Reader<FileFuse>>
{
    let first_level_size = db.prefix_iter(rtxn, &[field_id])?
        .remap_types::<DecodeIgnore, DecodeIgnore>()
        .fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?;

    // It is forbidden to keep a cursor and write in a database at the same time with LMDB
    // therefore we write the facet levels entries into a grenad file before transfering them.
    let mut writer = tempfile::tempfile().and_then(|file| {
        create_writer(compression_type, compression_level, file)
    })?;

    let level_0_range = (field_id, 0, i64::MIN, i64::MIN)..=(field_id, 0, i64::MAX, i64::MAX);
    let level_sizes_iter =
        levels_iterator(first_level_size, last_level_size.get(), number_of_levels.get(), easing_function)
            .map(|size| (first_level_size as f64 / size as f64).ceil() as usize)
            .unique()
            .enumerate()
            .skip(1);

    // TODO we must not create levels with identical group sizes.
    for (level, level_entry_sizes) in level_sizes_iter {
        let mut left = 0;
        let mut right = 0;
        let mut group_docids = RoaringBitmap::new();

        dbg!(level, level_entry_sizes, first_level_size);

        let db = db.remap_key_type::<FacetLevelValueI64Codec>();
        for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() {
            let ((_field_id, _level, value, _right), docids) = result?;

            if i == 0 {
                left = value;
            } else if i % level_entry_sizes == 0 {
                // we found the first bound of the next group, we must store the left
                // and right bounds associated with the docids.
                write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;

                // We save the left bound for the new group and also reset the docids.
                group_docids = RoaringBitmap::new();
                left = value;
            }

            // The right bound is always the bound we run through.
            group_docids.union_with(&docids);
            right = value;
        }

        if !group_docids.is_empty() {
            write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
        }
    }

    writer_into_reader(writer, shrink_size)
}

fn write_entry(
    writer: &mut Writer<File>,
    field_id: u8,
    level: u8,
    left: i64,
    right: i64,
    ids: &RoaringBitmap,
) -> anyhow::Result<()>
{
    let key = (field_id, level, left, right);
    let key = FacetLevelValueI64Codec::bytes_encode(&key).ok_or(Error::Encoding)?;
    let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?;
    writer.insert(&key, &data)?;
    Ok(())
}

fn levels_iterator(
    first_level_size: usize, // biggest level
    last_level_size: usize, // smallest level
    number_of_levels: usize,
    easing_function: EasingName,
) -> impl Iterator<Item=usize>
{
    let easing_function = match easing_function {
        EasingName::Expo => ease_out_expo,
        EasingName::Quart => ease_out_quart,
        EasingName::Circ => ease_out_circ,
        EasingName::Linear => ease_out_linear,
    };

    let b = last_level_size as f64;
    let end = first_level_size as f64;
    let c = end - b;
    let d = number_of_levels;
    (0..=d).map(move |t| ((end + b) - easing_function(t as f64, b, c, d as f64)) as usize)
}

// Go look at the function definitions here:
// https://docs.rs/easer/0.2.1/easer/index.html
// https://easings.net/#easeOutExpo
fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
    if t == d {
        b + c
    } else {
        c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b
    }
}

// https://easings.net/#easeOutCirc
fn ease_out_circ(t: f64, b: f64, c: f64, d: f64) -> f64 {
    let t = t / d - 1.0;
    c * (1.0 - t * t).sqrt() + b
}

// https://easings.net/#easeOutQuart
fn ease_out_quart(t: f64, b: f64, c: f64, d: f64) -> f64 {
    let t = t / d - 1.0;
    -c * ((t * t * t * t) - 1.0) + b
}

fn ease_out_linear(t: f64, b: f64, c: f64, d: f64) -> f64 {
    c * t / d + b
}
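To make the behaviour of levels_iterator concrete, here is a standalone sketch (ease-out-expo path only; the 1000-entry level 0 is an arbitrary illustration, not from the commit) showing how the target level sizes shrink and how compute_facet_levels then turns them into per-level group sizes:

    // Replicates ease_out_expo and levels_iterator from the new file above.
    fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
        if t == d { b + c } else { c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b }
    }

    fn levels_iterator(first: usize, last: usize, levels: usize) -> impl Iterator<Item = usize> {
        let b = last as f64;
        let end = first as f64;
        let c = end - b;
        let d = levels;
        (0..=d).map(move |t| ((end + b) - ease_out_expo(t as f64, b, c, d as f64)) as usize)
    }

    fn main() {
        // 1000 level-0 entries, easing down towards a topmost level of ~5 groups.
        let sizes: Vec<usize> = levels_iterator(1000, 5, 5).collect();
        // Prints: [1000, 253, 67, 20, 8, 5]
        println!("{:?}", sizes);

        // As compute_facet_levels does: each target size becomes the number of
        // level-0 entries grouped under one entry of that level.
        let group_sizes: Vec<usize> =
            sizes.iter().map(|&s| (1000f64 / s as f64).ceil() as usize).collect();
        // Prints: [1, 4, 15, 50, 125, 200] (level 0, with group size 1, is skipped)
        println!("{:?}", group_sizes);
    }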
 
Deleted file (125 lines; the former facet_level module, removed from index_documents below):
@@ -1,125 +0,0 @@
use std::fs::File;

use grenad::{CompressionType, Reader, Writer, FileFuse};
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesEncode, Error};
use roaring::RoaringBitmap;

use crate::facet::FacetType;
use crate::heed_codec::{facet::FacetLevelValueI64Codec, CboRoaringBitmapCodec};
use crate::update::index_documents::{create_writer, writer_into_reader};

pub fn clear_field_levels(
    wtxn: &mut heed::RwTxn,
    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
    field_id: u8,
) -> heed::Result<()>
{
    let range = (field_id, 1, i64::MIN, i64::MIN)..=(field_id, u8::MAX, i64::MAX, i64::MAX);
    db.remap_key_type::<FacetLevelValueI64Codec>()
        .delete_range(wtxn, &range)
        .map(drop)
}

pub fn compute_facet_levels(
    rtxn: &heed::RoTxn,
    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
    compression_type: CompressionType,
    compression_level: Option<u32>,
    shrink_size: Option<u64>,
    field_id: u8,
    facet_type: FacetType,
) -> anyhow::Result<Reader<FileFuse>>
{
    let last_level_size = 5;
    let number_of_levels = 5;
    let first_level_size = db.prefix_iter(rtxn, &[field_id])?
        .remap_types::<DecodeIgnore, DecodeIgnore>()
        .fold(Ok(0u64), |count, result| result.and(count).map(|c| c + 1))?;

    // It is forbidden to keep a cursor and write in a database at the same time with LMDB
    // therefore we write the facet levels entries into a grenad file before transfering them.
    let mut writer = tempfile::tempfile().and_then(|file| {
        create_writer(compression_type, compression_level, file)
    })?;

    let level_0_range = (field_id, 0, i64::MIN, i64::MIN)..=(field_id, 0, i64::MAX, i64::MAX);
    let level_sizes_iter = levels_iterator(first_level_size, last_level_size, number_of_levels)
        .enumerate()
        .skip(1);

    // TODO we must not create levels with identical group sizes.
    for (level, size) in level_sizes_iter {
        let level_entry_sizes = (first_level_size as f64 / size as f64).ceil() as usize;
        let mut left = 0;
        let mut right = 0;
        let mut group_docids = RoaringBitmap::new();

        let db = db.remap_key_type::<FacetLevelValueI64Codec>();
        for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() {
            let ((_field_id, _level, value, _right), docids) = result?;

            if i == 0 {
                left = value;
            } else if i % level_entry_sizes == 0 {
                // we found the first bound of the next group, we must store the left
                // and right bounds associated with the docids.
                write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;

                // We save the left bound for the new group and also reset the docids.
                group_docids = RoaringBitmap::new();
                left = value;
            }

            // The right bound is always the bound we run through.
            group_docids.union_with(&docids);
            right = value;
        }

        if !group_docids.is_empty() {
            write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
        }
    }

    writer_into_reader(writer, shrink_size)
}

fn write_entry(
    writer: &mut Writer<File>,
    field_id: u8,
    level: u8,
    left: i64,
    right: i64,
    ids: &RoaringBitmap,
) -> anyhow::Result<()>
{
    let key = (field_id, level, left, right);
    let key = FacetLevelValueI64Codec::bytes_encode(&key).ok_or(Error::Encoding)?;
    let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?;
    writer.insert(&key, &data)?;
    Ok(())
}

fn levels_iterator(
    first_level_size: u64, // biggest level
    last_level_size: u64, // smallest level
    number_of_levels: u64,
) -> impl Iterator<Item=u64>
{
    // Go look at the function definitions here:
    // https://docs.rs/easer/0.2.1/easer/index.html
    // https://easings.net/#easeOutExpo
    fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
        if t == d {
            b + c
        } else {
            c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b
        }
    }

    let b = last_level_size as f64;
    let end = first_level_size as f64;
    let c = end - b;
    let d = number_of_levels;
    (0..=d).map(move |t| ((end + b) - ease_out_expo(t as f64, b, c, d as f64)) as u64)
}
 
@@ -2,6 +2,7 @@ use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fs::File;
 use std::io::{self, Seek, SeekFrom};
+use std::num::NonZeroUsize;
 use std::sync::mpsc::sync_channel;
 use std::time::Instant;
 
@@ -14,32 +15,29 @@ use memmap::Mmap;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 
 use crate::facet::FacetType;
 use crate::index::Index;
-use crate::update::UpdateIndexingStep;
+use crate::update::{FacetLevels, UpdateIndexingStep};
 use self::store::{Store, Readers};
 use self::merge_function::{
     main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
     docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
 };
 pub use self::transform::{Transform, TransformOutput};
-pub use self::facet_level::{clear_field_levels, compute_facet_levels};
 
 use crate::MergeFn;
 use super::UpdateBuilder;
 
-mod facet_level;
 mod merge_function;
 mod store;
 mod transform;
 
 #[derive(Debug, Copy, Clone)]
-enum WriteMethod {
+pub enum WriteMethod {
     Append,
     GetMergePut,
 }
 
-fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
+pub fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
     let mut builder = Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
@@ -48,7 +46,7 @@ fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
     builder.build(file)
 }
 
-fn create_sorter(
+pub fn create_sorter(
     merge: MergeFn,
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
@@ -74,7 +72,7 @@ fn create_sorter(
     builder.build()
 }
 
-fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
+pub fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
     let mut file = writer.into_inner()?;
     file.seek(SeekFrom::Start(0))?;
     let file = if let Some(shrink_size) = shrink_size {
@@ -85,13 +83,13 @@ fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
     Reader::new(file).map_err(Into::into)
 }
 
-fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
+pub fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
     let mut builder = Merger::builder(merge);
     builder.extend(sources);
     builder.build()
 }
 
-fn merge_into_lmdb_database(
+pub fn merge_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
     sources: Vec<Reader<FileFuse>>,
@@ -135,7 +133,7 @@ fn merge_into_lmdb_database(
     Ok(())
 }
 
-fn write_into_lmdb_database(
+pub fn write_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
     mut reader: Reader<FileFuse>,
@@ -210,6 +208,8 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
     pub(crate) chunk_compression_level: Option<u32>,
     pub(crate) chunk_fusing_shrink_size: Option<u64>,
     pub(crate) thread_pool: Option<&'a ThreadPool>,
+    facet_number_of_levels: Option<NonZeroUsize>,
+    facet_last_level_size: Option<NonZeroUsize>,
     update_method: IndexDocumentsMethod,
     update_format: UpdateFormat,
     autogenerate_docids: bool,
@@ -228,6 +228,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             chunk_compression_level: None,
             chunk_fusing_shrink_size: None,
             thread_pool: None,
+            facet_number_of_levels: None,
+            facet_last_level_size: None,
             update_method: IndexDocumentsMethod::ReplaceDocuments,
             update_format: UpdateFormat::Json,
             autogenerate_docids: true,
@@ -478,9 +480,6 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         // We write the external documents ids into the main database.
         self.index.put_external_documents_ids(self.wtxn, &external_documents_ids)?;
 
-        // We get the faceted fields to be able to create the facet levels.
-        let faceted_fields = self.index.faceted_fields(self.wtxn)?;
-
         // We merge the new documents ids with the existing ones.
         documents_ids.union_with(&new_documents_ids);
         documents_ids.union_with(&replaced_documents_ids);
@@ -583,34 +582,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             });
         }
 
-        debug!("Computing and writing the facet values levels docids into LMDB on disk...");
-        for (field_id, facet_type) in faceted_fields {
-            if facet_type == FacetType::String { continue }
-
-            clear_field_levels(
-                self.wtxn,
-                self.index.facet_field_id_value_docids,
-                field_id,
-            )?;
-
-            let content = compute_facet_levels(
-                self.wtxn,
-                self.index.facet_field_id_value_docids,
-                chunk_compression_type,
-                chunk_compression_level,
-                chunk_fusing_shrink_size,
-                field_id,
-                facet_type,
-            )?;
-
-            write_into_lmdb_database(
-                self.wtxn,
-                *self.index.facet_field_id_value_docids.as_polymorph(),
-                content,
-                |_, _| anyhow::bail!("invalid facet level merging"),
-                WriteMethod::GetMergePut,
-            )?;
-        }
+        let mut builder = FacetLevels::new(self.wtxn, self.index);
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        if let Some(value) = self.facet_number_of_levels {
+            builder.number_of_levels(value);
+        }
+        if let Some(value) = self.facet_last_level_size {
+            builder.last_level_size(value);
+        }
+        builder.execute()?;
 
         debug_assert_eq!(database_count, total_databases);
 
@@ -1,6 +1,7 @@
 mod available_documents_ids;
 mod clear_documents;
 mod delete_documents;
+mod facet_levels;
 mod index_documents;
 mod settings;
 mod update_builder;
@@ -11,6 +12,7 @@ pub use self::available_documents_ids::AvailableDocumentsIds;
 pub use self::clear_documents::ClearDocuments;
 pub use self::delete_documents::DeleteDocuments;
 pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat};
+pub use self::facet_levels::{FacetLevels, EasingName};
 pub use self::settings::Settings;
 pub use self::update_builder::UpdateBuilder;
 pub use self::update_step::UpdateIndexingStep;
@@ -2,7 +2,7 @@ use grenad::CompressionType;
 use rayon::ThreadPool;
 
 use crate::Index;
-use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings};
+use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings, FacetLevels};
 
 pub struct UpdateBuilder<'a> {
     pub(crate) log_every_n: Option<usize>,
@@ -118,4 +118,19 @@ impl<'a> UpdateBuilder<'a> {
 
         builder
     }
+
+    pub fn facet_levels<'t, 'u, 'i>(
+        self,
+        wtxn: &'t mut heed::RwTxn<'i, 'u>,
+        index: &'i Index,
+    ) -> FacetLevels<'t, 'u, 'i>
+    {
+        let mut builder = FacetLevels::new(wtxn, index);
+
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+
+        builder
+    }
 }
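Putting the pieces together, a sketch of driving the new update through UpdateBuilder the same way the http-ui handler above does. Assumptions are flagged in the comments: anyhow as a dependency, an already-open milli Index, and the UpdateBuilder::new() constructor (not shown in this diff):

    // Sketch only, not part of this commit.
    use std::num::NonZeroUsize;
    use milli::Index;
    use milli::update::{UpdateBuilder, EasingName};

    // Assumes `index` is already open; `UpdateBuilder::new()` mirrors how the
    // builder is obtained elsewhere in this codebase and may differ.
    fn rebuild_facet_levels(index: &Index) -> anyhow::Result<()> {
        let mut wtxn = index.write_txn()?;
        let mut builder = UpdateBuilder::new().facet_levels(&mut wtxn, index);
        builder.number_of_levels(NonZeroUsize::new(4).unwrap());
        builder.last_level_size(NonZeroUsize::new(10).unwrap());
        builder.easing_function(EasingName::Linear);
        builder.execute()?;
        wtxn.commit()?;
        Ok(())
    }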