Compare commits

..

9 Commits

Author SHA1 Message Date
Kerollmops
ffdd767e3d WIP hide the CboRoaringBitmapCodec to replace it more conveniently 2025-12-04 18:31:48 +01:00
Kerollmops
e9f2faf61d Expose a CLI parameter to enable or disable delta-encoding of bitmaps 2025-12-04 18:31:48 +01:00
Kerollmops
b878fcbf91 Use unsafe blocks to set env variables 2025-12-04 18:31:48 +01:00
Kerollmops
d18f4aea4a Support conditionally serializing with new format 2025-12-04 18:31:48 +01:00
Kerollmops
673d7e3a25 Simplify the computation of the raw u32s bytes size 2025-12-04 18:31:47 +01:00
Clément Renault
e2832eba95 WIP missing DeCboRoaringBitmapCodec::bytes_encode implem 2025-12-04 18:31:47 +01:00
Kerollmops
a57938c49e Remove the unwraps, asserts and return actual io errors 2025-12-04 18:31:47 +01:00
Kerollmops
aaee5db5e1 Move to a two bytes header 2025-12-04 18:31:47 +01:00
Kerollmops
be0b41753c Introduce a first working version of the DeBitmapCodec 2025-12-04 18:31:47 +01:00
50 changed files with 1137 additions and 418 deletions

View File

@@ -18,7 +18,7 @@ jobs:
timeout-minutes: 180 # 3h
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -66,7 +66,7 @@ jobs:
fetch-depth: 0 # fetch full history to be able to get main commit sha
ref: ${{ steps.comment-branch.outputs.head_ref }}
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Run benchmarks on PR ${{ github.event.issue.id }}
run: |

View File

@@ -12,7 +12,7 @@ jobs:
timeout-minutes: 180 # 3h
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
# Run benchmarks
- name: Run benchmarks - Dataset ${BENCH_NAME} - Branch main - Commit ${{ github.sha }}

View File

@@ -18,7 +18,7 @@ jobs:
timeout-minutes: 4320 # 72h
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -44,7 +44,7 @@ jobs:
exit 1
fi
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -16,7 +16,7 @@ jobs:
timeout-minutes: 4320 # 72h
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: benchmarks
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: benchmarks
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: benchmarks
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
profile: minimal

View File

@@ -3,7 +3,7 @@ name: Look for flaky tests
on:
workflow_dispatch:
schedule:
- cron: "0 4 * * *" # Every day at 4:00AM
- cron: '0 4 * * *' # Every day at 4:00AM
jobs:
flaky:
@@ -23,7 +23,7 @@ jobs:
run: |
apt-get update && apt-get install -y curl
apt-get install build-essential -y
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Install cargo-flaky
run: cargo install cargo-flaky
- name: Run cargo flaky in the dumps

View File

@@ -12,7 +12,7 @@ jobs:
timeout-minutes: 4320 # 72h
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
# Run benchmarks
- name: Run the fuzzer

View File

@@ -31,7 +31,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Install cargo-deb
run: cargo install cargo-deb
- uses: actions/checkout@v5

View File

@@ -76,7 +76,7 @@ jobs:
needs: check-version
steps:
- uses: actions/checkout@v5
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Build
run: cargo build --release --locked ${{ matrix.feature-flag }} ${{ matrix.extra-args }}
# No need to upload binaries for dry run (cron or workflow_dispatch)

View File

@@ -34,7 +34,7 @@ jobs:
- name: check free space after
run: df -h
- name: Setup test with Rust stable
uses: dtolnay/rust-toolchain@1.91.1
uses: dtolnay/rust-toolchain@1.89
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.8.0
with:
@@ -63,7 +63,7 @@ jobs:
- uses: actions/checkout@v5
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.8.0
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Run cargo build without any default features
uses: actions-rs/cargo@v1
with:
@@ -87,7 +87,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Run cargo build with almost all features
run: |
cargo build --workspace --locked --features "$(cargo xtask list-features --exclude-feature cuda,test-ollama)"
@@ -145,7 +145,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Run cargo tree without default features and check lindera is not present
run: |
if cargo tree -f '{p} {f}' -e normal --no-default-features | grep -qz lindera; then
@@ -167,7 +167,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.8.0
- name: Build
@@ -187,7 +187,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
components: clippy
- name: Cache dependencies
@@ -209,7 +209,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
with:
components: rustfmt
- name: Cache dependencies
@@ -235,7 +235,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.8.0
- name: Run declarative tests

View File

@@ -24,7 +24,7 @@ jobs:
sudo rm -rf "/usr/share/dotnet" || true
sudo rm -rf "/usr/local/lib/android" || true
sudo rm -rf "/usr/local/share/boost" || true
- uses: dtolnay/rust-toolchain@1.91.1
- uses: dtolnay/rust-toolchain@1.89
- name: Install sd
run: cargo install sd
- name: Update Cargo.toml file

601
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -23,7 +23,7 @@ members = [
]
[workspace.package]
version = "1.29.0"
version = "1.28.2"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -107,14 +107,19 @@ impl Settings<Unchecked> {
}
}
#[derive(Default, Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub enum Setting<T> {
Set(T),
Reset,
#[default]
NotSet,
}
impl<T> Default for Setting<T> {
fn default() -> Self {
Self::NotSet
}
}
impl<T> Setting<T> {
pub const fn is_not_set(&self) -> bool {
matches!(self, Self::NotSet)

View File

@@ -161,14 +161,19 @@ pub struct Facets {
pub min_level_size: Option<NonZeroUsize>,
}
#[derive(Default, Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Setting<T> {
Set(T),
Reset,
#[default]
NotSet,
}
impl<T> Default for Setting<T> {
fn default() -> Self {
Self::NotSet
}
}
impl<T> Setting<T> {
pub fn map<U, F>(self, f: F) -> Setting<U>
where

View File

@@ -1,7 +1,9 @@
use std::fmt::{self, Display, Formatter};
use std::marker::PhantomData;
use std::str::FromStr;
use serde::Deserialize;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer};
use uuid::Uuid;
use super::settings::{Settings, Unchecked};
@@ -80,3 +82,59 @@ impl Display for IndexUidFormatError {
}
impl std::error::Error for IndexUidFormatError {}
/// A type that tries to match either a star (*) or
/// any other thing that implements `FromStr`.
#[derive(Debug)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum StarOr<T> {
Star,
Other(T),
}
impl<'de, T, E> Deserialize<'de> for StarOr<T>
where
T: FromStr<Err = E>,
E: Display,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
/// Serde can't differentiate between `StarOr::Star` and `StarOr::Other` without a tag.
/// Simply using `#[serde(untagged)]` + `#[serde(rename="*")]` will lead to attempting to
/// deserialize everything as a `StarOr::Other`, including "*".
/// [`#[serde(other)]`](https://serde.rs/variant-attrs.html#other) might have helped but is
/// not supported on untagged enums.
struct StarOrVisitor<T>(PhantomData<T>);
impl<T, FE> Visitor<'_> for StarOrVisitor<T>
where
T: FromStr<Err = FE>,
FE: Display,
{
type Value = StarOr<T>;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("a string")
}
fn visit_str<SE>(self, v: &str) -> Result<Self::Value, SE>
where
SE: serde::de::Error,
{
match v {
"*" => Ok(StarOr::Star),
v => {
let other = FromStr::from_str(v).map_err(|e: T::Err| {
SE::custom(format!("Invalid `other` value: {}", e))
})?;
Ok(StarOr::Other(other))
}
}
}
}
deserializer.deserialize_str(StarOrVisitor(PhantomData))
}
}

View File

@@ -192,14 +192,19 @@ pub struct Facets {
pub min_level_size: Option<NonZeroUsize>,
}
#[derive(Default, Debug, Clone, PartialEq, Eq, Copy)]
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Setting<T> {
Set(T),
Reset,
#[default]
NotSet,
}
impl<T> Default for Setting<T> {
fn default() -> Self {
Self::NotSet
}
}
impl<T> Setting<T> {
pub fn set(self) -> Option<T> {
match self {

View File

@@ -47,15 +47,20 @@ pub struct Settings<T> {
pub _kind: PhantomData<T>,
}
#[derive(Default, Debug, Clone, PartialEq, Eq, Copy)]
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum Setting<T> {
Set(T),
Reset,
#[default]
NotSet,
}
impl<T> Default for Setting<T> {
fn default() -> Self {
Self::NotSet
}
}
impl<T> Setting<T> {
pub fn set(self) -> Option<T> {
match self {

View File

@@ -322,7 +322,7 @@ impl From<Task> for TaskView {
_ => None,
});
let duration = finished_at.zip(started_at).map(|(tf, ts)| tf - ts);
let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));
Self {
uid: id,

View File

@@ -502,11 +502,13 @@ impl Queue {
*before_finished_at,
)?;
batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit).collect()
} else {
batches.into_iter().rev().take(*limit).collect()
};
if let Some(limit) = limit {
batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit as usize).collect()
} else {
batches.into_iter().rev().take(*limit as usize).collect()
};
}
Ok(batches)
}
@@ -600,8 +602,11 @@ impl Queue {
Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let batches =
self.batches.get_existing_batches(rtxn, batches.take(query.limit), processing)?;
let batches = self.batches.get_existing_batches(
rtxn,
batches.take(query.limit.unwrap_or(u32::MAX) as usize),
processing,
)?;
Ok((batches, total))
}

View File

@@ -28,21 +28,21 @@ fn query_batches_from_and_limit() {
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { limit: 0, ..Default::default() };
let query = Query { limit: Some(0), ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[]");
let query = Query { limit: 1, ..Default::default() };
let query = Query { limit: Some(1), ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[2,]");
let query = Query { limit: 2, ..Default::default() };
let query = Query { limit: Some(2), ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
@@ -63,14 +63,14 @@ fn query_batches_from_and_limit() {
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]");
let query = Query { from: Some(1), limit: 1, ..Default::default() };
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[1,]");
let query = Query { from: Some(1), limit: 2, ..Default::default() };
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)

View File

@@ -31,9 +31,6 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};
/// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1;
/// The default limit for pagination
const DEFAULT_LIMIT: usize = 20;
/// Database const names for the `IndexScheduler`.
mod db_name {
pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
@@ -43,11 +40,11 @@ mod db_name {
///
/// An empty/default query (where each field is set to `None`) matches all tasks.
/// Each non-null field restricts the set of tasks further.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Query {
/// The maximum number of tasks to be matched. Defaults to 20.
pub limit: usize,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched. Defaults to 0.
/// The maximum number of tasks to be matched
pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
pub from: Option<u32>,
/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
pub reverse: Option<bool>,
@@ -86,29 +83,32 @@ pub struct Query {
pub after_finished_at: Option<OffsetDateTime>,
}
impl Default for Query {
fn default() -> Self {
Self {
limit: DEFAULT_LIMIT,
from: Default::default(),
reverse: Default::default(),
uids: Default::default(),
batch_uids: Default::default(),
statuses: Default::default(),
types: Default::default(),
index_uids: Default::default(),
canceled_by: Default::default(),
before_enqueued_at: Default::default(),
after_enqueued_at: Default::default(),
before_started_at: Default::default(),
after_started_at: Default::default(),
before_finished_at: Default::default(),
after_finished_at: Default::default(),
}
}
}
impl Query {
/// Return `true` if every field of the query is set to `None`, such that the query
/// matches all tasks.
pub fn is_empty(&self) -> bool {
matches!(
self,
Query {
limit: None,
from: None,
reverse: None,
uids: None,
batch_uids: None,
statuses: None,
types: None,
index_uids: None,
canceled_by: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
after_started_at: None,
before_finished_at: None,
after_finished_at: None,
}
)
}
/// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uids.unwrap_or_default();
@@ -119,7 +119,7 @@ impl Query {
// Removes the `from` and `limit` restrictions from the query.
// Useful to get the total number of tasks matching a filter.
pub fn without_limits(self) -> Self {
Query { limit: usize::MAX, from: None, ..self }
Query { limit: None, from: None, ..self }
}
}

View File

@@ -465,11 +465,13 @@ impl Queue {
*before_finished_at,
)?;
tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(*limit).collect()
} else {
tasks.into_iter().rev().take(*limit).collect()
};
if let Some(limit) = limit {
tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(*limit as usize).collect()
} else {
tasks.into_iter().rev().take(*limit as usize).collect()
};
}
Ok(tasks)
}
@@ -527,7 +529,9 @@ impl Queue {
} else {
Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let tasks = self.tasks.get_existing_tasks(rtxn, tasks.take(query.limit))?;
let tasks = self
.tasks
.get_existing_tasks(rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
let ProcessingTasks { batch, processing, progress: _ } = processing_tasks;

View File

@@ -28,21 +28,21 @@ fn query_tasks_from_and_limit() {
let rtxn = index_scheduler.env.read_txn().unwrap();
let processing = index_scheduler.processing_tasks.read().unwrap();
let query = Query { limit: 0, ..Default::default() };
let query = Query { limit: Some(0), ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query { limit: 1, ..Default::default() };
let query = Query { limit: Some(1), ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
let query = Query { limit: 2, ..Default::default() };
let query = Query { limit: Some(2), ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
@@ -63,14 +63,14 @@ fn query_tasks_from_and_limit() {
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query { from: Some(1), limit: 1, ..Default::default() };
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query { from: Some(1), limit: 2, ..Default::default() };
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)

View File

@@ -1,7 +1,7 @@
use std::any::TypeId;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -300,6 +300,7 @@ impl Infos {
max_indexing_memory,
max_indexing_threads,
skip_index_budget: _,
experimental_disable_delta_encoding: _,
experimental_no_edition_2024_for_settings,
experimental_no_edition_2024_for_dumps,
experimental_no_edition_2024_for_prefix_post_processing,
@@ -344,14 +345,14 @@ impl Infos {
experimental_no_edition_2024_for_dumps,
experimental_vector_store_setting: vector_store_setting,
gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
db_path: db_path != Path::new("./data.ms"),
db_path: db_path != PathBuf::from("./data.ms"),
import_dump: import_dump.is_some(),
dump_dir: dump_dir != Path::new("dumps/"),
dump_dir: dump_dir != PathBuf::from("dumps/"),
ignore_missing_dump,
ignore_dump_if_db_exists,
import_snapshot: import_snapshot.is_some(),
schedule_snapshot,
snapshot_dir: snapshot_dir != Path::new("snapshots/"),
snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
uses_s3_snapshots: s3_snapshot_options.is_some(),
ignore_missing_snapshot,
ignore_snapshot_if_db_exists,

View File

@@ -21,6 +21,7 @@ use meilisearch::{
LogStderrType, Opt, ServicesData, SubscriberForSecondLayer,
};
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use meilisearch_types::milli::heed_codec::DELTA_ENCODING_STATUS;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt as _;
@@ -95,6 +96,14 @@ async fn main() -> anyhow::Result<()> {
async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;
// Disables the delta encoding of bitmaps as soon as possible
if opt.indexer_options.experimental_disable_delta_encoding {
DELTA_ENCODING_STATUS.set_to_disabled()
} else {
DELTA_ENCODING_STATUS.set_to_enabled()
}
.expect("the delta-encoding status to be set only once");
std::panic::set_hook(Box::new(on_panic));
anyhow::ensure!(

View File

@@ -60,6 +60,7 @@ const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING: &str =
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING";
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING: &str =
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING";
const MEILI_EXPERIMENTAL_DISABLE_DELTA_ENCODING: &str = "MEILI_EXPERIMENTAL_DISABLE_DELTA_ENCODING";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
const MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER: &str = "MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER";
@@ -845,6 +846,14 @@ pub struct IndexerOpts {
#[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING)]
#[serde(default)]
pub experimental_no_edition_2024_for_facet_post_processing: bool,
/// Experimental disable delta-encoding for bitmaps. For more information,
/// see: <https://github.com/orgs/meilisearch/discussions/875>
///
/// Enables the experimental disable delta-encoding for bitmaps feature.
#[clap(long, env = MEILI_EXPERIMENTAL_DISABLE_DELTA_ENCODING)]
#[serde(default)]
pub experimental_disable_delta_encoding: bool,
}
impl IndexerOpts {
@@ -858,6 +867,7 @@ impl IndexerOpts {
experimental_no_edition_2024_for_dumps,
experimental_no_edition_2024_for_prefix_post_processing,
experimental_no_edition_2024_for_facet_post_processing,
experimental_disable_delta_encoding,
} = self;
if let Some(max_indexing_memory) = max_indexing_memory.0 {
export_to_env_if_not_present(
@@ -895,6 +905,12 @@ impl IndexerOpts {
experimental_no_edition_2024_for_facet_post_processing.to_string(),
);
}
if experimental_disable_delta_encoding {
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_DISABLE_DELTA_ENCODING,
experimental_disable_delta_encoding.to_string(),
);
}
}
}
@@ -910,6 +926,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
experimental_no_edition_2024_for_dumps,
experimental_no_edition_2024_for_prefix_post_processing,
experimental_no_edition_2024_for_facet_post_processing,
experimental_disable_delta_encoding: _, // managed in try_main
} = other;
let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
@@ -1245,7 +1262,7 @@ where
T: AsRef<OsStr>,
{
if let Err(VarError::NotPresent) = std::env::var(key) {
std::env::set_var(key, value);
unsafe { std::env::set_var(key, value) }
}
}

View File

@@ -185,7 +185,7 @@ pub async fn get_metrics(
// Fetch the finished batches...
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
limit: 1,
limit: Some(1),
..Query::default()
},
auth_filters,
@@ -214,7 +214,7 @@ pub async fn get_metrics(
let task_queue_latency_seconds = index_scheduler
.get_tasks_from_authorized_indexes(
&Query {
limit: 1,
limit: Some(1),
reverse: Some(true),
statuses: Some(vec![Status::Enqueued, Status::Processing]),
..Query::default()

View File

@@ -126,7 +126,7 @@ pub struct TasksFilterQuery {
impl TasksFilterQuery {
pub(crate) fn into_query(self) -> Query {
Query {
limit: self.limit.0 as usize,
limit: Some(self.limit.0),
from: self.from.as_deref().copied(),
reverse: self.reverse.as_deref().copied(),
batch_uids: self.batch_uids.merge_star_and_none(),
@@ -225,8 +225,7 @@ pub struct TaskDeletionOrCancelationQuery {
impl TaskDeletionOrCancelationQuery {
fn into_query(self) -> Query {
Query {
// We want to delete all tasks that match the given filters
limit: usize::MAX,
limit: None,
from: None,
reverse: None,
batch_uids: self.batch_uids.merge_star_and_none(),

View File

@@ -789,12 +789,11 @@ impl TryFrom<Value> for ExternalDocumentId {
}
}
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Deserr, ToSchema, Serialize)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserr, ToSchema, Serialize)]
#[deserr(rename_all = camelCase)]
#[serde(rename_all = "camelCase")]
pub enum MatchingStrategy {
/// Remove query words from last to first
#[default]
Last,
/// All query words are mandatory
All,
@@ -802,6 +801,12 @@ pub enum MatchingStrategy {
Frequency,
}
impl Default for MatchingStrategy {
fn default() -> Self {
Self::Last
}
}
impl From<MatchingStrategy> for TermsMatchingStrategy {
fn from(other: MatchingStrategy) -> Self {
match other {

View File

@@ -187,7 +187,7 @@ macro_rules! compute_forbidden_search {
#[actix_rt::test]
async fn search_authorized_simple_token() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -239,7 +239,7 @@ async fn search_authorized_simple_token() {
#[actix_rt::test]
async fn search_authorized_filter_token() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {"filter": "color = blue"}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -292,7 +292,7 @@ async fn search_authorized_filter_token() {
#[actix_rt::test]
async fn filter_search_authorized_filter_token() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {"filter": "color = blue"}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -353,7 +353,7 @@ async fn filter_search_authorized_filter_token() {
/// Tests that those Tenant Token are incompatible with the REFUSED_KEYS defined above.
#[actix_rt::test]
async fn error_search_token_forbidden_parent_key() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -389,7 +389,7 @@ async fn error_search_token_forbidden_parent_key() {
#[actix_rt::test]
async fn error_search_forbidden_token() {
let tenant_tokens = [
let tenant_tokens = vec![
// bad index
hashmap! {
"searchRules" => json!({"products": {}}),

View File

@@ -680,7 +680,7 @@ async fn multi_search_authorized_simple_token() {
#[actix_rt::test]
async fn single_search_authorized_filter_token() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {"filter": "color = blue"}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -733,7 +733,7 @@ async fn single_search_authorized_filter_token() {
#[actix_rt::test]
async fn multi_search_authorized_filter_token() {
let both_tenant_tokens = [
let both_tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"sales": {"filter": "color = blue"}, "products": {"filter": "doggos.age <= 5"}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -842,7 +842,7 @@ async fn filter_single_search_authorized_filter_token() {
#[actix_rt::test]
async fn filter_multi_search_authorized_filter_token() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"sales": {"filter": "color = blue"}, "products": {"filter": "doggos.age <= 5"}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -900,7 +900,7 @@ async fn filter_multi_search_authorized_filter_token() {
/// Tests that those Tenant Token are incompatible with the REFUSED_KEYS defined above.
#[actix_rt::test]
async fn error_single_search_token_forbidden_parent_key() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())
@@ -941,7 +941,7 @@ async fn error_single_search_token_forbidden_parent_key() {
/// Tests that those Tenant Token are incompatible with the REFUSED_KEYS defined above.
#[actix_rt::test]
async fn error_multi_search_token_forbidden_parent_key() {
let tenant_tokens = [
let tenant_tokens = vec![
hashmap! {
"searchRules" => json!({"*": {}}),
"exp" => json!((OffsetDateTime::now_utc() + Duration::hours(1)).unix_timestamp())

View File

@@ -43,9 +43,9 @@ impl Server<Owned> {
let dir = TempDir::new().unwrap();
if cfg!(windows) {
std::env::set_var("TMP", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMP", TEST_TEMP_DIR.path()) }
} else {
std::env::set_var("TMPDIR", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMPDIR", TEST_TEMP_DIR.path()) }
}
let options = default_settings(dir.path());
@@ -58,9 +58,9 @@ impl Server<Owned> {
pub async fn new_auth_with_options(mut options: Opt, dir: TempDir) -> Self {
if cfg!(windows) {
std::env::set_var("TMP", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMP", TEST_TEMP_DIR.path()) }
} else {
std::env::set_var("TMPDIR", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMPDIR", TEST_TEMP_DIR.path()) }
}
options.master_key = Some("MASTER_KEY".to_string());
@@ -215,9 +215,9 @@ impl Server<Shared> {
let dir = TempDir::new().unwrap();
if cfg!(windows) {
std::env::set_var("TMP", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMP", TEST_TEMP_DIR.path()) }
} else {
std::env::set_var("TMPDIR", TEST_TEMP_DIR.path());
unsafe { std::env::set_var("TMPDIR", TEST_TEMP_DIR.path()) }
}
let options = default_settings(dir.path());
@@ -508,6 +508,8 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
experimental_no_edition_2024_for_dumps: false,
experimental_no_edition_2024_for_prefix_post_processing: false,
experimental_no_edition_2024_for_facet_post_processing: false,
// It has no effect to set the delta encoding here as the toggle is done in try_main
experimental_disable_delta_encoding: false,
},
experimental_enable_metrics: false,
..Parser::parse_from(None as Option<&str>)

View File

@@ -120,14 +120,16 @@ twox-hash = { version = "2.1.2", default-features = false, features = [
] }
geo-types = "0.7.17"
zerometry = "0.3.0"
bitpacking = "0.9.2"
[dev-dependencies]
mimalloc = { version = "0.1.48", default-features = false }
# fixed version due to format breakages in v1.40
insta = "=1.39.0"
mimalloc = { version = "0.1.48", default-features = false }
maplit = "1.0.2"
md5 = "0.8.0"
meili-snap = { path = "../meili-snap" }
quickcheck = "1.0.3"
rand = { version = "0.8.5", features = ["small_rng"] }
[features]

View File

@@ -22,7 +22,10 @@ pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;
pub use self::obkv_codec::ObkvCodec;
pub use self::roaring_bitmap::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, RoaringBitmapCodec};
pub use self::roaring_bitmap::{
BoRoaringBitmapCodec, CboRoaringBitmapCodec, DeCboRoaringBitmapCodec, RoaringBitmapCodec,
DELTA_ENCODING_STATUS,
};
pub use self::roaring_bitmap_length::{
BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec,
};

View File

@@ -19,8 +19,19 @@ pub const THRESHOLD: usize = 7;
pub struct CboRoaringBitmapCodec;
impl CboRoaringBitmapCodec {
/// If the number of items (u32s) to encode is less than or equal to the threshold
/// it means that it would weigh the same or less than the RoaringBitmap
/// header, so we directly encode them using ByteOrder instead.
pub fn bitmap_serialize_as_raw_u32s(roaring: &RoaringBitmap) -> bool {
roaring.len() <= THRESHOLD as u64
}
pub fn bytes_deserialize_as_raw_u32s(bytes: &[u8]) -> bool {
bytes.len() <= THRESHOLD * size_of::<u32>()
}
pub fn serialized_size(roaring: &RoaringBitmap) -> usize {
if roaring.len() <= THRESHOLD as u64 {
if Self::bitmap_serialize_as_raw_u32s(roaring) {
roaring.len() as usize * size_of::<u32>()
} else {
roaring.serialized_size()
@@ -35,10 +46,7 @@ impl CboRoaringBitmapCodec {
roaring: &RoaringBitmap,
mut writer: W,
) -> io::Result<()> {
if roaring.len() <= THRESHOLD as u64 {
// If the number of items (u32s) to encode is less than or equal to the threshold
// it means that it would weigh the same or less than the RoaringBitmap
// header, so we directly encode them using ByteOrder instead.
if Self::bitmap_serialize_as_raw_u32s(roaring) {
for integer in roaring {
writer.write_u32::<NativeEndian>(integer)?;
}
@@ -51,7 +59,7 @@ impl CboRoaringBitmapCodec {
}
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
if Self::bytes_deserialize_as_raw_u32s(bytes) {
// If there is threshold or less than threshold integers that can fit into this array
// of bytes it means that we used the ByteOrder codec serializer.
let mut bitmap = RoaringBitmap::new();
@@ -71,7 +79,7 @@ impl CboRoaringBitmapCodec {
other: &RoaringBitmap,
) -> io::Result<RoaringBitmap> {
// See above `deserialize_from` method for implementation details.
if bytes.len() <= THRESHOLD * size_of::<u32>() {
if Self::bytes_deserialize_as_raw_u32s(bytes) {
let mut bitmap = RoaringBitmap::new();
while let Ok(integer) = bytes.read_u32::<NativeEndian>() {
if other.contains(integer) {
@@ -98,7 +106,7 @@ impl CboRoaringBitmapCodec {
let mut vec = Vec::new();
for bytes in slices {
if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() {
if Self::bytes_deserialize_as_raw_u32s(bytes.as_ref()) {
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer);
@@ -112,6 +120,8 @@ impl CboRoaringBitmapCodec {
vec.sort_unstable();
vec.dedup();
// Be careful when modifying this condition,
// the rule must be the same everywhere
if vec.len() <= THRESHOLD {
for integer in vec {
buffer.extend_from_slice(&integer.to_ne_bytes());

View File

@@ -0,0 +1,153 @@
use std::borrow::Cow;
use std::io::{self, ErrorKind};
use std::sync::OnceLock;
use heed::BoxedError;
use roaring::RoaringBitmap;
use super::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
use super::de_roaring_bitmap_codec::DeRoaringBitmapCodec;
use crate::heed_codec::BytesDecodeOwned;
/// Defines the status of the delta encoding on whether we have enabled it or not.
pub static DELTA_ENCODING_STATUS: DeltaEncodingStatusLock = DeltaEncodingStatusLock::new();
pub struct DeCboRoaringBitmapCodec;
impl DeCboRoaringBitmapCodec {
pub fn serialized_size_with_tmp_buffer(
bitmap: &RoaringBitmap,
tmp_buffer: &mut Vec<u32>,
) -> usize {
// We are stuck with this format because the CboRoaringBitmapCodec decides to write
// raw and unencoded u32s, without a header when there is at most THRESHOLD elements.
if CboRoaringBitmapCodec::bitmap_serialize_as_raw_u32s(bitmap)
&& DELTA_ENCODING_STATUS.is_disabled()
{
CboRoaringBitmapCodec::serialized_size(bitmap)
} else {
DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(bitmap, tmp_buffer)
}
}
/// Writes the delta-encoded compressed version of
/// the given roaring bitmap into the provided writer.
pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
let mut tmp_buffer = Vec::new();
Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
}
/// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
///
/// Note that we always serialize the bitmap with the delta-encoded compressed version.
pub fn serialize_into_with_tmp_buffer<W: io::Write>(
bitmap: &RoaringBitmap,
writer: W,
tmp_buffer: &mut Vec<u32>,
) -> io::Result<()> {
// We are stuck with this format because the CboRoaringBitmapCodec decides to write
// raw and unencoded u32s, without a header when there is at most THRESHOLD elements.
if CboRoaringBitmapCodec::bitmap_serialize_as_raw_u32s(bitmap)
&& DELTA_ENCODING_STATUS.is_disabled()
{
CboRoaringBitmapCodec::serialize_into_writer(bitmap, writer)
} else {
DeRoaringBitmapCodec::serialize_into_with_tmp_buffer(bitmap, writer, tmp_buffer)
}
}
/// Returns the delta-decoded roaring bitmap from the compressed bytes.
pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
let mut tmp_buffer = Vec::new();
Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
}
/// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
///
/// It tries to decode the input by using the delta-decoded version and
/// if it fails, falls back to the CboRoaringBitmap version.
pub fn deserialize_from_with_tmp_buffer(
input: &[u8],
tmp_buffer: &mut Vec<u32>,
) -> io::Result<RoaringBitmap> {
// The input is too short to be a valid delta-decoded bitmap.
// We fall back to the CboRoaringBitmap version with raw u32s.
if CboRoaringBitmapCodec::bytes_deserialize_as_raw_u32s(input) {
return CboRoaringBitmapCodec::deserialize_from(input);
}
match DeRoaringBitmapCodec::deserialize_from_with_tmp_buffer(input, tmp_buffer) {
Ok(bitmap) => Ok(bitmap),
// If the error kind is Other it means that the delta-decoder found
// an invalid magic header. We fall back to the CboRoaringBitmap version.
Err(e) if e.kind() == ErrorKind::Other => {
CboRoaringBitmapCodec::deserialize_from(input)
}
Err(e) => Err(e),
}
}
}
impl heed::BytesDecode<'_> for DeCboRoaringBitmapCodec {
type DItem = RoaringBitmap;
fn bytes_decode(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
Self::deserialize_from(bytes).map_err(Into::into)
}
}
impl BytesDecodeOwned for DeCboRoaringBitmapCodec {
type DItem = RoaringBitmap;
fn bytes_decode_owned(bytes: &[u8]) -> Result<Self::DItem, BoxedError> {
Self::deserialize_from(bytes).map_err(Into::into)
}
}
impl heed::BytesEncode<'_> for DeCboRoaringBitmapCodec {
type EItem = RoaringBitmap;
fn bytes_encode(item: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let mut tmp_buffer = Vec::new();
let capacity = Self::serialized_size_with_tmp_buffer(&item, &mut tmp_buffer);
let mut output = Vec::with_capacity(capacity);
Self::serialize_into_with_tmp_buffer(item, &mut output, &mut tmp_buffer)?;
Ok(Cow::Owned(output))
}
}
/// Manages the global status of the delta encoding.
///
/// Whether we must use delta encoding or not when encoding roaring bitmaps.
pub struct DeltaEncodingStatusLock(OnceLock<DeltaEncodingStatus>);
impl DeltaEncodingStatusLock {
pub const fn new() -> Self {
Self(OnceLock::new())
}
}
#[derive(Default)]
enum DeltaEncodingStatus {
Enabled,
#[default]
Disabled,
}
impl DeltaEncodingStatusLock {
pub fn set_to_enabled(&self) -> Result<(), ()> {
self.0.set(DeltaEncodingStatus::Enabled).map_err(drop)
}
pub fn set_to_disabled(&self) -> Result<(), ()> {
self.0.set(DeltaEncodingStatus::Disabled).map_err(drop)
}
pub fn is_enabled(&self) -> bool {
matches!(self.0.get(), Some(DeltaEncodingStatus::Enabled))
}
pub fn is_disabled(&self) -> bool {
!self.is_enabled()
}
}

View File

@@ -0,0 +1,377 @@
use std::io::{self, ErrorKind};
use std::mem::{self, size_of, size_of_val};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use roaring::RoaringBitmap;
/// The magic header for our custom encoding format
const MAGIC_HEADER: u16 = 36869;
pub struct DeRoaringBitmapCodec;
// TODO reintroduce:
// - serialized_size?
// - serialize_into_vec
// - intersection_with_serialized
// - merge_into
// - merge_deladd_into
impl DeRoaringBitmapCodec {
/// Returns the serialized size of the given roaring bitmap with the delta encoding format.
pub fn serialized_size_with_tmp_buffer(
bitmap: &RoaringBitmap,
tmp_buffer: &mut Vec<u32>,
) -> usize {
let mut size = 2; // u16 magic header
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
// This temporary buffer is used to store each chunk of decompressed u32s.
tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
let decompressed = &mut tmp_buffer[..];
let mut buffer_index = 0;
let mut initial = None;
// We initially collect all the integers into a flat buffer of the size
// of the largest bitpacker. We encode them with it until we don't have
// enough of them...
for n in bitmap {
decompressed[buffer_index] = n;
buffer_index += 1;
if buffer_index == BitPacker8x::BLOCK_LEN {
let num_bits = bitpacker8x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker8x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = Some(n);
buffer_index = 0;
}
}
// ...We then switch to a smaller bitpacker to encode the remaining chunks...
let decompressed = &decompressed[..buffer_index];
let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let num_bits = bitpacker4x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker4x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = decompressed.iter().last().copied();
}
// ...And so on...
let decompressed = chunks.remainder();
let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let num_bits = bitpacker1x.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = BitPacker1x::compressed_block_size(num_bits);
size += 1; // u8 chunk header
size += compressed_len; // compressed data length
initial = decompressed.iter().last().copied();
}
// ...Until we don't have any small enough bitpacker. We put them raw
// at the end of out buffer with a header indicating the matter.
let decompressed = chunks.remainder();
if !decompressed.is_empty() {
size += 1; // u8 chunk header
size += mem::size_of_val(decompressed); // remaining uncompressed u32s
}
size
}
/// Writes the delta-encoded compressed version of
/// the given roaring bitmap into the provided writer.
pub fn serialize_into<W: io::Write>(bitmap: &RoaringBitmap, writer: W) -> io::Result<()> {
let mut tmp_buffer = Vec::new();
Self::serialize_into_with_tmp_buffer(bitmap, writer, &mut tmp_buffer)
}
/// Same as [Self::serialize_into] but accepts a buffer to avoid allocating one.
pub fn serialize_into_with_tmp_buffer<W: io::Write>(
bitmap: &RoaringBitmap,
mut writer: W,
tmp_buffer: &mut Vec<u32>,
) -> io::Result<()> {
// Insert the magic header
writer.write_all(&MAGIC_HEADER.to_ne_bytes())?;
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
// This temporary buffer is used to store each chunk of decompressed and
// compressed and delta-encoded u32s. We need room for the decompressed
// u32s coming from the roaring bitmap, the compressed output that can
// be as large as the decompressed u32s, and the chunk header.
tmp_buffer.resize((BitPacker8x::BLOCK_LEN * 2) + 1, 0u32);
let (decompressed, compressed) = tmp_buffer.split_at_mut(BitPacker8x::BLOCK_LEN);
let compressed = bytemuck::cast_slice_mut(compressed);
let mut buffer_index = 0;
let mut initial = None;
// We initially collect all the integers into a flat buffer of the size
// of the largest bitpacker. We encode them with it until we don't have
// enough of them...
for n in bitmap {
decompressed[buffer_index] = n;
buffer_index += 1;
if buffer_index == BitPacker8x::BLOCK_LEN {
let output = encode_with_packer(&bitpacker8x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = Some(n);
buffer_index = 0;
}
}
// ...We then switch to a smaller bitpacker to encode the remaining chunks...
let decompressed = &decompressed[..buffer_index];
let mut chunks = decompressed.chunks_exact(BitPacker4x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let output = encode_with_packer(&bitpacker4x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = decompressed.iter().last().copied();
}
// ...And so on...
let decompressed = chunks.remainder();
let mut chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN);
for decompressed in chunks.by_ref() {
let output = encode_with_packer(&bitpacker1x, decompressed, initial, compressed);
writer.write_all(output)?;
initial = decompressed.iter().last().copied();
}
// ...Until we don't have any small enough bitpacker. We put them raw
// at the end of out buffer with a header indicating the matter.
let decompressed = chunks.remainder();
if !decompressed.is_empty() {
let header = encode_chunk_header(BitPackerLevel::None, u32::BITS as u8);
// Note: Not convinced about the performance of writing a single
// byte followed by a larger write. However, we will use this
// codec with a BufWriter or directly with a Vec of bytes.
writer.write_all(&[header])?;
writer.write_all(bytemuck::cast_slice(decompressed))?;
}
Ok(())
}
/// Returns the delta-decoded roaring bitmap from the compressed bytes.
pub fn deserialize_from(compressed: &[u8]) -> io::Result<RoaringBitmap> {
let mut tmp_buffer = Vec::new();
Self::deserialize_from_with_tmp_buffer(compressed, &mut tmp_buffer)
}
/// Same as [Self::deserialize_from] but accepts a buffer to avoid allocating one.
pub fn deserialize_from_with_tmp_buffer(
input: &[u8],
tmp_buffer: &mut Vec<u32>,
) -> io::Result<RoaringBitmap> {
let Some((header, mut compressed)) = input.split_at_checked(size_of_val(&MAGIC_HEADER))
else {
return Err(io::Error::new(ErrorKind::UnexpectedEof, "expecting a two-bytes header"));
};
// Safety: This unwrap cannot happen as the header buffer is the right size
let header = u16::from_ne_bytes(header.try_into().unwrap());
if header != MAGIC_HEADER {
return Err(io::Error::other("invalid header value"));
}
let bitpacker8x = BitPacker8x::new();
let bitpacker4x = BitPacker4x::new();
let bitpacker1x = BitPacker1x::new();
let mut bitmap = RoaringBitmap::new();
tmp_buffer.resize(BitPacker8x::BLOCK_LEN, 0u32);
let decompressed = &mut tmp_buffer[..];
let mut initial = None;
while let Some((&chunk_header, encoded)) = compressed.split_first() {
let (level, num_bits) = decode_chunk_header(chunk_header);
let (bytes_read, decompressed) = match level {
BitPackerLevel::None => {
if num_bits != u32::BITS as u8 {
return Err(io::Error::new(
ErrorKind::InvalidData,
"invalid number of bits to encode non-compressed u32s",
));
}
let chunks = encoded.chunks_exact(size_of::<u32>());
if !chunks.remainder().is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expecting last chunk to be a multiple of the size of an u32",
));
}
let integers = chunks
// safety: This unwrap cannot happen as
// the size of u32 is set correctly.
.map(|b| b.try_into().unwrap())
.map(u32::from_ne_bytes);
bitmap
.append(integers)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
// This is basically always the last chunk that exists in
// this delta-encoded format as the raw u32s are appended
// when there is not enough of them to fit in a bitpacker.
break;
}
BitPackerLevel::BitPacker1x => {
decode_with_packer(&bitpacker1x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker4x => {
decode_with_packer(&bitpacker4x, decompressed, initial, encoded, num_bits)
}
BitPackerLevel::BitPacker8x => {
decode_with_packer(&bitpacker8x, decompressed, initial, encoded, num_bits)
}
};
initial = decompressed.iter().last().copied();
// TODO investigate perf
// Safety: Bitpackers cannot output unsorter integers when
// used with the compress_strictly_sorted function.
bitmap.append(decompressed.iter().copied()).unwrap();
// What the delta-decoding read plus the chunk header size
compressed = &compressed[bytes_read + 1..];
}
Ok(bitmap)
}
}
/// Takes a strickly sorted list of u32s and outputs delta-encoded
/// bytes with a chunk header. We expect the output buffer to be
/// at least BLOCK_LEN + 1.
fn encode_with_packer<'c, B: BitPackerExt>(
bitpacker: &B,
decompressed: &[u32],
initial: Option<u32>,
output: &'c mut [u8],
) -> &'c [u8] {
let num_bits = bitpacker.num_bits_strictly_sorted(initial, decompressed);
let compressed_len = B::compressed_block_size(num_bits);
let chunk_header = encode_chunk_header(B::level(), num_bits);
let buffer = &mut output[..compressed_len + 1];
// Safety: The buffer is at least one byte
let (header_in_buffer, encoded) = buffer.split_first_mut().unwrap();
*header_in_buffer = chunk_header;
bitpacker.compress_strictly_sorted(initial, decompressed, encoded, num_bits);
buffer
}
/// Returns the number of bytes read and the decoded unsigned integers.
fn decode_with_packer<'d, B: BitPacker>(
bitpacker: &B,
decompressed: &'d mut [u32],
initial: Option<u32>,
compressed: &[u8],
num_bits: u8,
) -> (usize, &'d [u32]) {
let decompressed = &mut decompressed[..B::BLOCK_LEN];
let read = bitpacker.decompress_strictly_sorted(initial, compressed, decompressed, num_bits);
(read, decompressed)
}
/// An identifier for the bitpacker to be able
/// to correctly decode the compressed integers.
#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
enum BitPackerLevel {
/// The remaining bytes are raw little endian encoded u32s.
None,
/// The remaining bits are encoded using a `BitPacker1x`.
BitPacker1x,
/// The remaining bits are encoded using a `BitPacker4x`.
BitPacker4x,
/// The remaining bits are encoded using a `BitPacker8x`.
BitPacker8x,
}
/// Returns the chunk header based on the bitpacker level
/// and the number of bits to encode the list of integers.
fn encode_chunk_header(level: BitPackerLevel, num_bits: u8) -> u8 {
debug_assert!(num_bits as u32 <= 2_u32.pow(6));
let level = level as u8;
debug_assert!(level <= 3);
num_bits | (level << 6)
}
/// Decodes the chunk header and output the bitpacker level
/// and the number of bits to decode the following bytes.
fn decode_chunk_header(data: u8) -> (BitPackerLevel, u8) {
let num_bits = data & 0b00111111;
let level = match data >> 6 {
0 => BitPackerLevel::None,
1 => BitPackerLevel::BitPacker1x,
2 => BitPackerLevel::BitPacker4x,
3 => BitPackerLevel::BitPacker8x,
invalid => panic!("Invalid bitpacker level: {invalid}"),
};
debug_assert!(num_bits as u32 <= 2_u32.pow(6));
(level, num_bits)
}
/// A simple helper trait to get the BitPackerLevel
/// and correctly generate the chunk header.
trait BitPackerExt: BitPacker {
/// Returns the level of the bitpacker: an identifier to be
/// able to decode the numbers with the right bitpacker.
fn level() -> BitPackerLevel;
}
impl BitPackerExt for BitPacker8x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker8x
}
}
impl BitPackerExt for BitPacker4x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker4x
}
}
impl BitPackerExt for BitPacker1x {
fn level() -> BitPackerLevel {
BitPackerLevel::BitPacker1x
}
}
#[cfg(test)]
mod tests {
use quickcheck::quickcheck;
use roaring::RoaringBitmap;
use super::DeRoaringBitmapCodec;
quickcheck! {
fn qc_random(xs: Vec<u32>) -> bool {
let bitmap = RoaringBitmap::from_iter(xs);
let mut compressed = Vec::new();
DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
let decompressed = DeRoaringBitmapCodec::deserialize_from(&compressed[..]).unwrap();
decompressed == bitmap
}
}
quickcheck! {
fn qc_random_check_serialized_size(xs: Vec<u32>) -> bool {
let bitmap = RoaringBitmap::from_iter(xs);
let mut compressed = Vec::new();
let mut tmp_buffer = Vec::new();
DeRoaringBitmapCodec::serialize_into(&bitmap, &mut compressed).unwrap();
let expected_len = DeRoaringBitmapCodec::serialized_size_with_tmp_buffer(&bitmap, &mut tmp_buffer);
compressed.len() == expected_len
}
}
}

View File

@@ -1,7 +1,10 @@
mod bo_roaring_bitmap_codec;
pub mod cbo_roaring_bitmap_codec;
pub mod de_cbo_roaring_bitmap_codec;
mod de_roaring_bitmap_codec;
mod roaring_bitmap_codec;
pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec;
pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
// pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
pub use self::de_cbo_roaring_bitmap_codec::{DeCboRoaringBitmapCodec, DELTA_ENCODING_STATUS};
pub use self::roaring_bitmap_codec::RoaringBitmapCodec;

View File

@@ -385,10 +385,9 @@ pub struct SearchResult {
pub query_vector: Option<Embedding>,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TermsMatchingStrategy {
// remove last word first
#[default]
Last,
// all words are mandatory
All,
@@ -396,6 +395,12 @@ pub enum TermsMatchingStrategy {
Frequency,
}
impl Default for TermsMatchingStrategy {
fn default() -> Self {
Self::Last
}
}
impl From<MatchingStrategy> for TermsMatchingStrategy {
fn from(other: MatchingStrategy) -> Self {
match other {

View File

@@ -124,7 +124,7 @@ impl GrenadParameters {
/// This should be called inside of a rayon thread pool,
/// otherwise, it will take the global number of threads.
pub fn max_memory_by_thread(&self) -> Option<usize> {
self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads())
self.max_memory.map(|max_memory| (max_memory / rayon::current_num_threads()))
}
}

View File

@@ -54,12 +54,11 @@ pub struct DocumentAdditionResult {
pub number_of_documents: u64,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum IndexDocumentsMethod {
/// Replace the previous document with the new one,
/// removing all the already known attributes.
#[default]
ReplaceDocuments,
/// Merge the previous version of the document with the new version,
@@ -67,6 +66,12 @@ pub enum IndexDocumentsMethod {
UpdateDocuments,
}
impl Default for IndexDocumentsMethod {
fn default() -> Self {
Self::ReplaceDocuments
}
}
pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
wtxn: &'t mut heed::RwTxn<'i>,
index: &'i Index,

View File

@@ -48,11 +48,10 @@ use crate::{
ChannelCongestion, FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result,
};
#[derive(Default, Debug, Clone, PartialEq, Eq, Copy)]
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Setting<T> {
Set(T),
Reset,
#[default]
NotSet,
}
@@ -72,6 +71,12 @@ where
}
}
impl<T> Default for Setting<T> {
fn default() -> Self {
Self::NotSet
}
}
impl<T> Setting<T> {
pub fn set(self) -> Option<T> {
match self {

View File

@@ -67,7 +67,7 @@ impl<F> Embeddings<F> {
///
/// If `embeddings.len() % self.dimension != 0`, then the append operation fails.
pub fn append(&mut self, mut embeddings: Vec<F>) -> Result<(), Vec<F>> {
if !embeddings.len().is_multiple_of(self.dimension) {
if embeddings.len() % self.dimension != 0 {
return Err(embeddings);
}
self.data.append(&mut embeddings);

View File

@@ -178,7 +178,6 @@ pub fn get_arch() -> anyhow::Result<&'static str> {
#[cfg(not(all(target_os = "linux", target_arch = "aarch64")))]
#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
#[cfg(not(all(target_os = "macos", target_arch = "x86_64")))]
anyhow::bail!("unsupported platform")
}

View File

@@ -1,3 +1,3 @@
[toolchain]
channel = "1.91.1"
channel = "1.89.0"
components = ["clippy"]