Compare commits

...

17 Commits

Author SHA1 Message Date
Clément Renault
0b4f635817 WIP missing DeCboRoaringBitmapCodec::bytes_encode implem 2025-11-24 18:22:57 +01:00
Kerollmops
a558dbdfde Remove the unwraps, asserts and return actual io errors 2025-11-24 13:41:37 +01:00
Kerollmops
952b8b85eb Move to a two bytes header 2025-11-24 13:41:37 +01:00
Kerollmops
82a260fec8 Introduce a first working version of the DeBitmapCodec 2025-11-24 13:41:37 +01:00
Clément Renault
cf62af13e8 Merge pull request #6005 from meilisearch/clamp-max-batch-size
Clamp max batch size to 10 GiB
2025-11-20 10:45:23 +00:00
Many the fish
91cf94c196 Merge pull request #5999 from meilisearch/fix-document-fetch-sort
Fix the Document Fetch pagination bug when Sort is applied
2025-11-20 10:15:04 +00:00
Clément Renault
753ba39199 Update the documentation of the batch size 2025-11-20 10:33:02 +01:00
Clément Renault
3944c25853 Clamp the maximum batch size to maximum 10GiB 2025-11-20 10:29:50 +01:00
ManyTheFish
925bce5fbd Modify the test to test all the sort branches and fix the untested branch 2025-11-20 10:27:24 +01:00
ManyTheFish
62065ed30d Fix the pagination bug
where the last document of the previous page was duplicated as the first
document of the current page. This was due to a bug on the custom nth
function of the sort ranking rule skipping `n-1` documents instead of `n`
2025-11-20 10:27:24 +01:00
Clément Renault
97e6ae1957 Merge pull request #5994 from meilisearch/improve-s3-error-messages
Improve S3 upload by showing errors in the task queue
2025-11-19 16:58:02 +00:00
Clément Renault
5ed9be0789 Merge pull request #5990 from meilisearch/default-max-batch-size
Make the limit batched tasks total size defaults to half of the max indexing memory
2025-11-19 16:56:34 +00:00
Clément Renault
7597b1049f Merge pull request #6001 from meilisearch/update-windows-macos-ci
Update the macOS platform version in the CI
2025-11-19 16:12:52 +00:00
Clément Renault
d99150f21b Improve error message extraction
Co-authored-by: Many the fish <many@meilisearch.com>
2025-11-19 17:09:15 +01:00
Kerollmops
c9726674a0 Make the limit batched tasks total size default to half of max indexing
memory
2025-11-19 17:04:45 +01:00
Clément Renault
205f40b3b8 Update the macOS platform version to use version 14 2025-11-19 16:10:41 +01:00
Kerollmops
361580f451 Display the error message on failure 2025-11-17 09:21:18 +01:00
17 changed files with 695 additions and 48 deletions

View File

@@ -65,9 +65,9 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [macos-13, windows-2022] os: [macos-14, windows-2022]
include: include:
- os: macos-13 - os: macos-14
artifact_name: meilisearch artifact_name: meilisearch
asset_name: meilisearch-macos-amd64 asset_name: meilisearch-macos-amd64
- os: windows-2022 - os: windows-2022
@@ -90,7 +90,7 @@ jobs:
publish-macos-apple-silicon: publish-macos-apple-silicon:
name: Publish binary for macOS silicon name: Publish binary for macOS silicon
runs-on: macos-13 runs-on: macos-14
needs: check-version needs: check-version
strategy: strategy:
matrix: matrix:

View File

@@ -47,7 +47,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [macos-13, windows-2022] os: [macos-14, windows-2022]
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v5
- name: Cache dependencies - name: Cache dependencies

23
Cargo.lock generated
View File

@@ -1964,6 +1964,16 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
"regex",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.2" version = "1.0.2"
@@ -4183,6 +4193,7 @@ dependencies = [
"big_s", "big_s",
"bimap", "bimap",
"bincode 1.3.3", "bincode 1.3.3",
"bitpacking",
"bstr", "bstr",
"bumpalo", "bumpalo",
"bumparaw-collections", "bumparaw-collections",
@@ -4229,6 +4240,7 @@ dependencies = [
"obkv", "obkv",
"once_cell", "once_cell",
"ordered-float 5.0.0", "ordered-float 5.0.0",
"quickcheck",
"rand 0.8.5", "rand 0.8.5",
"rayon", "rayon",
"rhai", "rhai",
@@ -5171,6 +5183,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "quickcheck"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
"env_logger",
"log",
"rand 0.8.5",
]
[[package]] [[package]]
name = "quinn" name = "quinn"
version = "0.11.9" version = "0.11.9"

View File

@@ -438,12 +438,15 @@ async fn multipart_stream_to_s3(
db_name: String, db_name: String,
reader: std::io::PipeReader, reader: std::io::PipeReader,
) -> Result<(), Error> { ) -> Result<(), Error> {
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf}; use std::collections::VecDeque;
use std::io;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response}; use reqwest::{Client, Response};
use rusty_s3::S3Action as _; use rusty_s3::actions::CreateMultipartUpload;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle}; use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
let reader = OwnedFd::from(reader); let reader = OwnedFd::from(reader);
@@ -517,7 +520,6 @@ async fn multipart_stream_to_s3(
while buffer.len() < (s3_multipart_part_size as usize / 2) { while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable // Wait for the pipe to be readable
use std::io;
reader.readable().await?; reader.readable().await?;
match reader.try_read_buf(&mut buffer) { match reader.try_read_buf(&mut buffer) {
@@ -581,15 +583,17 @@ async fn multipart_stream_to_s3(
async move { async move {
match client.post(url).body(body).send().await { match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => { Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent) Err(backoff::Error::Permanent(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}))
} }
Ok(resp) => Ok(resp), Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)), Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
} }
} }
}) })
.await .await?;
.map_err(Error::S3HttpError)?;
let status = resp.status(); let status = resp.status();
let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?; let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;

View File

@@ -195,7 +195,7 @@ struct Infos {
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize, experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64, experimental_limit_batched_tasks_total_size: Option<u64>,
experimental_network: bool, experimental_network: bool,
experimental_multimodal: bool, experimental_multimodal: bool,
experimental_chat_completions: bool, experimental_chat_completions: bool,
@@ -359,7 +359,7 @@ impl Infos {
http_payload_size_limit, http_payload_size_limit,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size: experimental_limit_batched_tasks_total_size:
experimental_limit_batched_tasks_total_size.into(), experimental_limit_batched_tasks_total_size.map(|size| size.as_u64()),
task_queue_webhook: task_webhook_url.is_some(), task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(), task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(), log_level: log_level.to_string(),

View File

@@ -230,7 +230,17 @@ pub fn setup_meilisearch(
cleanup_enabled: !opt.experimental_replication_parameters, cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.into(), batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.map_or_else(
|| {
opt.indexer_options
.max_indexing_memory
// By default, we use half of the available memory to determine the size of batched tasks
.map_or(u64::MAX, |mem| mem.as_u64() / 2)
// And never exceed 10 GiB when we infer the limit
.min(10 * 1024 * 1024 * 1024)
},
|size| size.as_u64(),
),
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features: opt.to_instance_features(), instance_features: opt.to_instance_features(),

View File

@@ -473,11 +473,14 @@ pub struct Opt {
#[serde(default = "default_limit_batched_tasks")] #[serde(default = "default_limit_batched_tasks")]
pub experimental_max_number_of_batched_tasks: usize, pub experimental_max_number_of_batched_tasks: usize,
/// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once, /// Experimentally controls the maximum total size, in bytes, of tasks that will be processed
/// see: <https://github.com/orgs/meilisearch/discussions/801> /// simultaneously. When unspecified, defaults to half of the maximum indexing memory and
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())] /// clamped to 10 GiB.
#[serde(default = "default_limit_batched_tasks_total_size")] ///
pub experimental_limit_batched_tasks_total_size: Byte, /// See: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE)]
#[serde(default)]
pub experimental_limit_batched_tasks_total_size: Option<Byte>,
/// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each /// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
/// distinct embedder. /// distinct embedder.
@@ -701,10 +704,12 @@ impl Opt {
MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
experimental_max_number_of_batched_tasks.to_string(), experimental_max_number_of_batched_tasks.to_string(),
); );
export_to_env_if_not_present( if let Some(limit) = experimental_limit_batched_tasks_total_size {
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, export_to_env_if_not_present(
experimental_limit_batched_tasks_total_size.to_string(), MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
); limit.to_string(),
);
}
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES, MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
experimental_embedding_cache_entries.to_string(), experimental_embedding_cache_entries.to_string(),
@@ -1273,10 +1278,6 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX usize::MAX
} }
fn default_limit_batched_tasks_total_size() -> Byte {
Byte::from_u64(u64::MAX)
}
fn default_embedding_cache_entries() -> usize { fn default_embedding_cache_entries() -> usize {
0 0
} }

View File

@@ -1,14 +1,14 @@
use crate::search::{Personalize, SearchResult}; use std::time::Duration;
use meilisearch_types::{
error::{Code, ErrorCode, ResponseError}, use meilisearch_types::error::{Code, ErrorCode, ResponseError};
milli::TimeBudget, use meilisearch_types::milli::TimeBudget;
};
use rand::Rng; use rand::Rng;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::search::{Personalize, SearchResult};
const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank"; const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
const MAX_RETRIES: u32 = 10; const MAX_RETRIES: u32 = 10;

View File

@@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema; use utoipa::ToSchema;
use uuid::Uuid; use uuid::Uuid;
use crate::search::SearchMetadata;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex}; use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
use crate::milli::vector::Embedding; use crate::milli::vector::Embedding;
use crate::search::SearchMetadata;
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0; pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;

View File

@@ -1339,3 +1339,117 @@ async fn get_document_with_vectors() {
} }
"###); "###);
} }
#[actix_rt::test]
async fn test_fetch_documents_pagination_with_sorting() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _code) = index.create(None).await;
server.wait_task(task.uid()).await.succeeded();
// Set name as sortable attribute
let (task, code) = index.update_settings_sortable_attributes(json!(["name"])).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.succeeded();
let documents = json!((0..50)
.map(|i| json!({"id": i, "name": format!("doc_{:05}", std::cmp::min(i, 5))}))
.collect::<Vec<_>>());
// Add documents as described in the bug report
let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.succeeded();
// Request 1 (first page): offset 0, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 0,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 0,
"name": "doc_00000"
},
{
"id": 1,
"name": "doc_00001"
}
]
"###);
// Request 2 (second page): offset 2, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 2,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 2,
"name": "doc_00002"
},
{
"id": 3,
"name": "doc_00003"
}
]
"###);
// Request 3 (third page): offset 4, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 4,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 4,
"name": "doc_00004"
},
{
"id": 5,
"name": "doc_00005"
}
]
"###);
// Request 4 (fourth page): offset 6, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 6,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 6,
"name": "doc_00005"
},
{
"id": 7,
"name": "doc_00005"
}
]
"###);
}

View File

@@ -120,14 +120,16 @@ twox-hash = { version = "2.1.1", default-features = false, features = [
] } ] }
geo-types = "0.7.16" geo-types = "0.7.16"
zerometry = "0.3.0" zerometry = "0.3.0"
bitpacking = "0.9.2"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.47", default-features = false }
# fixed version due to format breakages in v1.40 # fixed version due to format breakages in v1.40
insta = "=1.39.0" insta = "=1.39.0"
mimalloc = { version = "0.1.47", default-features = false }
maplit = "1.0.2" maplit = "1.0.2"
md5 = "0.7.0" md5 = "0.7.0"
meili-snap = { path = "../meili-snap" } meili-snap = { path = "../meili-snap" }
quickcheck = "1.0.3"
rand = { version = "0.8.5", features = ["small_rng"] } rand = { version = "0.8.5", features = ["small_rng"] }
[features] [features]

View File

@@ -87,7 +87,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
}; };
// Otherwise don't directly iterate over children, skip them if we know we will go further // Otherwise don't directly iterate over children, skip them if we know we will go further
let mut to_skip = n - 1; let mut to_skip = n;
while to_skip > 0 { while to_skip > 0 {
if let Err(e) = SortedDocumentsIterator::update_current( if let Err(e) = SortedDocumentsIterator::update_current(
current_child, current_child,
@@ -108,7 +108,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
continue; continue;
} else { } else {
// The current iterator is large enough, so we can forward the call to it. // The current iterator is large enough, so we can forward the call to it.
return inner.nth(to_skip + 1); return inner.nth(to_skip);
} }
} }

View File

@@ -22,7 +22,9 @@ pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec; pub use self::fst_set_codec::FstSetCodec;
pub use self::obkv_codec::ObkvCodec; pub use self::obkv_codec::ObkvCodec;
pub use self::roaring_bitmap::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, RoaringBitmapCodec}; pub use self::roaring_bitmap::{
BoRoaringBitmapCodec, CboRoaringBitmapCodec, DeCboRoaringBitmapCodec, RoaringBitmapCodec,
};
pub use self::roaring_bitmap_length::{ pub use self::roaring_bitmap_length::{
BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec, BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec,
}; };

View File

@@ -19,8 +19,19 @@ pub const THRESHOLD: usize = 7;
pub struct CboRoaringBitmapCodec; pub struct CboRoaringBitmapCodec;
impl CboRoaringBitmapCodec { impl CboRoaringBitmapCodec {
/// If the number of items (u32s) to encode is less than or equal to the threshold
/// it means that it would weigh the same or less than the RoaringBitmap
/// header, so we directly encode them using ByteOrder instead.
pub fn bitmap_serialize_as_raw_u32s(roaring: &RoaringBitmap) -> bool {
roaring.len() <= THRESHOLD as u64
}
pub fn bytes_deserialize_as_raw_u32s(bytes: &[u8]) -> bool {
bytes.len() <= THRESHOLD * size_of::<u32>()
}
pub fn serialized_size(roaring: &RoaringBitmap) -> usize { pub fn serialized_size(roaring: &RoaringBitmap) -> usize {
if roaring.len() <= THRESHOLD as u64 { if Self::bitmap_serialize_as_raw_u32s(roaring) {
roaring.len() as usize * size_of::<u32>() roaring.len() as usize * size_of::<u32>()
} else { } else {
roaring.serialized_size() roaring.serialized_size()
@@ -35,10 +46,7 @@ impl CboRoaringBitmapCodec {
roaring: &RoaringBitmap, roaring: &RoaringBitmap,
mut writer: W, mut writer: W,
) -> io::Result<()> { ) -> io::Result<()> {
if roaring.len() <= THRESHOLD as u64 { if Self::bitmap_serialize_as_raw_u32s(roaring) {
// If the number of items (u32s) to encode is less than or equal to the threshold
// it means that it would weigh the same or less than the RoaringBitmap
// header, so we directly encode them using ByteOrder instead.
for integer in roaring { for integer in roaring {
writer.write_u32::<NativeEndian>(integer)?; writer.write_u32::<NativeEndian>(integer)?;
} }
@@ -51,7 +59,7 @@ impl CboRoaringBitmapCodec {
} }
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> { pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
if bytes.len() <= THRESHOLD * size_of::<u32>() { if Self::bytes_deserialize_as_raw_u32s(bytes) {
// If there is threshold or less than threshold integers that can fit into this array // If there is threshold or less than threshold integers that can fit into this array
// of bytes it means that we used the ByteOrder codec serializer. // of bytes it means that we used the ByteOrder codec serializer.
let mut bitmap = RoaringBitmap::new(); let mut bitmap = RoaringBitmap::new();
@@ -71,7 +79,7 @@ impl CboRoaringBitmapCodec {
other: &RoaringBitmap, other: &RoaringBitmap,
) -> io::Result<RoaringBitmap> { ) -> io::Result<RoaringBitmap> {
// See above `deserialize_from` method for implementation details. // See above `deserialize_from` method for implementation details.
if bytes.len() <= THRESHOLD * size_of::<u32>() { if Self::bytes_deserialize_as_raw_u32s(bytes) {
let mut bitmap = RoaringBitmap::new(); let mut bitmap = RoaringBitmap::new();
while let Ok(integer) = bytes.read_u32::<NativeEndian>() { while let Ok(integer) = bytes.read_u32::<NativeEndian>() {
if other.contains(integer) { if other.contains(integer) {
@@ -98,7 +106,7 @@ impl CboRoaringBitmapCodec {
let mut vec = Vec::new(); let mut vec = Vec::new();
for bytes in slices { for bytes in slices {
if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() { if Self::bytes_deserialize_as_raw_u32s(bytes.as_ref()) {
let mut reader = bytes.as_ref(); let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() { while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer); vec.push(integer);
@@ -112,6 +120,8 @@ impl CboRoaringBitmapCodec {
vec.sort_unstable(); vec.sort_unstable();
vec.dedup(); vec.dedup();
// Be careful when modifying this condition,
// the rule must be the same everywhere
if vec.len() <= THRESHOLD { if vec.len() <= THRESHOLD {
for integer in vec { for integer in vec {
buffer.extend_from_slice(&integer.to_ne_bytes()); buffer.extend_from_slice(&integer.to_ne_bytes());

View File

@@ -0,0 +1,102 @@
use std::borrow::Cow;
use std::io::{self, ErrorKind};
use heed::BoxedError;
use roaring::RoaringBitmap;
use super::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
use super::de_roaring_bitmap_codec::DeRoaringBitmapCodec;
use crate::heed_codec::BytesDecodeOwned;
pub struct DeCboRoaringBitmapCodec;
impl DeCboRoaringBitmapCodec {
pub fn serialized_size_with_tmp_buffer(
bitmap: &RoaringBitmap,
tmp_buffer: &mut Vec<u32>,
) -> usize {
DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(bitmap, tmp_buffer)
}
/// Writes the delta-encoded compressed version of
/// the given roaring bitmap into the provided writer.
pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
let mut tmp_buffer = Vec::new();
Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
}
/// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
///
/// Note that we always serialize the bitmap with the delta-encoded compressed version.
pub fn serialize_into_with_tmp_buffer<W: io::Write>(
bitmap: &RoaringBitmap,
writer: W,
tmp_buffer: &mut Vec<u32>,
) -> io::Result<()> {
// We are stuck with this format because the CboRoaringBitmapCodec decides to write
// raw and unencoded u32s, without a header when there is at most THRESHOLD elements.
if CboRoaringBitmapCodec::bitmap_serialize_as_raw_u32s(bitmap) {
CboRoaringBitmapCodec::serialize_into_writer(bitmap, writer)
} else {
DeRoaringBitmapCodec::serialize_into_with_tmp_buffer(bitmap, writer, tmp_buffer)
}
}
/// Returns the delta-decoded roaring bitmap from the compressed bytes.
pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
let mut tmp_buffer = Vec::new();
Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
}
/// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
///
/// It tries to decode the input by using the delta-decoded version and
/// if it fails, falls back to the CboRoaringBitmap version.
pub fn deserialize_from_with_tmp_buffer(
input: &[u8],
tmp_buffer: &mut Vec<u32>,
) -> io::Result<RoaringBitmap> {
// The input is too short to be a valid delta-decoded bitmap.
// We fall back to the CboRoaringBitmap version with raw u32s.
if CboRoaringBitmapCodec::bytes_deserialize_as_raw_u32s(input) {
return CboRoaringBitmapCodec::deserialize_from(input);
}
match DeRoaringBitmapCodec::deserialize_from_with_tmp_buffer(input, tmp_buffer) {
Ok(bitmap) => Ok(bitmap),
// If the error kind is Other it means that the delta-decoder found
// an invalid magic header. We fall back to the CboRoaringBitmap version.
Err(e) if e.kind() == ErrorKind::Other => {
CboRoaringBitmapCodec::deserialize_from(input)
}
Err(e) => Err(e),
}
}
}
impl heed::BytesDecode<'_> for DeCboRoaringBitmapCodec {
type DItem = RoaringBitmap;
fn bytes_decode(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
Self::deserialize_from(bytes).map_err(Into::into)
}
}
impl BytesDecodeOwned for DeCboRoaringBitmapCodec {
type DItem = RoaringBitmap;
fn bytes_decode_owned(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
Self::deserialize_from(bytes).map_err(Into::into)
}
}
impl heed::BytesEncode<'_> for DeCboRoaringBitmapCodec {
type EItem = RoaringBitmap;
fn bytes_encode(item: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
// let mut vec = Vec::with_capacity(Self::serialized_size(item));
// Self::serialize_into_vec(item, &mut vec);
// Ok(Cow::Owned(vec))
todo!()
}
}

View File

@@ -0,0 +1,377 @@
use std::io::{self, ErrorKind};
use std::mem::{self, size_of, size_of_val};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use roaring::RoaringBitmap;
/// The magic header for our custom encoding format
const MAGIC_HEADER: u16 = 36869;
pub struct DeRoaringBitmapCodec;
// TODO reintroduce:
// - serialized_size?
// - serialize_into_vec
// - intersection_with_serialized
// - merge_into
// - merge_deladd_into
impl DeRoaringBitmapCodec {
/// Returns the serialized size of the given roaring bitmap with the delta encoding format.
pub fn serialized_size_with_tmp_buffer(
bitmap: &RoaringBitmap,
tmp_buffer: &mut Vec<u32>,
) -> usize {
let mut size = 2; // u16 magic header
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
// This temporary buffer is used to store each chunk of decompressed u32s.
tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
let decompressed = &mut tmp_buffer[..];
let mut buffer_index = 0;
let mut initial = None;
// We initially collect all the integers into a flat buffer of the size
// of the largest bitpacker. We encode them with it until we don't have
// enough of them...
for n in bitmap {
decompressed[buffer_index] = n;
buffer_index += 1;
if buffer_index == BitPacker8x::BLOCK_LEN {
let num_bits = bitpacker8x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker8x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = Some(n);
buffer_index = 0;
}
}
// ...We then switch to a smaller bitpacker to encode the remaining chunks...
let decompressed = &decompressed[..buffer_index];
let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let num_bits = bitpacker4x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker4x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = decompressed.iter().last().copied();
}
// ...And so on...
let decompressed = chunks.remainder();
let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let num_bits = bitpacker1x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker1x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = decompressed.iter().last().copied();
}
// ...Until we don't have any small enough bitpacker. We put them raw
// at the end of out buffer with a header indicating the matter.
let decompressed = chunks.remainder();
if !decompressed.is_empty() {
size += 1; // u8 chunk header
size += decompressed.len() * mem::size_of::<u32>(); // remaining uncompressed u32s
}
size
}
/// Writes the delta-encoded compressed version of
/// the given roaring bitmap into the provided writer.
pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
let mut tmp_buffer = Vec::new();
Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
}
/// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
pub fn serialize_into_with_tmp_buffer<W: io::Write>(
bitmap: &RoaringBitmap,
mut writer: W,
tmp_buffer: &mut Vec<u32>,
) -> io::Result<()> {
// Insert the magic header
writer.write_all(&MAGIC_HEADER.to_ne_bytes())?;
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
// This temporary buffer is used to store each chunk of decompressed and
// compressed and delta-encoded u32s. We need room for the decompressed
// u32s coming from the roaring bitmap, the compressed output that can
// be as large as the decompressed u32s, and the chunk header.
tmp_buffer.resize((BitPacker8x::BLOCK_LEN * 2) + 1, 0u32);
let (decompressed, compressed) = tmp_buffer.split_at_mut(BitPacker8x::BLOCK_LEN);
let compressed = bytemuck::cast_slice_mut(compressed);
let mut buffer_index = 0;
let mut initial = None;
// We initially collect all the integers into a flat buffer of the size
// of the largest bitpacker. We encode them with it until we don't have
// enough of them...
for n in bitmap {
decompressed[buffer_index] = n;
buffer_index += 1;
if buffer_index == BitPacker8x::BLOCK_LEN {
let output = encode_with_packer(&bitpacker8x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = Some(n);
buffer_index = 0;
}
}
// ...We then switch to a smaller bitpacker to encode the remaining chunks...
let decompressed = &decompressed[..buffer_index];
let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let output = encode_with_packer(&bitpacker4x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = decompressed.iter().last().copied();
}
// ...And so on...
let decompressed = chunks.remainder();
let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let output = encode_with_packer(&bitpacker1x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = decompressed.iter().last().copied();
}
// ...Until we don't have any small enough bitpacker. We put them raw
// at the end of out buffer with a header indicating the matter.
let decompressed = chunks.remainder();
if !decompressed.is_empty() {
let header = encode_chunk_header(BitPackerLevel::None, u32::BITS as u8);
// Note: Not convinced about the performance of writing a single
// byte followed by a larger write. However, we will use this
// codec with a BufWriter or directly with a Vec of bytes.
writer.write_all(&[header])?;
writer.write_all(bytemuck::cast_slice(decompressed))?;
}
Ok(())
}
/// Returns the delta-decoded roaring bitmap from the compressed bytes.
pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
let mut tmp_buffer = Vec::new();
Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
}
/// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
pub fn deserialize_from_with_tmp_buffer(
input: &[u8],
tmp_buffer: &mut Vec<u32>,
) -> io::Result<RoaringBitmap> {
let Some((header, mut compressed)) = input.split_at_checked(size_of_val(&MAGIC_HEADER))
else {
return Err(io::Error::new(ErrorKind::UnexpectedEof, "expecting a two-bytes header"));
};
// Safety: This unwrap cannot happen as the header buffer is the right size
let header = u16::from_ne_bytes(header.try_into().unwrap());
if header != MAGIC_HEADER {
return Err(io::Error::other("invalid header value"));
}
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
let mut bitmap = RoaringBitmap::new();
tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
let decompressed = &mut tmp_buffer[..];
let mut initial = None;
while let Some((&chunk_header, encoded)) = compressed.split_first() {
let (level, num_bits) = decode_chunk_header(chunk_header);
let (bytes_read, decompressed) = match level {
BitPackerLevel::None => {
if num_bits != u32::BITS as u8 {
return Err(io::Error::new(
ErrorKind::InvalidData,
"invalid number of bits to encode non-compressed u32s",
));
}
let chunks = encoded.chunks_exact(size_of::<u32>());
if !chunks.remainder().is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expecting last chunk to be a multiple of the size of an u32",
));
}
let integers = chunks
// safety: This unwrap cannot happen as
// the size of u32 is set correctly.
.map(|b| b.try_into().unwrap())
.map(u32::from_ne_bytes);
bitmap
.append(integers)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
// This is basically always the last chunk that exists in
// this delta-encoded format as the raw u32s are appended
// when there is not enough of them to fit in a bitpacker.
break;
}
BitPackerLevel::BitPacker1x => {
decode_with_packer(&bitpacker1x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker4x => {
decode_with_packer(&bitpacker4x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker8x => {
decode_with_packer(&bitpacker8x, decompressed, initial, encoded, num_bits)
}
};
initial = decompressed.iter().last().copied();
// TODO investigate perf
// Safety: Bitpackers cannot output unsorter integers when
// used with the compress_strictly_sorted function.
bitmap.append(decompressed.iter().copied()).unwrap();
// What the delta-decoding read plus the chunk header size
compressed = &compressed[bytes_read + 1..];
}
Ok(bitmap)
}
}
/// Takes a strickly sorted list of u32s and outputs delta-encoded
/// bytes with a chunk header. We expect the output buffer to be
/// at least BLOCK_LEN + 1.
fn encode_with_packer<'c, B: BitPackerExt>(
bitpacker: &B,
decompressed: &[u32],
initial: Option<u32>,
output: &'c mut [u8],
) -> &'c [u8] {
let num_bits = bitpacker.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = B::compressed_block_size(num_bits);
let chunk_header = encode_chunk_header(B::level(), num_bits);
let buffer = &mut output[..compressed_len + 1];
// Safety: The buffer is at least one byte
let (header_in_buffer, encoded) = buffer.split_first_mut().unwrap();
*header_in_buffer = chunk_header;
bitpacker.compress_strictly_sorted(initial, decompressed, encoded, num_bits);
buffer
}
/// Returns the number of bytes read and the decoded unsigned integers.
fn decode_with_packer<'d, B: BitPacker>(
bitpacker: &B,
decompressed: &'d mut [u32],
initial: Option<u32>,
compressed: &[u8],
num_bits: u8,
) -> (usize, &'d [u32]) {
let decompressed = &mut decompressed[..B::BLOCK_LEN];
let read = bitpacker.decompress_strictly_sorted(initial, compressed, decompressed, num_bits);
(read, decompressed)
}
/// An identifier for the bitpacker to be able
/// to correctly decode the compressed integers.
#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
enum BitPackerLevel {
/// The remaining bytes are raw little endian encoded u32s.
None,
/// The remaining bits are encoded using a `BitPacker1x`.
BitPacker1x,
/// The remaining bits are encoded using a `BitPacker4x`.
BitPacker4x,
/// The remaining bits are encoded using a `BitPacker8x`.
BitPacker8x,
}
/// Returns the chunk header based on the bitpacker level
/// and the number of bits to encode the list of integers.
fn encode_chunk_header(level: BitPackerLevel, num_bits: u8) -> u8 {
debug_assert!(num_bits as u32 <= 2_u32.pow(6));
let level = level as u8;
debug_assert!(level <= 3);
num_bits | (level << 6)
}
/// Decodes the chunk header and output the bitpacker level
/// and the number of bits to decode the following bytes.
fn decode_chunk_header(data: u8) -> (BitPackerLevel, u8) {
let num_bits = data & 0b00111111;
let level = match data >> 6 {
0 => BitPackerLevel::None,
1 => BitPackerLevel::BitPacker1x,
2 => BitPackerLevel::BitPacker4x,
3 => BitPackerLevel::BitPacker8x,
invalid => panic!("Invalid bitpacker level: {invalid}"),
};
debug_assert!(num_bits as u32 <= 2_u32.pow(6));
(level, num_bits)
}
/// A simple helper trait to get the BitPackerLevel
/// and correctly generate the chunk header.
trait BitPackerExt: BitPacker {
/// Returns the level of the bitpacker: an identifier to be
/// able to decode the numbers with the right bitpacker.
fn level() -> BitPackerLevel;
}
impl BitPackerExt for BitPacker8x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker8x
}
}
impl BitPackerExt for BitPacker4x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker4x
}
}
impl BitPackerExt for BitPacker1x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker1x
}
}
#[cfg(test)]
mod tests {
use quickcheck::quickcheck;
use roaring::RoaringBitmap;
use super::DeRoaringBitmapCodec;
quickcheck! {
fn qc_random(xs: Vec<u32>) -> bool {
let bitmap = RoaringBitmap::from_iter(xs);
let mut compressed = Vec::new();
DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
let decompressed = DeRoaringBitmapCodec::deserialize_from(&compressed[..]).unwrap();
decompressed == bitmap
}
}
quickcheck! {
fn qc_random_check_serialized_size(xs: Vec<u32>) -> bool {
let bitmap = RoaringBitmap::from_iter(xs);
let mut compressed = Vec::new();
let mut tmp_buffer = Vec::new();
DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
let expected_len = DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(&bitmap, &mut tmp_buffer);
compressed.len() == expected_len
}
}
}

View File

@@ -1,7 +1,10 @@
mod bo_roaring_bitmap_codec; mod bo_roaring_bitmap_codec;
pub mod cbo_roaring_bitmap_codec; pub mod cbo_roaring_bitmap_codec;
pub mod de_cbo_roaring_bitmap_codec;
mod de_roaring_bitmap_codec;
mod roaring_bitmap_codec; mod roaring_bitmap_codec;
pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec; pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec;
pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec; pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
pub use self::de_cbo_roaring_bitmap_codec::DeCboRoaringBitmapCodec;
pub use self::roaring_bitmap_codec::RoaringBitmapCodec; pub use self::roaring_bitmap_codec::RoaringBitmapCodec;