Mirror of https://github.com/meilisearch/meilisearch.git
Synced 2025-11-24 21:47:15 +00:00

Compare commits: build-arm-... → delta-enco... (17 commits)

- 0b4f635817
- a558dbdfde
- 952b8b85eb
- 82a260fec8
- cf62af13e8
- 91cf94c196
- 753ba39199
- 3944c25853
- 925bce5fbd
- 62065ed30d
- 97e6ae1957
- 5ed9be0789
- 7597b1049f
- d99150f21b
- c9726674a0
- 205f40b3b8
- 361580f451

.github/workflows/publish-release-assets.yml (vendored, 6 lines changed)

@@ -65,9 +65,9 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [macos-13, windows-2022]
+        os: [macos-14, windows-2022]
         include:
-          - os: macos-13
+          - os: macos-14
             artifact_name: meilisearch
             asset_name: meilisearch-macos-amd64
           - os: windows-2022

@@ -90,7 +90,7 @@ jobs:
 
   publish-macos-apple-silicon:
     name: Publish binary for macOS silicon
-    runs-on: macos-13
+    runs-on: macos-14
     needs: check-version
     strategy:
       matrix:

.github/workflows/test-suite.yml (vendored, 2 lines changed)

@@ -47,7 +47,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [macos-13, windows-2022]
+        os: [macos-14, windows-2022]
     steps:
       - uses: actions/checkout@v5
       - name: Cache dependencies

Cargo.lock (generated, 23 lines changed)

@@ -1964,6 +1964,16 @@ dependencies = [
  "syn 2.0.106",
 ]
 
+[[package]]
+name = "env_logger"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
+dependencies = [
+ "log",
+ "regex",
+]
+
 [[package]]
 name = "equivalent"
 version = "1.0.2"

@@ -4183,6 +4193,7 @@ dependencies = [
  "big_s",
  "bimap",
  "bincode 1.3.3",
+ "bitpacking",
  "bstr",
  "bumpalo",
  "bumparaw-collections",

@@ -4229,6 +4240,7 @@ dependencies = [
  "obkv",
  "once_cell",
  "ordered-float 5.0.0",
+ "quickcheck",
  "rand 0.8.5",
  "rayon",
  "rhai",

@@ -5171,6 +5183,17 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "quickcheck"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
+dependencies = [
+ "env_logger",
+ "log",
+ "rand 0.8.5",
+]
+
 [[package]]
 name = "quinn"
 version = "0.11.9"

@@ -438,12 +438,15 @@ async fn multipart_stream_to_s3(
     db_name: String,
     reader: std::io::PipeReader,
 ) -> Result<(), Error> {
-    use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
+    use std::collections::VecDeque;
+    use std::io;
+    use std::os::fd::OwnedFd;
+    use std::path::PathBuf;
 
     use bytes::{Bytes, BytesMut};
     use reqwest::{Client, Response};
-    use rusty_s3::S3Action as _;
-    use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
+    use rusty_s3::actions::CreateMultipartUpload;
+    use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
     use tokio::task::JoinHandle;
 
     let reader = OwnedFd::from(reader);

@@ -517,7 +520,6 @@ async fn multipart_stream_to_s3(
         while buffer.len() < (s3_multipart_part_size as usize / 2) {
             // Wait for the pipe to be readable
 
-            use std::io;
             reader.readable().await?;
 
             match reader.try_read_buf(&mut buffer) {

@@ -581,15 +583,17 @@ async fn multipart_stream_to_s3(
             async move {
                 match client.post(url).body(body).send().await {
                     Ok(resp) if resp.status().is_client_error() => {
-                        resp.error_for_status().map_err(backoff::Error::Permanent)
+                        Err(backoff::Error::Permanent(Error::S3Error {
+                            status: resp.status(),
+                            body: resp.text().await.unwrap_or_default(),
+                        }))
                     }
                     Ok(resp) => Ok(resp),
-                    Err(e) => Err(backoff::Error::transient(e)),
+                    Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
                 }
             }
         })
-        .await
-        .map_err(Error::S3HttpError)?;
+        .await?;
 
         let status = resp.status();
         let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;

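The retry closure above now classifies every outcome into the scheduler's own `Error` type before the `backoff` crate sees it: a 4xx response becomes `backoff::Error::Permanent` (stop retrying), while transport failures stay transient (retry with backoff). A minimal standalone sketch of that classification rule, assuming `reqwest`, `tokio`, and `backoff` 0.4 with its `futures` feature; `MyError` is a hypothetical stand-in for the scheduler's `Error` enum:

```rust
use backoff::ExponentialBackoff;

#[derive(Debug)]
enum MyError {
    S3Error { status: reqwest::StatusCode, body: String },
    S3HttpError(reqwest::Error),
}

// Classify one attempt: client errors (4xx) are permanent and abort the
// retry loop; transport-level failures are transient and get retried.
async fn send_once(
    client: &reqwest::Client,
    url: &str,
) -> Result<reqwest::Response, backoff::Error<MyError>> {
    match client.post(url).send().await {
        Ok(resp) if resp.status().is_client_error() => {
            Err(backoff::Error::Permanent(MyError::S3Error {
                status: resp.status(),
                body: resp.text().await.unwrap_or_default(),
            }))
        }
        Ok(resp) => Ok(resp),
        Err(e) => Err(backoff::Error::transient(MyError::S3HttpError(e))),
    }
}

#[tokio::main]
async fn main() -> Result<(), MyError> {
    let client = reqwest::Client::new();
    let resp = backoff::future::retry(ExponentialBackoff::default(), || async {
        send_once(&client, "https://example.com/upload").await
    })
    .await?;
    println!("status: {}", resp.status());
    Ok(())
}
```

This is also why the trailing `.map_err(Error::S3HttpError)?` disappears: the closure already returns the domain error, so `retry(...).await?` propagates it directly.
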
@@ -195,7 +195,7 @@ struct Infos {
     experimental_enable_logs_route: bool,
     experimental_reduce_indexing_memory_usage: bool,
     experimental_max_number_of_batched_tasks: usize,
-    experimental_limit_batched_tasks_total_size: u64,
+    experimental_limit_batched_tasks_total_size: Option<u64>,
     experimental_network: bool,
     experimental_multimodal: bool,
     experimental_chat_completions: bool,

@@ -359,7 +359,7 @@ impl Infos {
             http_payload_size_limit,
             experimental_max_number_of_batched_tasks,
             experimental_limit_batched_tasks_total_size:
-                experimental_limit_batched_tasks_total_size.into(),
+                experimental_limit_batched_tasks_total_size.map(|size| size.as_u64()),
             task_queue_webhook: task_webhook_url.is_some(),
             task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
             log_level: log_level.to_string(),

@@ -230,7 +230,17 @@ pub fn setup_meilisearch(
         cleanup_enabled: !opt.experimental_replication_parameters,
         max_number_of_tasks: 1_000_000,
         max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
-        batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.into(),
+        batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.map_or_else(
+            || {
+                opt.indexer_options
+                    .max_indexing_memory
+                    // By default, we use half of the available memory to determine the size of batched tasks
+                    .map_or(u64::MAX, |mem| mem.as_u64() / 2)
+                    // And never exceed 10 GiB when we infer the limit
+                    .min(10 * 1024 * 1024 * 1024)
+            },
+            |size| size.as_u64(),
+        ),
         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
         index_count: DEFAULT_INDEX_COUNT,
         instance_features: opt.to_instance_features(),

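A worked example of the new default helps here: the inferred limit is half of the indexing memory budget, clamped to 10 GiB, and because the clamp applies after the `map_or`, an unknown memory budget (`None`) also resolves to 10 GiB rather than unlimited. A minimal sketch of the same inference with plain `u64`s instead of the `byte_unit::Byte` type used above:

```rust
/// Infer the batched-tasks size limit when no explicit value is given:
/// half of the indexing memory budget, but never more than 10 GiB.
fn infer_batched_tasks_size_limit(max_indexing_memory: Option<u64>) -> u64 {
    const TEN_GIB: u64 = 10 * 1024 * 1024 * 1024;
    max_indexing_memory.map_or(u64::MAX, |mem| mem / 2).min(TEN_GIB)
}

fn main() {
    const GIB: u64 = 1024 * 1024 * 1024;
    // 32 GiB budget: half is 16 GiB, clamped down to 10 GiB.
    assert_eq!(infer_batched_tasks_size_limit(Some(32 * GIB)), 10 * GIB);
    // 4 GiB budget: half is 2 GiB, under the clamp.
    assert_eq!(infer_batched_tasks_size_limit(Some(4 * GIB)), 2 * GIB);
    // Unknown budget: u64::MAX is clamped too, so the limit is 10 GiB.
    assert_eq!(infer_batched_tasks_size_limit(None), 10 * GIB);
}
```
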
@@ -473,11 +473,14 @@ pub struct Opt {
     #[serde(default = "default_limit_batched_tasks")]
     pub experimental_max_number_of_batched_tasks: usize,
 
-    /// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once,
-    /// see: <https://github.com/orgs/meilisearch/discussions/801>
-    #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
-    #[serde(default = "default_limit_batched_tasks_total_size")]
-    pub experimental_limit_batched_tasks_total_size: Byte,
+    /// Experimentally controls the maximum total size, in bytes, of tasks that will be processed
+    /// simultaneously. When unspecified, defaults to half of the maximum indexing memory and
+    /// clamped to 10 GiB.
+    ///
+    /// See: <https://github.com/orgs/meilisearch/discussions/801>
+    #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE)]
+    #[serde(default)]
+    pub experimental_limit_batched_tasks_total_size: Option<Byte>,
 
     /// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
     /// distinct embedder.

@@ -701,10 +704,12 @@ impl Opt {
             MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
             experimental_max_number_of_batched_tasks.to_string(),
         );
-        export_to_env_if_not_present(
-            MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
-            experimental_limit_batched_tasks_total_size.to_string(),
-        );
+        if let Some(limit) = experimental_limit_batched_tasks_total_size {
+            export_to_env_if_not_present(
+                MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
+                limit.to_string(),
+            );
+        }
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
             experimental_embedding_cache_entries.to_string(),

@@ -1273,10 +1278,6 @@ fn default_limit_batched_tasks() -> usize {
     usize::MAX
 }
 
-fn default_limit_batched_tasks_total_size() -> Byte {
-    Byte::from_u64(u64::MAX)
-}
-
 fn default_embedding_cache_entries() -> usize {
     0
 }

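The key change in `Opt` is that the field becomes `Option<Byte>` with no default value function: leaving the flag and the environment variable unset now yields `None`, which is what lets `setup_meilisearch` fall through to the inferred limit above. A minimal sketch of that pattern with clap (hypothetical names, assuming clap 4 with the `derive` and `env` features; not the Meilisearch code):

```rust
// Cargo.toml (assumed): clap = { version = "4", features = ["derive", "env"] }
use clap::Parser;

#[derive(Parser)]
struct DemoOpt {
    /// Stays `None` when neither the flag nor the environment
    /// variable is provided, which triggers the inferred default.
    #[clap(long, env = "DEMO_LIMIT_BATCHED_TASKS_TOTAL_SIZE")]
    limit_batched_tasks_total_size: Option<String>,
}

fn main() {
    let opt = DemoOpt::parse();
    match &opt.limit_batched_tasks_total_size {
        Some(v) => println!("explicit limit: {v}"),
        None => println!("no explicit limit, infer one from the memory budget"),
    }
}
```
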
@@ -1,14 +1,14 @@
-use crate::search::{Personalize, SearchResult};
-use meilisearch_types::{
-    error::{Code, ErrorCode, ResponseError},
-    milli::TimeBudget,
-};
+use std::time::Duration;
+
+use meilisearch_types::error::{Code, ErrorCode, ResponseError};
+use meilisearch_types::milli::TimeBudget;
 use rand::Rng;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
-use std::time::Duration;
 use tracing::{debug, info, warn};
 
+use crate::search::{Personalize, SearchResult};
+
 const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
 const MAX_RETRIES: u32 = 10;

@@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize};
 use utoipa::ToSchema;
 use uuid::Uuid;
 
-use crate::search::SearchMetadata;
-
 use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
 use crate::milli::vector::Embedding;
+use crate::search::SearchMetadata;
 
 pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;

@@ -1339,3 +1339,117 @@ async fn get_document_with_vectors() {
 }
 "###);
 }
+
+#[actix_rt::test]
+async fn test_fetch_documents_pagination_with_sorting() {
+    let server = Server::new_shared();
+    let index = server.unique_index();
+    let (task, _code) = index.create(None).await;
+    server.wait_task(task.uid()).await.succeeded();
+
+    // Set name as sortable attribute
+    let (task, code) = index.update_settings_sortable_attributes(json!(["name"])).await;
+    assert_eq!(code, 202);
+    server.wait_task(task.uid()).await.succeeded();
+
+    let documents = json!((0..50)
+        .map(|i| json!({"id": i, "name": format!("doc_{:05}", std::cmp::min(i, 5))}))
+        .collect::<Vec<_>>());
+
+    // Add documents as described in the bug report
+    let (task, code) = index.add_documents(documents, None).await;
+    assert_eq!(code, 202);
+    server.wait_task(task.uid()).await.succeeded();
+
+    // Request 1 (first page): offset 0, limit 2
+    let (response, code) = index
+        .fetch_documents(json!({
+            "offset": 0,
+            "limit": 2,
+            "sort": ["name:asc"]
+        }))
+        .await;
+    assert_eq!(code, 200);
+    let results = response["results"].as_array().unwrap();
+    snapshot!(json_string!(results), @r###"
+    [
+      {
+        "id": 0,
+        "name": "doc_00000"
+      },
+      {
+        "id": 1,
+        "name": "doc_00001"
+      }
+    ]
+    "###);
+
+    // Request 2 (second page): offset 2, limit 2
+    let (response, code) = index
+        .fetch_documents(json!({
+            "offset": 2,
+            "limit": 2,
+            "sort": ["name:asc"]
+        }))
+        .await;
+    assert_eq!(code, 200);
+    let results = response["results"].as_array().unwrap();
+    snapshot!(json_string!(results), @r###"
+    [
+      {
+        "id": 2,
+        "name": "doc_00002"
+      },
+      {
+        "id": 3,
+        "name": "doc_00003"
+      }
+    ]
+    "###);
+
+    // Request 3 (third page): offset 4, limit 2
+    let (response, code) = index
+        .fetch_documents(json!({
+            "offset": 4,
+            "limit": 2,
+            "sort": ["name:asc"]
+        }))
+        .await;
+    assert_eq!(code, 200);
+    let results = response["results"].as_array().unwrap();
+    snapshot!(json_string!(results), @r###"
+    [
+      {
+        "id": 4,
+        "name": "doc_00004"
+      },
+      {
+        "id": 5,
+        "name": "doc_00005"
+      }
+    ]
+    "###);
+
+    // Request 4 (fourth page): offset 6, limit 2
+    let (response, code) = index
+        .fetch_documents(json!({
+            "offset": 6,
+            "limit": 2,
+            "sort": ["name:asc"]
+        }))
+        .await;
+    assert_eq!(code, 200);
+    let results = response["results"].as_array().unwrap();
+    snapshot!(json_string!(results), @r###"
+    [
+      {
+        "id": 6,
+        "name": "doc_00005"
+      },
+      {
+        "id": 7,
+        "name": "doc_00005"
+      }
+    ]
+    "###);
+}

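The test feeds 50 documents where every document from id 5 onward shares the sort key `doc_00005`, then checks that consecutive pages never repeat or drop documents. The invariant it protects: when sort keys tie, pagination must fall back to a deterministic secondary order (here, the document id). A standalone illustration of the idea, independent of Meilisearch:

```rust
fn main() {
    // Fifty (id, name) pairs; names tie from id 5 onward, like the test data.
    let mut docs: Vec<(u32, String)> =
        (0..50).map(|i| (i, format!("doc_{:05}", i.min(5)))).collect();

    // Sorting by name alone leaves the relative order of tied documents
    // unspecified, so two paginated requests could disagree on page
    // boundaries. Chaining the id as a tie-breaker makes them deterministic.
    docs.sort_unstable_by(|a, b| a.1.cmp(&b.1).then(a.0.cmp(&b.0)));

    // Page 4 (offset 6, limit 2) now always yields ids 6 and 7.
    let page: Vec<u32> = docs.iter().skip(6).take(2).map(|d| d.0).collect();
    assert_eq!(page, vec![6, 7]);
}
```
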
@@ -120,14 +120,16 @@ twox-hash = { version = "2.1.1", default-features = false, features = [
 ] }
 geo-types = "0.7.16"
 zerometry = "0.3.0"
+bitpacking = "0.9.2"
 
 [dev-dependencies]
-mimalloc = { version = "0.1.47", default-features = false }
 # fixed version due to format breakages in v1.40
 insta = "=1.39.0"
+mimalloc = { version = "0.1.47", default-features = false }
 maplit = "1.0.2"
 md5 = "0.7.0"
 meili-snap = { path = "../meili-snap" }
+quickcheck = "1.0.3"
 rand = { version = "0.8.5", features = ["small_rng"] }
 
 [features]

@@ -87,7 +87,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
         };
 
         // Otherwise don't directly iterate over children, skip them if we know we will go further
-        let mut to_skip = n - 1;
+        let mut to_skip = n;
         while to_skip > 0 {
             if let Err(e) = SortedDocumentsIterator::update_current(
                 current_child,

@@ -108,7 +108,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
             continue;
         } else {
             // The current iterator is large enough, so we can forward the call to it.
-            return inner.nth(to_skip + 1);
+            return inner.nth(to_skip);
         }
     }
 

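Both hunks fix the same off-by-one: `Iterator::nth(k)` is zero-indexed, so it already skips `k` elements and returns the one after them. Skipping `to_skip` elements and returning the next therefore means calling `nth(to_skip)`, not `nth(to_skip + 1)`. A two-assertion refresher:

```rust
fn main() {
    let mut it = [10, 20, 30, 40].into_iter();
    // nth(1) skips exactly one element and yields the second one.
    assert_eq!(it.nth(1), Some(20));
    // The iterator resumes right after the returned element.
    assert_eq!(it.next(), Some(30));
}
```
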
@@ -22,7 +22,9 @@ pub use self::beu32_str_codec::BEU32StrCodec;
 pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
 pub use self::fst_set_codec::FstSetCodec;
 pub use self::obkv_codec::ObkvCodec;
-pub use self::roaring_bitmap::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, RoaringBitmapCodec};
+pub use self::roaring_bitmap::{
+    BoRoaringBitmapCodec, CboRoaringBitmapCodec, DeCboRoaringBitmapCodec, RoaringBitmapCodec,
+};
 pub use self::roaring_bitmap_length::{
     BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec,
 };

@@ -19,8 +19,19 @@ pub const THRESHOLD: usize = 7;
 pub struct CboRoaringBitmapCodec;
 
 impl CboRoaringBitmapCodec {
+    /// If the number of items (u32s) to encode is less than or equal to the threshold
+    /// it means that it would weigh the same or less than the RoaringBitmap
+    /// header, so we directly encode them using ByteOrder instead.
+    pub fn bitmap_serialize_as_raw_u32s(roaring: &RoaringBitmap) -> bool {
+        roaring.len() <= THRESHOLD as u64
+    }
+
+    pub fn bytes_deserialize_as_raw_u32s(bytes: &[u8]) -> bool {
+        bytes.len() <= THRESHOLD * size_of::<u32>()
+    }
+
     pub fn serialized_size(roaring: &RoaringBitmap) -> usize {
-        if roaring.len() <= THRESHOLD as u64 {
+        if Self::bitmap_serialize_as_raw_u32s(roaring) {
             roaring.len() as usize * size_of::<u32>()
         } else {
             roaring.serialized_size()

@@ -35,10 +46,7 @@ impl CboRoaringBitmapCodec {
         roaring: &RoaringBitmap,
         mut writer: W,
     ) -> io::Result<()> {
-        if roaring.len() <= THRESHOLD as u64 {
-            // If the number of items (u32s) to encode is less than or equal to the threshold
-            // it means that it would weigh the same or less than the RoaringBitmap
-            // header, so we directly encode them using ByteOrder instead.
+        if Self::bitmap_serialize_as_raw_u32s(roaring) {
             for integer in roaring {
                 writer.write_u32::<NativeEndian>(integer)?;
             }

@@ -51,7 +59,7 @@ impl CboRoaringBitmapCodec {
     }
 
     pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
-        if bytes.len() <= THRESHOLD * size_of::<u32>() {
+        if Self::bytes_deserialize_as_raw_u32s(bytes) {
             // If there is threshold or less than threshold integers that can fit into this array
             // of bytes it means that we used the ByteOrder codec serializer.
             let mut bitmap = RoaringBitmap::new();

@@ -71,7 +79,7 @@ impl CboRoaringBitmapCodec {
         other: &RoaringBitmap,
     ) -> io::Result<RoaringBitmap> {
         // See above `deserialize_from` method for implementation details.
-        if bytes.len() <= THRESHOLD * size_of::<u32>() {
+        if Self::bytes_deserialize_as_raw_u32s(bytes) {
             let mut bitmap = RoaringBitmap::new();
             while let Ok(integer) = bytes.read_u32::<NativeEndian>() {
                 if other.contains(integer) {

@@ -98,7 +106,7 @@ impl CboRoaringBitmapCodec {
             let mut vec = Vec::new();
 
             for bytes in slices {
-                if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() {
+                if Self::bytes_deserialize_as_raw_u32s(bytes.as_ref()) {
                     let mut reader = bytes.as_ref();
                     while let Ok(integer) = reader.read_u32::<NativeEndian>() {
                         vec.push(integer);

@@ -112,6 +120,8 @@ impl CboRoaringBitmapCodec {
             vec.sort_unstable();
             vec.dedup();
 
+            // Be careful when modifying this condition,
+            // the rule must be the same everywhere
             if vec.len() <= THRESHOLD {
                 for integer in vec {
                     buffer.extend_from_slice(&integer.to_ne_bytes());

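The new helpers name the codec's long-standing size rule: a bitmap holding at most `THRESHOLD` (7) integers is written as raw native-endian u32s, because 7 x 4 = 28 bytes weighs the same or less than a RoaringBitmap header would. A quick check of that boundary using only the `roaring` crate (the codec itself is internal to milli, so this only compares the two sizes):

```rust
use roaring::RoaringBitmap;

fn main() {
    const THRESHOLD: u64 = 7;

    // At or under the threshold: raw u32s, four bytes each, no header.
    let small: RoaringBitmap = (0..THRESHOLD as u32).collect();
    let raw_size = small.len() * 4; // 7 * 4 = 28 bytes
    println!("raw: {raw_size} bytes, roaring: {} bytes", small.serialized_size());

    // Over the threshold: the real roaring serialization takes over.
    let big: RoaringBitmap = (0..1_000).collect();
    println!("roaring: {} bytes for {} values", big.serialized_size(), big.len());
}
```

Factoring the checks into `bitmap_serialize_as_raw_u32s` / `bytes_deserialize_as_raw_u32s` matters because the new delta codec below must apply exactly the same rule to stay byte-compatible.
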
@@ -0,0 +1,102 @@
+use std::borrow::Cow;
+use std::io::{self, ErrorKind};
+
+use heed::BoxedError;
+use roaring::RoaringBitmap;
+
+use super::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
+use super::de_roaring_bitmap_codec::DeRoaringBitmapCodec;
+use crate::heed_codec::BytesDecodeOwned;
+
+pub struct DeCboRoaringBitmapCodec;
+
+impl DeCboRoaringBitmapCodec {
+    pub fn serialized_size_with_tmp_buffer(
+        bitmap: &RoaringBitmap,
+        tmp_buffer: &mut Vec<u32>,
+    ) -> usize {
+        DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(bitmap, tmp_buffer)
+    }
+
+    /// Writes the delta-encoded compressed version of
+    /// the given roaring bitmap into the provided writer.
+    pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
+        let mut tmp_buffer = Vec::new();
+        Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
+    }
+
+    /// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
+    ///
+    /// Note that we always serialize the bitmap with the delta-encoded compressed version.
+    pub fn serialize_into_with_tmp_buffer<W: io::Write>(
+        bitmap: &RoaringBitmap,
+        writer: W,
+        tmp_buffer: &mut Vec<u32>,
+    ) -> io::Result<()> {
+        // We are stuck with this format because the CboRoaringBitmapCodec decides to write
+        // raw and unencoded u32s, without a header when there is at most THRESHOLD elements.
+        if CboRoaringBitmapCodec::bitmap_serialize_as_raw_u32s(bitmap) {
+            CboRoaringBitmapCodec::serialize_into_writer(bitmap, writer)
+        } else {
+            DeRoaringBitmapCodec::serialize_into_with_tmp_buffer(bitmap, writer, tmp_buffer)
+        }
+    }
+
+    /// Returns the delta-decoded roaring bitmap from the compressed bytes.
+    pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
+        let mut tmp_buffer = Vec::new();
+        Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
+    }
+
+    /// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
+    ///
+    /// It tries to decode the input by using the delta-decoded version and
+    /// if it fails, falls back to the CboRoaringBitmap version.
+    pub fn deserialize_from_with_tmp_buffer(
+        input: &[u8],
+        tmp_buffer: &mut Vec<u32>,
+    ) -> io::Result<RoaringBitmap> {
+        // The input is too short to be a valid delta-decoded bitmap.
+        // We fall back to the CboRoaringBitmap version with raw u32s.
+        if CboRoaringBitmapCodec::bytes_deserialize_as_raw_u32s(input) {
+            return CboRoaringBitmapCodec::deserialize_from(input);
+        }
+
+        match DeRoaringBitmapCodec::deserialize_from_with_tmp_buffer(input, tmp_buffer) {
+            Ok(bitmap) => Ok(bitmap),
+            // If the error kind is Other it means that the delta-decoder found
+            // an invalid magic header. We fall back to the CboRoaringBitmap version.
+            Err(e) if e.kind() == ErrorKind::Other => {
+                CboRoaringBitmapCodec::deserialize_from(input)
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
+impl heed::BytesDecode<'_> for DeCboRoaringBitmapCodec {
+    type DItem = RoaringBitmap;
+
+    fn bytes_decode(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
+        Self::deserialize_from(bytes).map_err(Into::into)
+    }
+}
+
+impl BytesDecodeOwned for DeCboRoaringBitmapCodec {
+    type DItem = RoaringBitmap;
+
+    fn bytes_decode_owned(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
+        Self::deserialize_from(bytes).map_err(Into::into)
+    }
+}
+
+impl heed::BytesEncode<'_> for DeCboRoaringBitmapCodec {
+    type EItem = RoaringBitmap;
+
+    fn bytes_encode(item: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
+        // let mut vec = Vec::with_capacity(Self::serialized_size(item));
+        // Self::serialize_into_vec(item, &mut vec);
+        // Ok(Cow::Owned(vec))
+        todo!()
+    }
+}

@@ -0,0 +1,377 @@
+use std::io::{self, ErrorKind};
+use std::mem::{self, size_of, size_of_val};
+
+use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
+use roaring::RoaringBitmap;
+
+/// The magic header for our custom encoding format
+const MAGIC_HEADER: u16 = 36869;
+
+pub struct DeRoaringBitmapCodec;
+
+// TODO reintroduce:
+// - serialized_size?
+// - serialize_into_vec
+// - intersection_with_serialized
+// - merge_into
+// - merge_deladd_into
+impl DeRoaringBitmapCodec {
+    /// Returns the serialized size of the given roaring bitmap with the delta encoding format.
+    pub fn serialized_size_with_tmp_buffer(
+        bitmap: &RoaringBitmap,
+        tmp_buffer: &mut Vec<u32>,
+    ) -> usize {
+        let mut size = 2; // u16 magic header
+
+        let bitpacker8x = BitPacker8x::new();
+        let bitpacker4x = BitPacker4x::new();
+        let bitpacker1x = BitPacker1x::new();
+
+        // This temporary buffer is used to store each chunk of decompressed u32s.
+        tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
+        let decompressed = &mut tmp_buffer[..];
+
+        let mut buffer_index = 0;
+        let mut initial = None;
+        // We initially collect all the integers into a flat buffer of the size
+        // of the largest bitpacker. We encode them with it until we don't have
+        // enough of them...
+        for n in bitmap {
+            decompressed[buffer_index] = n;
+            buffer_index += 1;
+            if buffer_index == BitPacker8x::BLOCK_LEN {
+                let num_bits = bitpacker8x.num_bits_strictly_sorted(initial, decompressed);
+                let compressed_len = BitPacker8x::compressed_block_size(num_bits);
+                size += 1; // u8 chunk header
+                size += compressed_len; // compressed data length
+                initial = Some(n);
+                buffer_index = 0;
+            }
+        }
+
+        // ...We then switch to a smaller bitpacker to encode the remaining chunks...
+        let decompressed = &decompressed[..buffer_index];
+        let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
+        for decompressed in chunks.by_ref() {
+            let num_bits = bitpacker4x.num_bits_strictly_sorted(initial, decompressed);
+            let compressed_len = BitPacker4x::compressed_block_size(num_bits);
+            size += 1; // u8 chunk header
+            size += compressed_len; // compressed data length
+            initial = decompressed.iter().last().copied();
+        }
+
+        // ...And so on...
+        let decompressed = chunks.remainder();
+        let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
+        for decompressed in chunks.by_ref() {
+            let num_bits = bitpacker1x.num_bits_strictly_sorted(initial, decompressed);
+            let compressed_len = BitPacker1x::compressed_block_size(num_bits);
+            size += 1; // u8 chunk header
+            size += compressed_len; // compressed data length
+            initial = decompressed.iter().last().copied();
+        }
+
+        // ...Until we don't have any small enough bitpacker. We put them raw
+        // at the end of our buffer with a header indicating the matter.
+        let decompressed = chunks.remainder();
+        if !decompressed.is_empty() {
+            size += 1; // u8 chunk header
+            size += decompressed.len() * mem::size_of::<u32>(); // remaining uncompressed u32s
+        }
+
+        size
+    }
+
+    /// Writes the delta-encoded compressed version of
+    /// the given roaring bitmap into the provided writer.
+    pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
+        let mut tmp_buffer = Vec::new();
+        Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
+    }
+
+    /// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
+    pub fn serialize_into_with_tmp_buffer<W: io::Write>(
+        bitmap: &RoaringBitmap,
+        mut writer: W,
+        tmp_buffer: &mut Vec<u32>,
+    ) -> io::Result<()> {
+        // Insert the magic header
+        writer.write_all(&MAGIC_HEADER.to_ne_bytes())?;
+
+        let bitpacker8x = BitPacker8x::new();
+        let bitpacker4x = BitPacker4x::new();
+        let bitpacker1x = BitPacker1x::new();
+
+        // This temporary buffer is used to store each chunk of decompressed and
+        // compressed and delta-encoded u32s. We need room for the decompressed
+        // u32s coming from the roaring bitmap, the compressed output that can
+        // be as large as the decompressed u32s, and the chunk header.
+        tmp_buffer.resize((BitPacker8x::BLOCK_LEN * 2) + 1, 0u32);
+        let (decompressed, compressed) = tmp_buffer.split_at_mut(BitPacker8x::BLOCK_LEN);
+        let compressed = bytemuck::cast_slice_mut(compressed);
+
+        let mut buffer_index = 0;
+        let mut initial = None;
+        // We initially collect all the integers into a flat buffer of the size
+        // of the largest bitpacker. We encode them with it until we don't have
+        // enough of them...
+        for n in bitmap {
+            decompressed[buffer_index] = n;
+            buffer_index += 1;
+            if buffer_index == BitPacker8x::BLOCK_LEN {
+                let output = encode_with_packer(&bitpacker8x, decompressed, initial, compressed);
+                writer.write_all(output)?;
+                initial = Some(n);
+                buffer_index = 0;
+            }
+        }
+
+        // ...We then switch to a smaller bitpacker to encode the remaining chunks...
+        let decompressed = &decompressed[..buffer_index];
+        let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
+        for decompressed in chunks.by_ref() {
+            let output = encode_with_packer(&bitpacker4x, decompressed, initial, compressed);
+            writer.write_all(output)?;
+            initial = decompressed.iter().last().copied();
+        }
+
+        // ...And so on...
+        let decompressed = chunks.remainder();
+        let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
+        for decompressed in chunks.by_ref() {
+            let output = encode_with_packer(&bitpacker1x, decompressed, initial, compressed);
+            writer.write_all(output)?;
+            initial = decompressed.iter().last().copied();
+        }
+
+        // ...Until we don't have any small enough bitpacker. We put them raw
+        // at the end of our buffer with a header indicating the matter.
+        let decompressed = chunks.remainder();
+        if !decompressed.is_empty() {
+            let header = encode_chunk_header(BitPackerLevel::None, u32::BITS as u8);
+            // Note: Not convinced about the performance of writing a single
+            // byte followed by a larger write. However, we will use this
+            // codec with a BufWriter or directly with a Vec of bytes.
+            writer.write_all(&[header])?;
+            writer.write_all(bytemuck::cast_slice(decompressed))?;
+        }
+
+        Ok(())
+    }
+
+    /// Returns the delta-decoded roaring bitmap from the compressed bytes.
+    pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
+        let mut tmp_buffer = Vec::new();
+        Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
+    }
+
+    /// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
+    pub fn deserialize_from_with_tmp_buffer(
+        input: &[u8],
+        tmp_buffer: &mut Vec<u32>,
+    ) -> io::Result<RoaringBitmap> {
+        let Some((header, mut compressed)) = input.split_at_checked(size_of_val(&MAGIC_HEADER))
+        else {
+            return Err(io::Error::new(ErrorKind::UnexpectedEof, "expecting a two-bytes header"));
+        };
+
+        // Safety: This unwrap cannot happen as the header buffer is the right size
+        let header = u16::from_ne_bytes(header.try_into().unwrap());
+
+        if header != MAGIC_HEADER {
+            return Err(io::Error::other("invalid header value"));
+        }
+
+        let bitpacker8x = BitPacker8x::new();
+        let bitpacker4x = BitPacker4x::new();
+        let bitpacker1x = BitPacker1x::new();
+
+        let mut bitmap = RoaringBitmap::new();
+        tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
+        let decompressed = &mut tmp_buffer[..];
+        let mut initial = None;
+
+        while let Some((&chunk_header, encoded)) = compressed.split_first() {
+            let (level, num_bits) = decode_chunk_header(chunk_header);
+            let (bytes_read, decompressed) = match level {
+                BitPackerLevel::None => {
+                    if num_bits != u32::BITS as u8 {
+                        return Err(io::Error::new(
+                            ErrorKind::InvalidData,
+                            "invalid number of bits to encode non-compressed u32s",
+                        ));
+                    }
+
+                    let chunks = encoded.chunks_exact(size_of::<u32>());
+                    if !chunks.remainder().is_empty() {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "expecting last chunk to be a multiple of the size of an u32",
+                        ));
+                    }
+
+                    let integers = chunks
+                        // safety: This unwrap cannot happen as
+                        // the size of u32 is set correctly.
+                        .map(|b| b.try_into().unwrap())
+                        .map(u32::from_ne_bytes);
+
+                    bitmap
+                        .append(integers)
+                        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
+
+                    // This is basically always the last chunk that exists in
+                    // this delta-encoded format as the raw u32s are appended
+                    // when there is not enough of them to fit in a bitpacker.
+                    break;
+                }
+                BitPackerLevel::BitPacker1x => {
+                    decode_with_packer(&bitpacker1x, decompressed, initial, encoded, num_bits)
+                }
+                BitPackerLevel::BitPacker4x => {
+                    decode_with_packer(&bitpacker4x, decompressed, initial, encoded, num_bits)
+                }
+                BitPackerLevel::BitPacker8x => {
+                    decode_with_packer(&bitpacker8x, decompressed, initial, encoded, num_bits)
+                }
+            };
+
+            initial = decompressed.iter().last().copied();
+            // TODO investigate perf
+            // Safety: Bitpackers cannot output unsorted integers when
+            // used with the compress_strictly_sorted function.
+            bitmap.append(decompressed.iter().copied()).unwrap();
+            // What the delta-decoding read plus the chunk header size
+            compressed = &compressed[bytes_read + 1..];
+        }
+
+        Ok(bitmap)
+    }
+}
+
+/// Takes a strictly sorted list of u32s and outputs delta-encoded
+/// bytes with a chunk header. We expect the output buffer to be
+/// at least BLOCK_LEN + 1.
+fn encode_with_packer<'c, B: BitPackerExt>(
+    bitpacker: &B,
+    decompressed: &[u32],
+    initial: Option<u32>,
+    output: &'c mut [u8],
+) -> &'c [u8] {
+    let num_bits = bitpacker.num_bits_strictly_sorted(initial, decompressed);
+    let compressed_len = B::compressed_block_size(num_bits);
+    let chunk_header = encode_chunk_header(B::level(), num_bits);
+    let buffer = &mut output[..compressed_len + 1];
+    // Safety: The buffer is at least one byte
+    let (header_in_buffer, encoded) = buffer.split_first_mut().unwrap();
+    *header_in_buffer = chunk_header;
+    bitpacker.compress_strictly_sorted(initial, decompressed, encoded, num_bits);
+    buffer
+}
+
+/// Returns the number of bytes read and the decoded unsigned integers.
+fn decode_with_packer<'d, B: BitPacker>(
+    bitpacker: &B,
+    decompressed: &'d mut [u32],
+    initial: Option<u32>,
+    compressed: &[u8],
+    num_bits: u8,
+) -> (usize, &'d [u32]) {
+    let decompressed = &mut decompressed[..B::BLOCK_LEN];
+    let read = bitpacker.decompress_strictly_sorted(initial, compressed, decompressed, num_bits);
+    (read, decompressed)
+}
+
+/// An identifier for the bitpacker to be able
+/// to correctly decode the compressed integers.
+#[derive(Debug, PartialEq, Eq)]
+#[repr(u8)]
+enum BitPackerLevel {
+    /// The remaining bytes are raw little endian encoded u32s.
+    None,
+    /// The remaining bits are encoded using a `BitPacker1x`.
+    BitPacker1x,
+    /// The remaining bits are encoded using a `BitPacker4x`.
+    BitPacker4x,
+    /// The remaining bits are encoded using a `BitPacker8x`.
+    BitPacker8x,
+}
+
+/// Returns the chunk header based on the bitpacker level
+/// and the number of bits to encode the list of integers.
+fn encode_chunk_header(level: BitPackerLevel, num_bits: u8) -> u8 {
+    debug_assert!(num_bits as u32 <= 2_u32.pow(6));
+    let level = level as u8;
+    debug_assert!(level <= 3);
+    num_bits | (level << 6)
+}
+
+/// Decodes the chunk header and outputs the bitpacker level
+/// and the number of bits to decode the following bytes.
+fn decode_chunk_header(data: u8) -> (BitPackerLevel, u8) {
+    let num_bits = data & 0b00111111;
+    let level = match data >> 6 {
+        0 => BitPackerLevel::None,
+        1 => BitPackerLevel::BitPacker1x,
+        2 => BitPackerLevel::BitPacker4x,
+        3 => BitPackerLevel::BitPacker8x,
+        invalid => panic!("Invalid bitpacker level: {invalid}"),
+    };
+    debug_assert!(num_bits as u32 <= 2_u32.pow(6));
+    (level, num_bits)
+}
+
+/// A simple helper trait to get the BitPackerLevel
+/// and correctly generate the chunk header.
+trait BitPackerExt: BitPacker {
+    /// Returns the level of the bitpacker: an identifier to be
+    /// able to decode the numbers with the right bitpacker.
+    fn level() -> BitPackerLevel;
+}
+
+impl BitPackerExt for BitPacker8x {
+    fn level() -> BitPackerLevel {
+        BitPackerLevel::BitPacker8x
+    }
+}
+
+impl BitPackerExt for BitPacker4x {
+    fn level() -> BitPackerLevel {
+        BitPackerLevel::BitPacker4x
+    }
+}
+
+impl BitPackerExt for BitPacker1x {
+    fn level() -> BitPackerLevel {
+        BitPackerLevel::BitPacker1x
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use quickcheck::quickcheck;
+    use roaring::RoaringBitmap;
+
+    use super::DeRoaringBitmapCodec;
+
+    quickcheck! {
+        fn qc_random(xs: Vec<u32>) -> bool {
+            let bitmap = RoaringBitmap::from_iter(xs);
+            let mut compressed = Vec::new();
+            DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
+            let decompressed = DeRoaringBitmapCodec::deserialize_from(&compressed[..]).unwrap();
+            decompressed == bitmap
+        }
+    }
+
+    quickcheck! {
+        fn qc_random_check_serialized_size(xs: Vec<u32>) -> bool {
+            let bitmap = RoaringBitmap::from_iter(xs);
+            let mut compressed = Vec::new();
+            let mut tmp_buffer = Vec::new();
+            DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
+            let expected_len = DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(&bitmap, &mut tmp_buffer);
+            compressed.len() == expected_len
+        }
+    }
+}

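Taken together, the format is a 2-byte magic header followed by a sequence of chunks, each led by one header byte whose top 2 bits select the bitpacker level (raw, 1x, 4x, 8x) and whose low 6 bits carry `num_bits`. A hedged round-trip sketch of the entry points added above, mirroring the quickcheck properties; this assumes `DeRoaringBitmapCodec` is in scope, since the module is private to milli per the mod.rs diff below:

```rust
use roaring::RoaringBitmap;

fn roundtrip() -> std::io::Result<()> {
    // A strictly increasing set of doc ids, as bitmaps always are.
    let bitmap: RoaringBitmap = (0u32..10_000).step_by(3).collect();

    // Serialize with a reusable scratch buffer to avoid per-call allocations.
    let mut tmp_buffer = Vec::new();
    let mut compressed = Vec::new();
    DeRoaringBitmapCodec::serialize_into_with_tmp_buffer(
        &bitmap,
        &mut compressed,
        &mut tmp_buffer,
    )?;

    // serialized_size_with_tmp_buffer must agree with what was written.
    let expected = DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(&bitmap, &mut tmp_buffer);
    assert_eq!(compressed.len(), expected);

    // Decoding restores the exact bitmap, or fails with an io::Error
    // (an Other-kind error signals a missing magic header upstream).
    let decoded =
        DeRoaringBitmapCodec::deserialize_from_with_tmp_buffer(&compressed, &mut tmp_buffer)?;
    assert_eq!(decoded, bitmap);
    Ok(())
}
```
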
@@ -1,7 +1,10 @@
 mod bo_roaring_bitmap_codec;
 pub mod cbo_roaring_bitmap_codec;
+pub mod de_cbo_roaring_bitmap_codec;
+mod de_roaring_bitmap_codec;
 mod roaring_bitmap_codec;
 
 pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec;
 pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
+pub use self::de_cbo_roaring_bitmap_codec::DeCboRoaringBitmapCodec;
 pub use self::roaring_bitmap_codec::RoaringBitmapCodec;