Compare commits

...

25 Commits

Author          | SHA1       | Message | Date
Kerollmops      | da3f08a479 | WIP: Clean up tests | 2025-11-10 10:16:49 +01:00
Kerollmops      | 5b19df1dba | Fix compressed block size bug | 2025-11-09 12:52:49 +01:00
Kerollmops      | 2ca596003a | Fix a decoding bug with flat u32s | 2025-11-09 12:29:45 +01:00
Kerollmops      | 9b31c09dde | Fix assert | 2025-11-09 12:07:08 +01:00
Kerollmops      | 74a587785a | Finalize a first version | 2025-11-08 17:32:38 +01:00
Clément Renault | d612ea2a90 | Fix the initial value when delta-encoding | 2025-11-01 12:20:25 +01:00
Kerollmops      | 63a7fe5586 | Fix a delta encoding bug | 2025-10-31 14:16:22 +01:00
Kerollmops      | 53120eb2a4 | Add an info about num bits for delta encoding | 2025-10-31 13:58:02 +01:00
Kerollmops      | 19e512622e | Average distance between bitmap values | 2025-10-31 13:08:23 +01:00
Kerollmops      | 86e5f74fce | Fix some display issues | 2025-10-31 12:00:54 +01:00
Kerollmops      | a73f635013 | Fix dumb issue | 2025-10-31 11:50:42 +01:00
Kerollmops      | 10aac4d77f | Add a progress bar to the meilitool bitmap measurements | 2025-10-31 11:45:19 +01:00
Kerollmops      | aa2f649713 | Use multiple threads to compute bitmap stats | 2025-10-31 11:38:54 +01:00
Clément Renault | a1f266dc03 | Add average number of values metric to roaring bitmap analysis | 2025-10-31 09:39:09 +01:00
Clément Renault | 566bb51eda | Indicated the number of CBOs | 2025-10-30 22:08:27 +01:00
Clément Renault | c37396714d | Add more info | 2025-10-30 11:59:07 +01:00
Clément Renault | c5473dc2b5 | More logs | 2025-10-29 18:09:48 +01:00
Clément Renault | 3cdc7f2de4 | Evaluate delta encoding | 2025-10-29 18:07:07 +01:00
Clément Renault | 343bae478a | Display more info about the new bitmap infos | 2025-10-29 10:04:58 +01:00
Clément Renault | 8b41f1a69d | Show the percentage in percent | 2025-10-29 09:58:22 +01:00
Clément Renault | 59a2f8d0ab | Adding new commands to meilitool | 2025-10-28 18:44:45 +01:00
Clément Renault | 508be2137e | Compute the ratio of bitset containers | 2025-10-28 15:32:32 +01:00
Clément Renault | 50bf485dc0 | Reduce the number of displayed decimals | 2025-10-28 15:17:13 +01:00
Clément Renault | 6e4855bbc5 | Adjuste bytes to ease reading | 2025-10-28 15:14:00 +01:00
Clément Renault | ac5da77746 | Add a meilitool command to compute the gain to use new roaring bitmaps | 2025-10-28 11:58:31 +01:00
5 changed files with 744 additions and 9 deletions

Cargo.lock (generated), 72 changed lines

@@ -871,18 +871,18 @@ checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.23.2"
version = "1.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677"
checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4"
dependencies = [
"bytemuck_derive",
]
[[package]]
name = "bytemuck_derive"
version = "1.10.1"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f154e572231cb6ba2bd1176980827e3d5dc04cc183a75dea38109fbdd672d29"
checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff"
dependencies = [
"proc-macro2",
"quote",
@@ -1278,6 +1278,18 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "console"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b430743a6eb14e9764d4260d4c0d8123087d504eeb9c48f2b2a5e810dd369df4"
dependencies = [
"encode_unicode",
"libc",
"once_cell",
"windows-sys 0.61.0",
]
[[package]]
name = "const-random"
version = "0.1.18"
@@ -2009,6 +2021,16 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
"regex",
]
[[package]]
name = "equivalent"
version = "1.0.2"
@@ -2892,7 +2914,7 @@ source = "git+https://github.com/dureuill/hf-hub.git?branch=rust_tls#88d4f11cb9f
dependencies = [
"dirs",
"http 1.3.1",
"indicatif",
"indicatif 0.17.11",
"log",
"rand 0.8.5",
"serde",
@@ -3289,13 +3311,25 @@ version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"console 0.15.11",
"number_prefix",
"portable-atomic",
"unicode-width",
"web-time",
]
[[package]]
name = "indicatif"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade6dfcba0dfb62ad59e59e7241ec8912af34fd29e0e743e3db992bd278e8b65"
dependencies = [
"console 0.16.1",
"portable-atomic",
"unit-prefix",
"web-time",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -3311,7 +3345,7 @@ version = "1.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "810ae6042d48e2c9e9215043563a58a80b877bc863228a74cf10c49d4620a6f5"
dependencies = [
"console",
"console 0.15.11",
"lazy_static",
"linked-hash-map",
"pest",
@@ -4161,12 +4195,19 @@ name = "meilitool"
version = "1.24.0"
dependencies = [
"anyhow",
"bitpacking",
"byte-unit",
"bytemuck",
"clap",
"dump",
"file-store",
"indexmap",
"indicatif 0.18.2",
"meilisearch-auth",
"meilisearch-types",
"quickcheck",
"rayon",
"roaring 0.11.2",
"serde",
"serde_json",
"tempfile",
@@ -5139,6 +5180,17 @@ dependencies = [
"version_check",
]
[[package]]
name = "quickcheck"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
"env_logger",
"log",
"rand 0.8.5",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -7005,6 +7057,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "unit-prefix"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "323402cff2dd658f39ca17c789b502021b3f18707c91cdf22e3838e1b4023817"
[[package]]
name = "unsafe-libyaml"
version = "0.2.11"

crates/meilitool/Cargo.toml

@@ -10,14 +10,23 @@ license.workspace = true
[dependencies]
anyhow = "1.0.98"
bitpacking = "0.9.2"
byte-unit = "5.1.6"
bytemuck = "1.24.0"
clap = { version = "4.5.40", features = ["derive"] }
dump = { path = "../dump" }
file-store = { path = "../file-store" }
indexmap = { version = "2.9.0", features = ["serde"] }
indicatif = { version = "0.18.2", default-features = false }
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
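# The rename lets this 0.11 release coexist with the older `roaring` version
# that the other Meilisearch crates still pull in transitively.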
new-roaring = { package = "roaring", version = "0.11.2" }
rayon = "1.11.0"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["preserve_order"] }
tempfile = "3.20.0"
time = { version = "0.3.41", features = ["formatting", "parsing", "alloc"] }
uuid = { version = "1.17.0", features = ["v4"], default-features = false }
[dev-dependencies]
quickcheck = "1.0.3"

crates/meilitool/quickcheck.sh (new executable file), 9 changed lines

@@ -0,0 +1,9 @@
#!/bin/bash
while true
do
    cargo test qc_
    status=$?
    if [[ $status -ne 0 ]] ; then
        exit $status
    fi
done

crates/meilitool/src/bytes_counter.rs (new file)

@@ -0,0 +1,27 @@
use std::io;
#[derive(Debug, Default, Clone, Copy)]
pub struct BytesCounter {
bytes_written: usize,
}
impl BytesCounter {
pub fn new() -> Self {
BytesCounter { bytes_written: 0 }
}
pub fn bytes_written(&self) -> usize {
self.bytes_written
}
}
impl io::Write for BytesCounter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.bytes_written += buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
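// A minimal usage sketch (not part of this diff; `serialized_size` is a
// hypothetical helper): measuring the serialized size of a roaring bitmap
// without buffering the bytes.
//
//     fn serialized_size(bitmap: &roaring::RoaringBitmap) -> usize {
//         let mut counter = BytesCounter::new();
//         // serialize_into only needs an io::Write; the counter discards
//         // the bytes and just records how many were written.
//         bitmap.serialize_into(&mut counter).unwrap();
//         counter.bytes_written()
//     }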

crates/meilitool/src/main.rs

@@ -1,14 +1,18 @@
#![allow(clippy::result_large_err)]
use std::fmt;
use std::fs::{read_dir, read_to_string, remove_file, File};
use std::io::{BufWriter, Write as _};
use std::path::PathBuf;
use std::time::Instant;
use anyhow::{bail, Context};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use byte_unit::UnitType;
use clap::{Parser, Subcommand, ValueEnum};
use dump::{DumpWriter, IndexMetadata};
use file_store::FileStore;
use indicatif::{ProgressBar, ProgressStyle};
use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{Bytes, SerdeJson, Str};
@@ -19,16 +23,20 @@ use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::index::EmbeddingsWithMetadata;
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{obkv_to_json, BEU32};
use meilisearch_types::milli::{obkv_to_json, CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::versioning::{get_version, parse_version};
use meilisearch_types::Index;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde_json::Value::Object;
use time::macros::format_description;
use time::OffsetDateTime;
use upgrade::OfflineUpgrade;
use uuid_codec::UuidCodec;
use crate::bytes_counter::BytesCounter;
mod bytes_counter;
mod upgrade;
mod uuid_codec;
@@ -126,7 +134,9 @@ enum Command {
/// before running the copy and compaction. This way the current indexation must finish before
/// the compaction operation can start. Once the compaction is done, the big index is replaced
/// by the compacted one and the mutable transaction is released.
IndexCompaction { index_name: String },
IndexCompaction {
index_name: String,
},
/// Uses the hair dryer to make the dedicated pages hot in the cache
///
@@ -140,6 +150,25 @@ enum Command {
#[arg(long, value_delimiter = ',')]
index_part: Vec<IndexPart>,
},
/// Measures the on-disk size of the indexes with the previous roaring version
/// and with the brand-new one that supports run-length encoding.
///
/// You can change the number of threads used for the measurement by
/// using the RAYON_NUM_THREADS environment variable.
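///
/// Example invocation (hypothetical db path and index name):
/// `RAYON_NUM_THREADS=8 meilitool --db-path ./data.ms measure-new-roaring-disk-usage --index-name movies --index-part word-docids`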
MeasureNewRoaringDiskUsage {
#[arg(long, value_delimiter = ',')]
index_name: Vec<String>,
#[arg(long, value_delimiter = ',')]
index_part: Vec<RoaringBasedIndexPart>,
},
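/// Compacts the tasks database (data.mdb) by copying it into a fresh file
/// and swapping it in place.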
CompactTaskQueue,
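/// Resizes the tasks database file to the given map size.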
ResizeTaskQueue {
new_size: byte_unit::Byte,
},
}
#[derive(Clone, ValueEnum)]
@@ -148,6 +177,32 @@ enum IndexPart {
Hannoy,
}
#[derive(Clone, ValueEnum)]
enum RoaringBasedIndexPart {
WordDocids,
WordPrefixDocids,
}
impl RoaringBasedIndexPart {
pub fn database(&self, index: &Index) -> Database<Bytes, CboRoaringBitmapCodec> {
use RoaringBasedIndexPart::*;
match self {
WordDocids => index.word_docids.remap_key_type(),
WordPrefixDocids => index.word_prefix_docids.remap_key_type(),
}
}
}
impl fmt::Display for RoaringBasedIndexPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use RoaringBasedIndexPart::*;
match self {
WordDocids => write!(f, "word-docids"),
WordPrefixDocids => write!(f, "word-prefix-docids"),
}
}
}
fn main() -> anyhow::Result<()> {
let Cli { db_path, command } = Cli::parse();
@@ -169,9 +224,66 @@ fn main() -> anyhow::Result<()> {
Command::HairDryer { index_name, index_part } => {
hair_dryer(db_path, &index_name, &index_part)
}
Command::MeasureNewRoaringDiskUsage { index_name, index_part } => {
measure_new_roaring_disk_usage(db_path, &index_name, &index_part)
}
Command::CompactTaskQueue => compact_task_queue(db_path),
Command::ResizeTaskQueue { new_size } => resize_task_queue(db_path, new_size),
}
}
fn resize_task_queue(db_path: PathBuf, new_size: byte_unit::Byte) -> anyhow::Result<()> {
let new_size = new_size.as_u64();
let path = db_path.join("tasks");
let env = unsafe {
EnvOpenOptions::new()
.read_txn_without_tls()
.max_dbs(100)
.map_size(new_size as usize)
.open(&path)
}
.with_context(|| format!("While trying to open {:?}", path.display()))?;
eprintln!("Acquiring a write transaction for tasks queue...");
let wtxn = env.write_txn()?;
wtxn.commit()?;
Ok(())
}
fn compact_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
let path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&path) }
.with_context(|| format!("While trying to open {:?}", path.display()))?;
eprintln!("Acquiring a write transaction for tasks queue...");
let wtxn = env.write_txn()?;
/// The name of the copy of the data.mdb file used during compaction.
const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy";
let src_path = env.path().join("data.mdb");
let dst_path = tempfile::TempPath::from_path(path.join(DATA_MDB_COPY_NAME));
let file = File::create(&dst_path)?;
let mut file = tempfile::NamedTempFile::from_parts(file, dst_path);
env.copy_to_file(file.as_file_mut(), meilisearch_types::heed::CompactionOption::Enabled)?;
match file.persist(src_path) {
Ok(file) => file.sync_all()?,
// TODO see if we have a _resource busy_ error and probably handle this by:
// 1. closing the index, 2. replacing and 3. reopening it
Err(tempfile::PersistError { error, file: _ }) => return Err(error.into()),
};
eprintln!("Task persisted, aborting wtxn...");
wtxn.abort();
Ok(())
}
/// Clears the task queue located at `db_path`.
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
let path = db_path.join("tasks");
@@ -694,3 +806,523 @@ fn hair_dryer(
Ok(())
}
fn measure_new_roaring_disk_usage(
db_path: PathBuf,
index_names: &[String],
index_parts: &[RoaringBasedIndexPart],
) -> Result<(), anyhow::Error> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
eprintln!("Trying to get a read transaction on the index scheduler...");
let scheduler_rtxn = env.read_txn()?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &scheduler_rtxn, "index-mapping")?;
eprintln!("Got one! Reading indexes...");
let thread_count = rayon::current_num_threads();
assert!(thread_count > 0);
for result in index_mapping.iter(&scheduler_rtxn)? {
let (uid, uuid) = result?;
if index_names.iter().any(|i| i == uid) {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index =
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
for index_part in index_parts {
let database = index_part.database(&index);
println!("{uid} -> {index_part}");
eprintln!("\tTrying to get {thread_count} read transactions on {uid}...");
let rtxns = std::iter::repeat_with(|| index.read_txn())
.take(thread_count)
.collect::<meilisearch_types::heed::Result<Vec<_>>>()?;
eprintln!("\tGot one! Reading databases...");
let stat = database.stat(&rtxns[0])?;
let number_of_entries = database.len(&rtxns[0])?;
let pages = stat.leaf_pages + stat.branch_pages + stat.overflow_pages;
let size_as_pages = pages * (stat.page_size as usize);
let human_size = byte_unit::Byte::from(size_as_pages as u64)
.get_appropriate_unit(UnitType::Binary);
println!(
"\tThe size of the database seen by LMDB (in terms of pages): {human_size:.2}"
);
#[derive(Default)]
struct ComputedStats {
key_size: usize,
value_size: usize,
new_value_size: usize,
delta_encoded_value_size: usize,
average_distance_between_value_numbers: f32,
number_of_values: u64,
number_of_raw_cbos: usize,
number_of_containers: usize,
number_of_array_containers: usize,
number_of_bitset_containers: usize,
new_number_of_containers: usize,
new_number_of_array_containers: usize,
new_number_of_bitset_containers: usize,
new_number_of_run_containers: usize,
}
impl std::ops::Add for ComputedStats {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
let ComputedStats {
key_size,
value_size,
new_value_size,
delta_encoded_value_size,
average_distance_between_value_numbers,
number_of_values,
number_of_raw_cbos,
number_of_containers,
number_of_array_containers,
number_of_bitset_containers,
new_number_of_containers,
new_number_of_array_containers,
new_number_of_bitset_containers,
new_number_of_run_containers,
} = self;
ComputedStats {
key_size: key_size + rhs.key_size,
value_size: value_size + rhs.value_size,
new_value_size: new_value_size + rhs.new_value_size,
delta_encoded_value_size: delta_encoded_value_size
+ rhs.delta_encoded_value_size,
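// Note: this unweighted pairwise mean is only an approximation,
// since each side may have accumulated a different number of entries.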
average_distance_between_value_numbers:
(average_distance_between_value_numbers
+ rhs.average_distance_between_value_numbers)
/ 2.0,
number_of_values: number_of_values + rhs.number_of_values,
number_of_raw_cbos: number_of_raw_cbos + rhs.number_of_raw_cbos,
number_of_containers: number_of_containers + rhs.number_of_containers,
number_of_array_containers: number_of_array_containers
+ rhs.number_of_array_containers,
number_of_bitset_containers: number_of_bitset_containers
+ rhs.number_of_bitset_containers,
new_number_of_containers: new_number_of_containers
+ rhs.new_number_of_containers,
new_number_of_array_containers: new_number_of_array_containers
+ rhs.new_number_of_array_containers,
new_number_of_bitset_containers: new_number_of_bitset_containers
+ rhs.new_number_of_bitset_containers,
new_number_of_run_containers: new_number_of_run_containers
+ rhs.new_number_of_run_containers,
}
}
}
let style = ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar}] {human_pos}/{human_len} ({percent}%) ({eta})",
)
.unwrap();
let pb = ProgressBar::new(number_of_entries).with_style(style);
let stats = rtxns
.into_par_iter()
.enumerate()
.map(|(thread_id, rtxn)| {
let mut stats = ComputedStats::default();
let ComputedStats {
key_size,
value_size,
new_value_size,
delta_encoded_value_size,
average_distance_between_value_numbers,
number_of_values,
number_of_raw_cbos,
number_of_containers,
number_of_array_containers,
number_of_bitset_containers,
new_number_of_containers,
new_number_of_array_containers,
new_number_of_bitset_containers,
new_number_of_run_containers,
} = &mut stats;
for (index, result) in
database.remap_data_type::<Bytes>().iter(&rtxn)?.enumerate()
{
if index % thread_count != thread_id {
continue;
}
let (key, value) = result?;
*key_size += key.len();
*value_size += value.len();
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
*number_of_values += bitmap.len();
*number_of_raw_cbos += (bitmap.len() < 7) as usize; // below the Cbo threshold, serialized as raw u32s
let stats = bitmap.statistics();
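// Average gap between consecutive values of this bitmap. Note that this
// overwrites the value computed for previous entries on this thread, so
// combined with the pairwise mean in `Add` the reported figure is only a
// rough indicator.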
*average_distance_between_value_numbers = {
let mut total_distance = 0usize;
let mut prev = None;
for n in &bitmap {
if let Some(prev) = prev {
total_distance += (n - prev) as usize;
}
prev = Some(n);
}
if bitmap.is_empty() {
f32::INFINITY
} else {
total_distance as f32 / bitmap.len() as f32
}
};
*number_of_containers += stats.n_containers as usize;
*number_of_array_containers += stats.n_array_containers as usize;
*number_of_bitset_containers += stats.n_bitset_containers as usize;
let mut new_bitmap =
new_roaring::RoaringBitmap::from_sorted_iter(bitmap).unwrap();
let compressed = encode_bitmap_with_delta_encoding(&new_bitmap);
*delta_encoded_value_size += compressed.len();
let decoded_bitmap = decode_bitmap_with_delta_encoding(&compressed[..]);
assert_eq!(decoded_bitmap, new_bitmap);
let _has_been_optimized = new_bitmap.optimize();
let stats = new_bitmap.statistics();
*new_number_of_containers += stats.n_containers as usize;
*new_number_of_array_containers += stats.n_array_containers as usize;
*new_number_of_bitset_containers += stats.n_bitset_containers as usize;
*new_number_of_run_containers += stats.n_run_containers as usize;
let mut bytes_counter = BytesCounter::new();
new_bitmap.serialize_into(&mut bytes_counter).unwrap();
*new_value_size += bytes_counter.bytes_written();
pb.inc(1);
}
meilisearch_types::heed::Result::Ok(stats)
})
.try_reduce(ComputedStats::default, |a, b| Ok(a + b))?;
pb.finish();
let ComputedStats {
key_size,
value_size,
new_value_size,
delta_encoded_value_size,
average_distance_between_value_numbers,
number_of_values,
number_of_raw_cbos,
number_of_containers,
number_of_array_containers,
number_of_bitset_containers,
new_number_of_containers,
new_number_of_array_containers,
new_number_of_bitset_containers,
new_number_of_run_containers,
} = stats;
let human_size = byte_unit::Byte::from(key_size + value_size)
.get_appropriate_unit(UnitType::Binary);
let human_key_size =
byte_unit::Byte::from(key_size).get_appropriate_unit(UnitType::Binary);
let human_value_size =
byte_unit::Byte::from(value_size).get_appropriate_unit(UnitType::Binary);
println!("\tThe raw size of the database: {human_size:.2} (keys: {human_key_size:.2}, values: {human_value_size:.2})");
let human_size = byte_unit::Byte::from(key_size + new_value_size)
.get_appropriate_unit(UnitType::Binary);
println!("\tThe raw size of the database using the new bitmaps: {human_size:.2}");
println!(
"\tThe raw size of the database using the delta encoding: {:.2}",
byte_unit::Byte::from(key_size + delta_encoded_value_size)
.get_appropriate_unit(UnitType::Binary)
);
println!("\tnumber of entries: {number_of_entries}");
println!(
"\taverage number of values: {:.2}",
number_of_values as f64 / number_of_entries as f64
);
println!(
"\taverage distance between value numbers: {:.2}",
average_distance_between_value_numbers
);
println!(
"\tnumber of raw cbos: {number_of_raw_cbos} ({}%)",
number_of_raw_cbos as f64 / number_of_entries as f64 * 100.0
);
println!("\tnumber of containers: {number_of_containers}");
println!(
"\tnumber of array containers: {number_of_array_containers} ({:.2}%)",
(number_of_array_containers as f64 / number_of_containers as f64) * 100.0
);
println!(
"\tnumber of bitset containers: {number_of_bitset_containers} ({:.2}%)",
(number_of_bitset_containers as f64 / number_of_containers as f64) * 100.0
);
println!("\tnew number of containers: {new_number_of_containers}");
println!(
"\tnew number of array containers: {new_number_of_array_containers} ({:.2}%)",
(new_number_of_array_containers as f64 / new_number_of_containers as f64)
* 100.0
);
println!(
"\tnew number of bitset containers: {new_number_of_bitset_containers} ({:.2}%)",
(new_number_of_bitset_containers as f64 / new_number_of_containers as f64)
* 100.0
);
println!(
"\tnew number of run containers: {new_number_of_run_containers} ({:.2}%)",
(new_number_of_run_containers as f64 / new_number_of_containers as f64) * 100.0
);
}
}
}
Ok(())
}
/// The magic header for our custom encoding format
const MAGIC_HEADER: u8 = 0xF0;
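// Layout produced by the encoder below: the magic header byte, then a
// sequence of chunks, each a one-byte header `(level << 6) | num_bits`
// followed by its bitpacked payload. Values are drained in BitPacker8x
// blocks (256 integers), the tail in BitPacker4x (128) and BitPacker1x (32)
// blocks, and any leftover integers are appended raw as little-endian u32s
// (the BitPackerLevel::None case).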
/// Returns the delta-encoded compressed version of the given roaring bitmap.
fn encode_bitmap_with_delta_encoding(bitmap: &new_roaring::RoaringBitmap) -> Vec<u8> {
let mut compressed = Vec::new();
// Insert the magic header
compressed.push(MAGIC_HEADER);
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
let mut decompressed = vec![0u32; BitPacker8x::BLOCK_LEN];
let decompressed = &mut decompressed[..];
let mut buffer_index = 0;
let mut initial = None;
for n in bitmap {
decompressed[buffer_index] = n;
buffer_index += 1;
if buffer_index == BitPacker8x::BLOCK_LEN {
encode_with_packer(&bitpacker8x, decompressed, initial, &mut compressed);
initial = Some(n);
buffer_index = 0;
}
}
// No more full 8x blocks; compress the remaining integers with the smaller bitpackers
let decompressed = &decompressed[..buffer_index];
let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
encode_with_packer(&bitpacker4x, &decompressed, initial, &mut compressed);
initial = decompressed.iter().last().copied();
}
let decompressed = chunks.remainder();
let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
encode_with_packer(&bitpacker1x, &decompressed, initial, &mut compressed);
initial = decompressed.iter().last().copied();
}
// If some integers remain that even our smallest bitpacker cannot
// compress, we append them raw at the end of our buffer, preceded
// by a header.
let decompressed = chunks.remainder();
if !decompressed.is_empty() {
let header = encode_bitpacker_level_and_num_bits(BitPackerLevel::None, u32::BITS as u8);
let original_len = compressed.len();
let additional = decompressed.len() * std::mem::size_of::<u32>() + 1;
compressed.resize(original_len + additional, 0);
let buffer = &mut compressed[original_len..];
let (header_in_buffer, buffer) = buffer.split_first_mut().unwrap();
*header_in_buffer = header;
buffer.copy_from_slice(bytemuck::cast_slice(decompressed));
}
compressed
}
fn encode_with_packer<B: BitPackerExt>(
bitpacker: &B,
decompressed: &[u32],
initial: Option<u32>,
output: &mut Vec<u8>,
) {
let num_bits = bitpacker.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = B::compressed_block_size(num_bits);
let chunk_header = encode_bitpacker_level_and_num_bits(B::level(), num_bits);
let original_len = output.len();
output.resize(original_len + compressed_len + 1, 0);
let buffer = &mut output[original_len..];
let (header_in_buffer, buffer) = buffer.split_first_mut().unwrap();
*header_in_buffer = chunk_header;
bitpacker.compress_strictly_sorted(initial, decompressed, buffer, num_bits);
}
// TODO do not panic and return error messages
fn decode_bitmap_with_delta_encoding(compressed: &[u8]) -> new_roaring::RoaringBitmap {
let (&header, compressed) = compressed.split_first().expect("compressed must not be empty");
assert_eq!(
header, MAGIC_HEADER,
"Invalid header. Found 0x{:x}, expecting 0x{:x}",
header, MAGIC_HEADER
);
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
let mut output = new_roaring::RoaringBitmap::new();
let mut decompressed = vec![0u32; BitPacker8x::BLOCK_LEN];
let decompressed = &mut decompressed[..];
let mut compressed = compressed;
let mut initial = None;
loop {
let Some((&chunk_header, encoded)) = compressed.split_first() else { break };
let (level, num_bits) = decode_bitpacker_level_and_num_bits(chunk_header);
let (bytes_read, decompressed) = match level {
BitPackerLevel::None => {
assert_eq!(num_bits, u32::BITS as u8);
// TODO we may prefer returning an iterator of u32 instead to avoid this copy.
// However, that may not be the most important part to optimize.
let decompressed_bytes = bytemuck::cast_slice_mut(decompressed);
assert!(encoded.len().is_multiple_of(std::mem::size_of::<u32>()));
let decompressed_bytes = &mut decompressed_bytes[..encoded.len()];
decompressed_bytes.copy_from_slice(encoded);
// FIXME: Remove this ugly cast
(encoded.len(), bytemuck::cast_slice::<_, u32>(decompressed_bytes))
}
BitPackerLevel::BitPacker1x => {
decode_with_packer(&bitpacker1x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker4x => {
decode_with_packer(&bitpacker4x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker8x => {
decode_with_packer(&bitpacker8x, decompressed, initial, encoded, num_bits)
}
};
initial = decompressed.iter().last().copied();
// TODO investigate perf
output.append(decompressed.iter().copied()).unwrap();
// Skip what the delta-decoding read, plus the one-byte chunk header.
compressed = &compressed[bytes_read + 1..];
}
output
}
/// Returns the number of bytes read and the decoded unsigned integers.
fn decode_with_packer<'d, B: BitPacker>(
bitpacker: &B,
decompressed: &'d mut [u32],
initial: Option<u32>,
compressed: &[u8],
num_bits: u8,
) -> (usize, &'d [u32]) {
let decompressed = &mut decompressed[..B::BLOCK_LEN];
let read = bitpacker.decompress_strictly_sorted(initial, compressed, decompressed, num_bits);
(read, decompressed)
}
#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
enum BitPackerLevel {
/// The remaining bytes are raw little endian encoded u32s.
None,
/// The remaining bits are encoded using a `BitPacker1x`.
BitPacker1x,
/// The remaining bits are encoded using a `BitPacker4x`.
BitPacker4x,
/// The remaining bits are encoded using a `BitPacker8x`.
BitPacker8x,
}
// TODO: never panic in this function and rather return a result
fn encode_bitpacker_level_and_num_bits(level: BitPackerLevel, num_bits: u8) -> u8 {
assert!(num_bits < 64); // num_bits must fit in the 6 low bits of the header byte
let level = level as u8;
assert!(level <= 3);
num_bits | (level << 6)
}
// TODO: never panic in this function and rather return a result
fn decode_bitpacker_level_and_num_bits(data: u8) -> (BitPackerLevel, u8) {
let num_bits = data & 0b00111111;
let level = match data >> 6 {
0 => BitPackerLevel::None,
1 => BitPackerLevel::BitPacker1x,
2 => BitPackerLevel::BitPacker4x,
3 => BitPackerLevel::BitPacker8x,
invalid => panic!("Invalid bitpacker level: {invalid}"),
};
debug_assert!(num_bits < 64); // always true after masking with 0b0011_1111
(level, num_bits)
}
trait BitPackerExt: BitPacker {
fn level() -> BitPackerLevel;
}
impl BitPackerExt for BitPacker8x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker8x
}
}
impl BitPackerExt for BitPacker4x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker4x
}
}
impl BitPackerExt for BitPacker1x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker1x
}
}
#[cfg(test)]
mod tests {
use new_roaring::RoaringBitmap;
use quickcheck::quickcheck;
use super::{
decode_bitmap_with_delta_encoding, decode_bitpacker_level_and_num_bits,
encode_bitmap_with_delta_encoding, encode_bitpacker_level_and_num_bits, BitPackerLevel,
};
quickcheck! {
// fn qc_bitpacker_level_and_num_bits(level: BitPackerLevel, num_bits: u8) -> bool {
//     let header = encode_bitpacker_level_and_num_bits(level, num_bits);
//     let (out_level, out_num_bits) = decode_bitpacker_level_and_num_bits(header);
//     out_level == level && out_num_bits == num_bits
// }
fn qc_random(xs: Vec<u32>) -> bool {
let bitmap = RoaringBitmap::from_iter(xs);
let compressed = encode_bitmap_with_delta_encoding(&bitmap);
let decompressed = decode_bitmap_with_delta_encoding(&compressed);
decompressed == bitmap
}
}
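// A hand-rolled round-trip check of the chunk header byte; the quickcheck
// variant above stays commented out, presumably because `BitPackerLevel`
// lacks a quickcheck `Arbitrary` impl.
#[test]
fn header_level_and_num_bits_round_trip() {
    let header = encode_bitpacker_level_and_num_bits(BitPackerLevel::BitPacker4x, 13);
    let (level, num_bits) = decode_bitpacker_level_and_num_bits(header);
    assert_eq!(level, BitPackerLevel::BitPacker4x);
    assert_eq!(num_bits, 13);
}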
}