Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-11-22 12:46:53 +00:00)

Compare commits: 25 commits, main...measure-ne
| SHA1 |
|---|
| da3f08a479 |
| 5b19df1dba |
| 2ca596003a |
| 9b31c09dde |
| 74a587785a |
| d612ea2a90 |
| 63a7fe5586 |
| 53120eb2a4 |
| 19e512622e |
| 86e5f74fce |
| a73f635013 |
| 10aac4d77f |
| aa2f649713 |
| a1f266dc03 |
| 566bb51eda |
| c37396714d |
| c5473dc2b5 |
| 3cdc7f2de4 |
| 343bae478a |
| 8b41f1a69d |
| 59a2f8d0ab |
| 508be2137e |
| 50bf485dc0 |
| 6e4855bbc5 |
| ac5da77746 |
Cargo.lock (generated): 72 changed lines
@@ -871,18 +871,18 @@ checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"

[[package]]
name = "bytemuck"
version = "1.23.2"
version = "1.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677"
checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4"
dependencies = [
 "bytemuck_derive",
]

[[package]]
name = "bytemuck_derive"
version = "1.10.1"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f154e572231cb6ba2bd1176980827e3d5dc04cc183a75dea38109fbdd672d29"
checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff"
dependencies = [
 "proc-macro2",
 "quote",
@@ -1278,6 +1278,18 @@ dependencies = [
 "windows-sys 0.59.0",
]

[[package]]
name = "console"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b430743a6eb14e9764d4260d4c0d8123087d504eeb9c48f2b2a5e810dd369df4"
dependencies = [
 "encode_unicode",
 "libc",
 "once_cell",
 "windows-sys 0.61.0",
]

[[package]]
name = "const-random"
version = "0.1.18"
@@ -2009,6 +2021,16 @@ dependencies = [
 "syn 2.0.106",
]

[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
 "log",
 "regex",
]

[[package]]
name = "equivalent"
version = "1.0.2"
@@ -2892,7 +2914,7 @@ source = "git+https://github.com/dureuill/hf-hub.git?branch=rust_tls#88d4f11cb9f
dependencies = [
 "dirs",
 "http 1.3.1",
 "indicatif",
 "indicatif 0.17.11",
 "log",
 "rand 0.8.5",
 "serde",
@@ -3289,13 +3311,25 @@ version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
 "console",
 "console 0.15.11",
 "number_prefix",
 "portable-atomic",
 "unicode-width",
 "web-time",
]

[[package]]
name = "indicatif"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade6dfcba0dfb62ad59e59e7241ec8912af34fd29e0e743e3db992bd278e8b65"
dependencies = [
 "console 0.16.1",
 "portable-atomic",
 "unit-prefix",
 "web-time",
]

[[package]]
name = "inout"
version = "0.1.4"
@@ -3311,7 +3345,7 @@ version = "1.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "810ae6042d48e2c9e9215043563a58a80b877bc863228a74cf10c49d4620a6f5"
dependencies = [
 "console",
 "console 0.15.11",
 "lazy_static",
 "linked-hash-map",
 "pest",
@@ -4161,12 +4195,19 @@ name = "meilitool"
version = "1.24.0"
dependencies = [
 "anyhow",
 "bitpacking",
 "byte-unit",
 "bytemuck",
 "clap",
 "dump",
 "file-store",
 "indexmap",
 "indicatif 0.18.2",
 "meilisearch-auth",
 "meilisearch-types",
 "quickcheck",
 "rayon",
 "roaring 0.11.2",
 "serde",
 "serde_json",
 "tempfile",
@@ -5139,6 +5180,17 @@ dependencies = [
 "version_check",
]

[[package]]
name = "quickcheck"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
 "env_logger",
 "log",
 "rand 0.8.5",
]

[[package]]
name = "quinn"
version = "0.11.9"
@@ -7005,6 +7057,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"

[[package]]
name = "unit-prefix"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "323402cff2dd658f39ca17c789b502021b3f18707c91cdf22e3838e1b4023817"

[[package]]
name = "unsafe-libyaml"
version = "0.2.11"

crates/meilitool/Cargo.toml

@@ -10,14 +10,23 @@ license.workspace = true

[dependencies]
anyhow = "1.0.98"
bitpacking = "0.9.2"
byte-unit = "5.1.6"
bytemuck = "1.24.0"
clap = { version = "4.5.40", features = ["derive"] }
dump = { path = "../dump" }
file-store = { path = "../file-store" }
indexmap = { version = "2.9.0", features = ["serde"] }
indicatif = { version = "0.18.2", default-features = false }
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
new-roaring = { package = "roaring", version = "0.11.2" }
rayon = "1.11.0"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["preserve_order"] }
tempfile = "3.20.0"
time = { version = "0.3.41", features = ["formatting", "parsing", "alloc"] }
uuid = { version = "1.17.0", features = ["v4"], default-features = false }

[dev-dependencies]
quickcheck = "1.0.3"

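The `new-roaring` line aliases the crate: inside meilitool it is imported as `new_roaring`, so the 0.11.2 release (which adds run-length encoded containers) can coexist with the older `roaring` used elsewhere in the workspace. A minimal sketch of how the alias is used, assuming this manifest; the values are illustrative only:

```rust
// `roaring` 0.11.2 from crates.io, referred to by the alias declared in Cargo.toml.
use new_roaring::RoaringBitmap;

fn main() {
    // A dense run of values is the best case for run-length encoded containers.
    let mut bitmap: RoaringBitmap = (0u32..10_000).collect();
    let _changed = bitmap.optimize(); // converts eligible containers to run containers
    let stats = bitmap.statistics();
    println!("run containers after optimize: {}", stats.n_run_containers);
}
```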
crates/meilitool/quickcheck.sh (new executable file, 9 lines)
@@ -0,0 +1,9 @@
#!/bin/bash

while true
do
    cargo test qc_; status=$?
    if [[ $status != 0 ]] ; then
        exit $status
    fi
done
crates/meilitool/src/bytes_counter.rs (new file, 27 lines)
@@ -0,0 +1,27 @@
use std::io;

#[derive(Debug, Default, Clone, Copy)]
pub struct BytesCounter {
    bytes_written: usize,
}

impl BytesCounter {
    pub fn new() -> Self {
        BytesCounter { bytes_written: 0 }
    }

    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }
}

impl io::Write for BytesCounter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes_written += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
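A short usage sketch of the counter above: because it implements `io::Write`, it can stand in for a real writer whenever only the number of serialized bytes matters, which is how `main.rs` below measures `serialize_into` output without allocating a buffer. Minimal example, assuming `BytesCounter` is in scope:

```rust
use std::io::Write as _;

fn main() -> std::io::Result<()> {
    let mut counter = BytesCounter::new();
    counter.write_all(b"hello")?;
    counter.write_all(&[0u8; 123])?;
    // Every byte "written" is only counted, never stored.
    assert_eq!(counter.bytes_written(), 5 + 123);
    Ok(())
}
```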
crates/meilitool/src/main.rs

@@ -1,14 +1,18 @@
#![allow(clippy::result_large_err)]

use std::fmt;
use std::fs::{read_dir, read_to_string, remove_file, File};
use std::io::{BufWriter, Write as _};
use std::path::PathBuf;
use std::time::Instant;

use anyhow::{bail, Context};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use byte_unit::UnitType;
use clap::{Parser, Subcommand, ValueEnum};
use dump::{DumpWriter, IndexMetadata};
use file_store::FileStore;
use indicatif::{ProgressBar, ProgressStyle};
use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{Bytes, SerdeJson, Str};
@@ -19,16 +23,20 @@ use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::index::EmbeddingsWithMetadata;
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{obkv_to_json, BEU32};
use meilisearch_types::milli::{obkv_to_json, CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::versioning::{get_version, parse_version};
use meilisearch_types::Index;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde_json::Value::Object;
use time::macros::format_description;
use time::OffsetDateTime;
use upgrade::OfflineUpgrade;
use uuid_codec::UuidCodec;

use crate::bytes_counter::BytesCounter;

mod bytes_counter;
mod upgrade;
mod uuid_codec;

@@ -126,7 +134,9 @@ enum Command {
    /// before running the copy and compaction. This way the current indexation must finish before
    /// the compaction operation can start. Once the compaction is done, the big index is replaced
    /// by the compacted one and the mutable transaction is released.
    IndexCompaction { index_name: String },
    IndexCompaction {
        index_name: String,
    },

    /// Uses the hair dryer to make the dedicated pages hot in cache
    ///
@@ -140,6 +150,25 @@ enum Command {
        #[arg(long, value_delimiter = ',')]
        index_part: Vec<IndexPart>,
    },

    /// Measures the size of the indexes with the previous roaring version
    /// and the brand new one with support for run-length encoding.
    ///
    /// You can change the number of threads used for the measurement by
    /// using the RAYON_NUM_THREADS environment variable.
    MeasureNewRoaringDiskUsage {
        #[arg(long, value_delimiter = ',')]
        index_name: Vec<String>,

        #[arg(long, value_delimiter = ',')]
        index_part: Vec<RoaringBasedIndexPart>,
    },

    CompactTaskQueue,

    ResizeTaskQueue {
        new_size: byte_unit::Byte,
    },
}

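Clap's derive turns the variant and field names above into kebab-case CLI names, and `value_delimiter = ','` lets one flag carry several values; setting `RAYON_NUM_THREADS` before invoking the command caps the measurement threads, as the doc comment notes. A minimal, hypothetical sketch of the parsing (not the real `Cli`, which also carries `db_path` and the other commands; the index names are made up):

```rust
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    MeasureNewRoaringDiskUsage {
        #[arg(long, value_delimiter = ',')]
        index_name: Vec<String>,
    },
}

fn main() {
    // Equivalent to: meilitool measure-new-roaring-disk-usage --index-name movies,products
    let cli = Cli::parse_from([
        "meilitool",
        "measure-new-roaring-disk-usage",
        "--index-name",
        "movies,products",
    ]);
    let Command::MeasureNewRoaringDiskUsage { index_name } = cli.command;
    assert_eq!(index_name, ["movies", "products"]);
}
```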
#[derive(Clone, ValueEnum)]
@@ -148,6 +177,32 @@ enum IndexPart {
    Hannoy,
}

#[derive(Clone, ValueEnum)]
enum RoaringBasedIndexPart {
    WordDocids,
    WordPrefixDocids,
}

impl RoaringBasedIndexPart {
    pub fn database(&self, index: &Index) -> Database<Bytes, CboRoaringBitmapCodec> {
        use RoaringBasedIndexPart::*;
        match self {
            WordDocids => index.word_docids.remap_key_type(),
            WordPrefixDocids => index.word_prefix_docids.remap_key_type(),
        }
    }
}

impl fmt::Display for RoaringBasedIndexPart {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use RoaringBasedIndexPart::*;
        match self {
            WordDocids => write!(f, "word-docids"),
            WordPrefixDocids => write!(f, "word-prefix-docids"),
        }
    }
}

fn main() -> anyhow::Result<()> {
    let Cli { db_path, command } = Cli::parse();

@@ -169,9 +224,66 @@ fn main() -> anyhow::Result<()> {
        Command::HairDryer { index_name, index_part } => {
            hair_dryer(db_path, &index_name, &index_part)
        }
        Command::MeasureNewRoaringDiskUsage { index_name, index_part } => {
            measure_new_roaring_disk_usage(db_path, &index_name, &index_part)
        }
        Command::CompactTaskQueue => compact_task_queue(db_path),
        Command::ResizeTaskQueue { new_size } => resize_task_queue(db_path, new_size),
    }
}

fn resize_task_queue(db_path: PathBuf, new_size: byte_unit::Byte) -> anyhow::Result<()> {
    let new_size = new_size.as_u64();
    let path = db_path.join("tasks");
    let env = unsafe {
        EnvOpenOptions::new()
            .read_txn_without_tls()
            .max_dbs(100)
            .map_size(new_size as usize)
            .open(&path)
    }
    .with_context(|| format!("While trying to open {:?}", path.display()))?;

    eprintln!("Acquiring a write transaction for tasks queue...");

    let wtxn = env.write_txn()?;
    wtxn.commit()?;

    Ok(())
}

fn compact_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
    let path = db_path.join("tasks");
    let env = unsafe { EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&path) }
        .with_context(|| format!("While trying to open {:?}", path.display()))?;

    eprintln!("Acquiring a write transaction for tasks queue...");

    let wtxn = env.write_txn()?;

    /// The name of the copy of the data.mdb file used during compaction.
    const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy";

    let src_path = env.path().join("data.mdb");
    let dst_path = tempfile::TempPath::from_path(path.join(DATA_MDB_COPY_NAME));
    let file = File::create(&dst_path)?;
    let mut file = tempfile::NamedTempFile::from_parts(file, dst_path);

    env.copy_to_file(file.as_file_mut(), meilisearch_types::heed::CompactionOption::Enabled)?;
    match file.persist(src_path) {
        Ok(file) => file.sync_all()?,
        // TODO see if we have a _resource busy_ error and probably handle this by:
        // 1. closing the index, 2. replacing and 3. reopening it
        Err(tempfile::PersistError { error, file: _ }) => return Err(error.into()),
    };

    eprintln!("Task persisted, aborting wtxn...");

    wtxn.abort();

    Ok(())
}

/// Clears the task queue located at `db_path`.
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
    let path = db_path.join("tasks");
@@ -694,3 +806,523 @@ fn hair_dryer(

    Ok(())
}

fn measure_new_roaring_disk_usage(
    db_path: PathBuf,
    index_names: &[String],
    index_parts: &[RoaringBasedIndexPart],
) -> Result<(), anyhow::Error> {
    let index_scheduler_path = db_path.join("tasks");
    let env = unsafe {
        EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
    }
    .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;

    eprintln!("Trying to get a read transaction on the index scheduler...");

    let scheduler_rtxn = env.read_txn()?;
    let index_mapping: Database<Str, UuidCodec> =
        try_opening_database(&env, &scheduler_rtxn, "index-mapping")?;

    eprintln!("Got one! Reading indexes...");

    let thread_count = rayon::current_num_threads();
    assert!(thread_count > 0);

    for result in index_mapping.iter(&scheduler_rtxn)? {
        let (uid, uuid) = result?;
        if index_names.iter().any(|i| i == uid) {
            let index_path = db_path.join("indexes").join(uuid.to_string());
            let index =
                Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
                    .with_context(|| {
                        format!("While trying to open the index at path {:?}", index_path.display())
                    })?;

            for index_part in index_parts {
                let database = index_part.database(&index);
                println!("{uid} -> {index_part}");

                eprintln!("\tTrying to get {thread_count} read transactions on {uid}...");
                let rtxns = std::iter::repeat_with(|| index.read_txn())
                    .take(thread_count)
                    .collect::<meilisearch_types::heed::Result<Vec<_>>>()?;
                eprintln!("\tGot one! Reading databases...");

                let stat = database.stat(&rtxns[0])?;
                let number_of_entries = database.len(&rtxns[0])?;
                let pages = stat.leaf_pages + stat.branch_pages + stat.overflow_pages;
                let size_as_pages = pages * (stat.page_size as usize);
                let human_size = byte_unit::Byte::from(size_as_pages as u64)
                    .get_appropriate_unit(UnitType::Binary);
                println!(
                    "\tThe size of the database seen by LMDB (in terms of pages): {human_size:.2}"
                );

                #[derive(Default)]
                struct ComputedStats {
                    key_size: usize,
                    value_size: usize,
                    new_value_size: usize,
                    delta_encoded_value_size: usize,
                    average_distance_between_value_numbers: f32,
                    number_of_values: u64,
                    number_of_raw_cbos: usize,
                    number_of_containers: usize,
                    number_of_array_containers: usize,
                    number_of_bitset_containers: usize,
                    new_number_of_containers: usize,
                    new_number_of_array_containers: usize,
                    new_number_of_bitset_containers: usize,
                    new_number_of_run_containers: usize,
                }

                impl std::ops::Add for ComputedStats {
                    type Output = Self;

                    fn add(self, rhs: Self) -> Self::Output {
                        let ComputedStats {
                            key_size,
                            value_size,
                            new_value_size,
                            delta_encoded_value_size,
                            average_distance_between_value_numbers,
                            number_of_values,
                            number_of_raw_cbos,
                            number_of_containers,
                            number_of_array_containers,
                            number_of_bitset_containers,
                            new_number_of_containers,
                            new_number_of_array_containers,
                            new_number_of_bitset_containers,
                            new_number_of_run_containers,
                        } = self;
                        ComputedStats {
                            key_size: key_size + rhs.key_size,
                            value_size: value_size + rhs.value_size,
                            new_value_size: new_value_size + rhs.new_value_size,
                            delta_encoded_value_size: delta_encoded_value_size
                                + rhs.delta_encoded_value_size,
                            average_distance_between_value_numbers:
                                (average_distance_between_value_numbers
                                    + rhs.average_distance_between_value_numbers)
                                    / 2.0,
                            number_of_values: number_of_values + rhs.number_of_values,
                            number_of_raw_cbos: number_of_raw_cbos + rhs.number_of_raw_cbos,
                            number_of_containers: number_of_containers + rhs.number_of_containers,
                            number_of_array_containers: number_of_array_containers
                                + rhs.number_of_array_containers,
                            number_of_bitset_containers: number_of_bitset_containers
                                + rhs.number_of_bitset_containers,
                            new_number_of_containers: new_number_of_containers
                                + rhs.new_number_of_containers,
                            new_number_of_array_containers: new_number_of_array_containers
                                + rhs.new_number_of_array_containers,
                            new_number_of_bitset_containers: new_number_of_bitset_containers
                                + rhs.new_number_of_bitset_containers,
                            new_number_of_run_containers: new_number_of_run_containers
                                + rhs.new_number_of_run_containers,
                        }
                    }
                }

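The `Default` plus `Add` pair above exists so that per-thread partial stats can be folded together by rayon's `try_reduce` further down: `Default` provides the identity and `Add` the associative combine. A tiny standalone sketch of that pattern with a hypothetical `Partial` type (not the real `ComputedStats`):

```rust
use rayon::prelude::*;

#[derive(Default)]
struct Partial {
    count: u64,
}

impl std::ops::Add for Partial {
    type Output = Self;

    fn add(self, rhs: Self) -> Self {
        Partial { count: self.count + rhs.count }
    }
}

fn main() -> std::io::Result<()> {
    let total = (0..8u32)
        .into_par_iter()
        .map(|n| -> std::io::Result<Partial> { Ok(Partial { count: n as u64 }) })
        // `try_reduce` needs an identity (Default) and an associative combine (Add),
        // and short-circuits on the first Err, just like the real stats fold below.
        .try_reduce(Partial::default, |a, b| Ok(a + b))?;
    assert_eq!(total.count, 28);
    Ok(())
}
```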
                let style = ProgressStyle::with_template(
                    "[{elapsed_precise}] [{wide_bar}] {human_pos}/{human_len} ({percent}%) ({eta})",
                )
                .unwrap();
                let pb = ProgressBar::new(number_of_entries).with_style(style);

                let stats = rtxns
                    .into_par_iter()
                    .enumerate()
                    .map(|(thread_id, rtxn)| {
                        let mut stats = ComputedStats::default();
                        let ComputedStats {
                            key_size,
                            value_size,
                            new_value_size,
                            delta_encoded_value_size,
                            average_distance_between_value_numbers,
                            number_of_values,
                            number_of_raw_cbos,
                            number_of_containers,
                            number_of_array_containers,
                            number_of_bitset_containers,
                            new_number_of_containers,
                            new_number_of_array_containers,
                            new_number_of_bitset_containers,
                            new_number_of_run_containers,
                        } = &mut stats;

                        for (index, result) in
                            database.remap_data_type::<Bytes>().iter(&rtxn)?.enumerate()
                        {
                            if index % thread_count != thread_id {
                                continue;
                            }

                            let (key, value) = result?;
                            *key_size += key.len();
                            *value_size += value.len();

                            let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
                            *number_of_values += bitmap.len();
                            *number_of_raw_cbos += (bitmap.len() < 7) as usize; // Cbo threshold
                            let stats = bitmap.statistics();
                            *average_distance_between_value_numbers = {
                                let mut total_distance = 0usize;
                                let mut prev = None;
                                for n in &bitmap {
                                    if let Some(prev) = prev {
                                        total_distance += (n - prev) as usize;
                                    }
                                    prev = Some(n);
                                }
                                if bitmap.is_empty() {
                                    f32::INFINITY
                                } else {
                                    total_distance as f32 / bitmap.len() as f32
                                }
                            };

                            *number_of_containers += stats.n_containers as usize;
                            *number_of_array_containers += stats.n_array_containers as usize;
                            *number_of_bitset_containers += stats.n_bitset_containers as usize;

                            let mut new_bitmap =
                                new_roaring::RoaringBitmap::from_sorted_iter(bitmap).unwrap();
                            let compressed = encode_bitmap_with_delta_encoding(&new_bitmap);
                            *delta_encoded_value_size += compressed.len();
                            let decoded_bitmap = decode_bitmap_with_delta_encoding(&compressed[..]);
                            assert_eq!(decoded_bitmap, new_bitmap);
                            let _has_been_optimized = new_bitmap.optimize();
                            let stats = new_bitmap.statistics();
                            *new_number_of_containers += stats.n_containers as usize;
                            *new_number_of_array_containers += stats.n_array_containers as usize;
                            *new_number_of_bitset_containers += stats.n_bitset_containers as usize;
                            *new_number_of_run_containers += stats.n_run_containers as usize;

                            let mut bytes_counter = BytesCounter::new();
                            new_bitmap.serialize_into(&mut bytes_counter).unwrap();
                            *new_value_size += bytes_counter.bytes_written();

                            pb.inc(1);
                        }

                        meilisearch_types::heed::Result::Ok(stats)
                    })
                    .try_reduce(ComputedStats::default, |a, b| Ok(a + b))?;

                pb.finish();

                let ComputedStats {
                    key_size,
                    value_size,
                    new_value_size,
                    delta_encoded_value_size,
                    average_distance_between_value_numbers,
                    number_of_values,
                    number_of_raw_cbos,
                    number_of_containers,
                    number_of_array_containers,
                    number_of_bitset_containers,
                    new_number_of_containers,
                    new_number_of_array_containers,
                    new_number_of_bitset_containers,
                    new_number_of_run_containers,
                } = stats;

                let human_size = byte_unit::Byte::from(key_size + value_size)
                    .get_appropriate_unit(UnitType::Binary);
                let human_key_size =
                    byte_unit::Byte::from(key_size).get_appropriate_unit(UnitType::Binary);
                let human_value_size =
                    byte_unit::Byte::from(value_size).get_appropriate_unit(UnitType::Binary);
                println!("\tThe raw size of the database: {human_size:.2} (keys: {human_key_size:.2}, values: {human_value_size:.2})");

                let human_size = byte_unit::Byte::from(key_size + new_value_size)
                    .get_appropriate_unit(UnitType::Binary);
                println!("\tThe raw size of the database using the new bitmaps: {human_size:.2}");

                println!(
                    "\tThe raw size of the database using the delta encoding: {:.2}",
                    byte_unit::Byte::from(key_size + delta_encoded_value_size)
                        .get_appropriate_unit(UnitType::Binary)
                );

                println!("\tnumber of entries: {number_of_entries}");
                println!(
                    "\taverage number of values: {:.2}",
                    number_of_values as f64 / number_of_entries as f64
                );
                println!(
                    "\taverage distance between value numbers: {:.2}",
                    average_distance_between_value_numbers
                );
                println!(
                    "\tnumber of raw cbos: {number_of_raw_cbos} ({}%)",
                    number_of_raw_cbos as f64 / number_of_entries as f64 * 100.0
                );
                println!("\tnumber of containers: {number_of_containers}");
                println!(
                    "\tnumber of array containers: {number_of_array_containers} ({:.2}%)",
                    (number_of_array_containers as f64 / number_of_containers as f64) * 100.0
                );
                println!(
                    "\tnumber of bitset containers: {number_of_bitset_containers} ({:.2}%)",
                    (number_of_bitset_containers as f64 / number_of_containers as f64) * 100.0
                );

                println!("\tnew number of containers: {new_number_of_containers}");
                println!(
                    "\tnew number of array containers: {new_number_of_array_containers} ({:.2}%)",
                    (new_number_of_array_containers as f64 / new_number_of_containers as f64)
                        * 100.0
                );
                println!(
                    "\tnew number of bitset containers: {new_number_of_bitset_containers} ({:.2}%)",
                    (new_number_of_bitset_containers as f64 / new_number_of_containers as f64)
                        * 100.0
                );
                println!(
                    "\tnew number of run containers: {new_number_of_run_containers} ({:.2}%)",
                    (new_number_of_run_containers as f64 / new_number_of_containers as f64) * 100.0
                );
            }
        }
    }

    Ok(())
}

/// The magic header for our custom encoding format
const MAGIC_HEADER: u8 = 0xF0;

/// Returns the delta-encoded compressed version of the given roaring bitmap.
fn encode_bitmap_with_delta_encoding(bitmap: &new_roaring::RoaringBitmap) -> Vec<u8> {
    let mut compressed = Vec::new();

    // Insert the magic header
    compressed.push(MAGIC_HEADER);

    let bitpacker8x = BitPacker8x::new();
    let bitpacker4x = BitPacker4x::new();
    let bitpacker1x = BitPacker1x::new();

    let mut decompressed = vec![0u32; BitPacker8x::BLOCK_LEN];
    let decompressed = &mut decompressed[..];

    let mut buffer_index = 0;
    let mut initial = None;
    for n in bitmap {
        decompressed[buffer_index] = n;
        buffer_index += 1;
        if buffer_index == BitPacker8x::BLOCK_LEN {
            encode_with_packer(&bitpacker8x, decompressed, initial, &mut compressed);
            initial = Some(n);
            buffer_index = 0;
        }
    }

    // No more integers, let's compress them with smaller bitpackers
    let decompressed = &decompressed[..buffer_index];
    let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
    for decompressed in chunks.by_ref() {
        encode_with_packer(&bitpacker4x, &decompressed, initial, &mut compressed);
        initial = decompressed.iter().last().copied();
    }

    let decompressed = chunks.remainder();
    let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
    for decompressed in chunks.by_ref() {
        encode_with_packer(&bitpacker1x, &decompressed, initial, &mut compressed);
        initial = decompressed.iter().last().copied();
    }

    // If we have remaining integers that we were not able to compress
    // with our smallest bitpacker, we put them raw at the end of our
    // output buffer, preceded by a header.
    let decompressed = chunks.remainder();
    if !decompressed.is_empty() {
        let header = encode_bitpacker_level_and_num_bits(BitPackerLevel::None, u32::BITS as u8);
        let original_len = compressed.len();
        let additional = decompressed.len() * std::mem::size_of::<u32>() + 1;
        compressed.resize(original_len + additional, 0);
        let buffer = &mut compressed[original_len..];
        let (header_in_buffer, buffer) = buffer.split_first_mut().unwrap();
        *header_in_buffer = header;
        buffer.copy_from_slice(bytemuck::cast_slice(decompressed));
    }

    compressed
}

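To make the on-disk layout concrete, here is how a bitmap of 300 strictly increasing values would be split by the encoder above, assuming the bitpacking crate's usual block lengths (BitPacker8x = 256, BitPacker4x = 128, BitPacker1x = 32); a small arithmetic sketch, not part of the change itself:

```rust
fn main() {
    let total = 300usize; // values in the bitmap, already strictly increasing

    let blocks_8x = total / 256; // 1 full BitPacker8x block
    let rest = total % 256;      // 44 values left
    let blocks_4x = rest / 128;  // 0 BitPacker4x blocks (44 < 128)
    let rest = rest % 128;
    let blocks_1x = rest / 32;   // 1 BitPacker1x block
    let raw = rest % 32;         // 12 values stored as raw little-endian u32s

    assert_eq!((blocks_8x, blocks_4x, blocks_1x, raw), (1, 0, 1, 12));
    // Each chunk is preceded by a one-byte header carrying the packer level and
    // the number of bits per value; the raw tail gets such a header as well.
}
```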
fn encode_with_packer<B: BitPackerExt>(
    bitpacker: &B,
    decompressed: &[u32],
    initial: Option<u32>,
    output: &mut Vec<u8>,
) {
    let num_bits = bitpacker.num_bits_strictly_sorted(initial, decompressed);
    let compressed_len = B::compressed_block_size(num_bits);
    let chunk_header = encode_bitpacker_level_and_num_bits(B::level(), num_bits);
    let original_len = output.len();
    output.resize(original_len + compressed_len + 1, 0);
    let buffer = &mut output[original_len..];
    let (header_in_buffer, buffer) = buffer.split_first_mut().unwrap();
    *header_in_buffer = chunk_header;
    bitpacker.compress_strictly_sorted(initial, decompressed, buffer, num_bits);
}

// TODO do not panic and return error messages
fn decode_bitmap_with_delta_encoding(compressed: &[u8]) -> new_roaring::RoaringBitmap {
    let (&header, compressed) = compressed.split_first().expect("compressed must not be empty");

    assert_eq!(
        header, MAGIC_HEADER,
        "Invalid header. Found 0x{:x}, expecting 0x{:x}",
        header, MAGIC_HEADER
    );

    let bitpacker8x = BitPacker8x::new();
    let bitpacker4x = BitPacker4x::new();
    let bitpacker1x = BitPacker1x::new();

    let mut output = new_roaring::RoaringBitmap::new();
    let mut decompressed = vec![0u32; BitPacker8x::BLOCK_LEN];
    let decompressed = &mut decompressed[..];
    let mut compressed = compressed;
    let mut initial = None;

    loop {
        let Some((&chunk_header, encoded)) = compressed.split_first() else { break };
        let (level, num_bits) = decode_bitpacker_level_and_num_bits(chunk_header);

        let (bytes_read, decompressed) = match level {
            BitPackerLevel::None => {
                assert_eq!(num_bits, u32::BITS as u8);
                // TODO we may prefer returning an iterator of u32 instead to avoid this copy.
                // However, that may not be the most important part to optimize.
                let decompressed_bytes = bytemuck::cast_slice_mut(decompressed);
                assert!(encoded.len().is_multiple_of(std::mem::size_of::<u32>()));
                let decompressed_bytes = &mut decompressed_bytes[..encoded.len()];
                decompressed_bytes.copy_from_slice(encoded);
                // FIXME: Remove this ugly cast
                (encoded.len(), bytemuck::cast_slice::<_, u32>(decompressed_bytes))
            }
            BitPackerLevel::BitPacker1x => {
                decode_with_packer(&bitpacker1x, decompressed, initial, encoded, num_bits)
            }
            BitPackerLevel::BitPacker4x => {
                decode_with_packer(&bitpacker4x, decompressed, initial, encoded, num_bits)
            }
            BitPackerLevel::BitPacker8x => {
                decode_with_packer(&bitpacker8x, decompressed, initial, encoded, num_bits)
            }
        };

        initial = decompressed.iter().last().copied();
        // TODO investigate perf
        output.append(decompressed.iter().copied()).unwrap();
        // Advance by what the delta-decoding read, plus the chunk header byte
        compressed = &compressed[bytes_read + 1..];
    }

    output
}

/// Returns the number of bytes read and the decoded unsigned integers.
fn decode_with_packer<'d, B: BitPacker>(
    bitpacker: &B,
    decompressed: &'d mut [u32],
    initial: Option<u32>,
    compressed: &[u8],
    num_bits: u8,
) -> (usize, &'d [u32]) {
    let decompressed = &mut decompressed[..B::BLOCK_LEN];
    let read = bitpacker.decompress_strictly_sorted(initial, compressed, decompressed, num_bits);
    (read, decompressed)
}

#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
enum BitPackerLevel {
    /// The remaining bytes are raw little endian encoded u32s.
    None,
    /// The remaining bits are encoded using a `BitPacker1x`.
    BitPacker1x,
    /// The remaining bits are encoded using a `BitPacker4x`.
    BitPacker4x,
    /// The remaining bits are encoded using a `BitPacker8x`.
    BitPacker8x,
}

// TODO: never panic in this function and rather return a result
fn encode_bitpacker_level_and_num_bits(level: BitPackerLevel, num_bits: u8) -> u8 {
    assert!(num_bits as u32 <= 2_u32.pow(6));
    let level = level as u8;
    assert!(level <= 3);
    num_bits | (level << 6)
}

// TODO: never panic in this function and rather return a result
fn decode_bitpacker_level_and_num_bits(data: u8) -> (BitPackerLevel, u8) {
    let num_bits = data & 0b00111111;
    let level = match data >> 6 {
        0 => BitPackerLevel::None,
        1 => BitPackerLevel::BitPacker1x,
        2 => BitPackerLevel::BitPacker4x,
        3 => BitPackerLevel::BitPacker8x,
        invalid => panic!("Invalid bitpacker level: {invalid}"),
    };
    assert!(num_bits as u32 <= 2_u32.pow(6));
    (level, num_bits)
}

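A small worked example of that one-byte chunk header: the two high bits carry the `BitPackerLevel` discriminant and the six low bits carry `num_bits`. The check below assumes the two functions and the enum defined above are in scope:

```rust
#[test]
fn header_byte_round_trip() {
    // BitPacker4x has discriminant 2, so with 13 bits per value the header is
    // 0b10_001101: level in bits 7-6, num_bits in bits 5-0.
    let header = encode_bitpacker_level_and_num_bits(BitPackerLevel::BitPacker4x, 13);
    assert_eq!(header, 0b1000_1101);

    let (level, num_bits) = decode_bitpacker_level_and_num_bits(header);
    assert_eq!(level, BitPackerLevel::BitPacker4x);
    assert_eq!(num_bits, 13);
}
```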
trait BitPackerExt: BitPacker {
    fn level() -> BitPackerLevel;
}

impl BitPackerExt for BitPacker8x {
    fn level() -> BitPackerLevel {
        BitPackerLevel::BitPacker8x
    }
}

impl BitPackerExt for BitPacker4x {
    fn level() -> BitPackerLevel {
        BitPackerLevel::BitPacker4x
    }
}

impl BitPackerExt for BitPacker1x {
    fn level() -> BitPackerLevel {
        BitPackerLevel::BitPacker1x
    }
}

#[cfg(test)]
mod tests {
    use new_roaring::RoaringBitmap;
    use quickcheck::quickcheck;

    use super::{
        decode_bitmap_with_delta_encoding, decode_bitpacker_level_and_num_bits,
        encode_bitmap_with_delta_encoding, encode_bitpacker_level_and_num_bits, BitPackerLevel,
    };

    quickcheck! {
        // fn qc_bitpacker_level_and_num_bits(bp_level: BitPackerLevel, num_bits: u8) -> bool {
        //     let left = encode_bitpacker_level_and_num_bits(crate::BitPackerLevel::None, num_bits);
        //     let (out_level, out_num_bits) = decode_bitpacker_level_and_num_bits(left);
        //     out_level == level && out_num_bits == num_bits
        // }

        fn qc_random(xs: Vec<u32>) -> bool {
            let bitmap = RoaringBitmap::from_iter(xs);
            let compressed = encode_bitmap_with_delta_encoding(&bitmap);
            let decompressed = decode_bitmap_with_delta_encoding(&compressed);
            decompressed == bitmap
        }
    }
}