Compare commits

...

19 Commits

Author SHA1 Message Date
Kerollmops
1bafd9a63c Create a new export documents meilitool subcommand based on v1.12 2025-02-04 12:35:11 +01:00
meili-bors[bot]
876084d480 Merge #5294
5294: Accept the max readers param by env var and reduce rayon tasks r=dureuill a=Kerollmops

This PR fixes a customer's issue with the maximum number of readers. You can find more info on [this support issue](https://github.com/meilisearch/meilisearch-support/issues/643) and [this Slack channel](https://meilisearch.slack.com/archives/C03T1T47TUG/p1737966988042699). It allows configuring the max readers via the `MEILI_INDEX_MAX_READERS` environment variable, logs API calls, reduces rayon operations to prevent the MDB_READERS_FULL errors, and is aimed for inclusion in version 1.12.8 with an experimental variable for adjusting max readers.

The prototype is named `prototype-accept-env-var-max-reader-4` and [has been built in the CI](https://github.com/meilisearch/meilisearch/actions/runs/13028049950)  but there is the new one `prototype-accept-env-var-max-reader-5` that is [being built in this CI](https://github.com/meilisearch/meilisearch/actions/runs/13035529978).

## Before Merging in v1.12.8
- [x] Remove the commit that unwraps everywhere.
- [ ] (optional) Fix the `internal: unexpected end of hex escape at line 1 column 5149` error.

Co-authored-by: Kerollmops <clement@meilisearch.com>
2025-01-30 10:52:23 +00:00
Kerollmops
350093baa3 Add a link to the experimental feature GitHub discussion 2025-01-30 11:43:01 +01:00
Kerollmops
24e0919d15 Better document the rayon limitation condition 2025-01-30 11:29:41 +01:00
Kerollmops
4b488b2baf Do not create too many rayon tasks when processing the settings 2025-01-30 11:24:49 +01:00
meili-bors[bot]
9bcb271f00 Merge #5297
5297: Update version for the next release (v1.12.8) in Cargo.toml r=Kerollmops a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: Kerollmops <Kerollmops@users.noreply.github.com>
2025-01-29 10:06:49 +00:00
Kerollmops
9f5ac967a0 Update version for the next release (v1.12.8) in Cargo.toml 2025-01-29 10:01:28 +00:00
Kerollmops
e0f446e4d3 Remove a log that would log too much 2025-01-28 21:31:01 +01:00
Kerollmops
3bbad823e0 Refine the env variable and the max readers 2025-01-28 21:31:01 +01:00
Kerollmops
b605549bf2 Do not create too many rayon tasks 2025-01-28 21:31:01 +01:00
Kerollmops
6a1062edf5 Add more logs to see calls to the embedders 2025-01-28 21:31:01 +01:00
Kerollmops
426ea5aa97 Accept the max readers param by env var and increase it 2025-01-28 21:31:00 +01:00
meili-bors[bot]
e20b91210d Merge #5276
5276: Fix the stuck indexation due to the internal BBQueue capacity r=curquiza a=Kerollmops

Fixes https://github.com/meilisearch/meilisearch/issues/5277. Reduce the maximum reserve grant in the BBQueue so we are never stuck.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
2025-01-23 13:41:34 +00:00
meili-bors[bot]
17478301ab Merge #5278
Some checks failed
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 14s
Test suite / Run tests in debug (push) Failing after 13s
Test suite / Run Clippy (push) Failing after 14s
Test suite / Tests on windows-2022 (push) Failing after 26s
Test suite / Run Rustfmt (push) Successful in 1m43s
Test suite / Tests on macos-13 (push) Has been cancelled
5278: Update version for the next release (v1.12.7) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-23 10:47:30 +00:00
dureuill
968c9dff27 Update version for the next release (v1.12.7) in Cargo.toml 2025-01-23 10:17:23 +00:00
Louis Dureuil
463553988c Support offline upgrade up to v1.12.7 2025-01-23 11:11:40 +01:00
Clément Renault
c321fdb9c0 Comment the max grant of the bbqueue
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-23 11:09:20 +01:00
Louis Dureuil
36b6e94b29 Give more RAM to bbqueue.
- bbqueue buffers used to have (5% * 2%) / num_threads
- they now have 5% / num_threads
2025-01-23 10:55:03 +01:00
Kerollmops
34dea863e5 Reduce the maximum grant possible we can store in the BBQueue 2025-01-23 10:43:28 +01:00
11 changed files with 324 additions and 120 deletions

34
Cargo.lock generated
View File

@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"bumpalo",
@@ -689,7 +689,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"time",
@@ -1664,7 +1664,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"big_s",
@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "file-store"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"tempfile",
"thiserror",
@@ -1898,7 +1898,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"insta",
"nom",
@@ -1918,7 +1918,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"criterion",
"serde_json",
@@ -2057,7 +2057,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
[[package]]
name = "index-scheduler"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2822,7 +2822,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"criterion",
"serde_json",
@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"insta",
"md5",
@@ -3450,7 +3450,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"actix-cors",
"actix-http",
@@ -3540,7 +3540,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -3559,7 +3559,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"actix-web",
"anyhow",
@@ -3592,7 +3592,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
@@ -3627,7 +3627,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"big_s",
"serde_json",
@@ -6486,7 +6486,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.12.6"
version = "1.12.8"
dependencies = [
"anyhow",
"build-info",

View File

@@ -22,7 +22,7 @@ members = [
]
[workspace.package]
version = "1.12.6"
version = "1.12.8"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -1,5 +1,7 @@
use std::collections::BTreeMap;
use std::env::VarError;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
@@ -302,7 +304,18 @@ fn create_or_open_index(
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(map_size));
options.max_readers(1024);
// You can find more details about this experimental
// environment variable on the following GitHub discussion:
// <https://github.com/orgs/meilisearch/discussions/806>
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
Ok(value) => u32::from_str(&value).unwrap(),
Err(VarError::NotPresent) => 1024,
Err(VarError::NotUnicode(value)) => panic!(
"Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
),
};
options.max_readers(max_readers);
if enable_mdb_writemap {
unsafe { options.flags(EnvFlags::WRITE_MAP) };
}

View File

@@ -1,5 +1,5 @@
use std::fs::{read_dir, read_to_string, remove_file, File};
use std::io::BufWriter;
use std::io::{BufWriter, Write as _};
use std::path::PathBuf;
use anyhow::Context;
@@ -9,11 +9,15 @@ use file_store::FileStore;
use meilisearch_auth::AuthController;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::{obkv_to_json, BEU32};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::versioning::{get_version, parse_version};
use meilisearch_types::Index;
use milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use serde_json::Value::Object;
use time::macros::format_description;
use time::OffsetDateTime;
use upgrade::OfflineUpgrade;
@@ -65,6 +69,20 @@ enum Command {
skip_enqueued_tasks: bool,
},
/// Exports the documents of an index in NDJSON format from a Meilisearch index to stdout.
///
/// This command can be executed on a running Meilisearch database. However, please note that
/// it will maintain a read-only transaction for the duration of the extraction process.
ExportDocuments {
/// The index name to export the documents from.
#[arg(long)]
index_name: String,
/// Do not export vectors with the documents.
#[arg(long)]
ignore_vectors: bool,
},
/// Attempts to upgrade from one major version to the next without a dump.
///
/// Make sure to run this commmand when Meilisearch is not running!
@@ -90,6 +108,9 @@ fn main() -> anyhow::Result<()> {
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
}
Command::ExportDocuments { index_name, ignore_vectors } => {
export_documents(db_path, index_name, ignore_vectors)
}
Command::OfflineUpgrade { target_version } => {
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
@@ -188,7 +209,7 @@ fn export_a_dump(
dump_dir: PathBuf,
skip_enqueued_tasks: bool,
detected_version: (String, String, String),
) -> Result<(), anyhow::Error> {
) -> anyhow::Result<()> {
let started_at = OffsetDateTime::now_utc();
// 1. Extracts the instance UID from disk
@@ -351,3 +372,95 @@ fn export_a_dump(
Ok(())
}
fn export_documents(
db_path: PathBuf,
index_name: String,
ignore_vectors: bool,
) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let rtxn = env.read_txn()?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &rtxn, "index-mapping")?;
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
if uid == index_name {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(&rtxn)?;
let mut stdout = BufWriter::new(std::io::stdout());
for ret in index.all_documents(&rtxn)? {
let (id, doc) = ret?;
let mut document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
if !ignore_vectors {
'inject_vectors: {
let embeddings = index.embeddings(&rtxn, id)?;
if embeddings.is_empty() {
break 'inject_vectors;
}
let vectors = document
.entry(RESERVED_VECTORS_FIELD_NAME)
.or_insert(Object(Default::default()));
let Object(vectors) = vectors else {
return Err(meilisearch_types::milli::Error::UserError(
meilisearch_types::milli::UserError::InvalidVectorsMapType {
document_id: {
if let Ok(Some(Ok(index))) = index
.external_id_of(&rtxn, std::iter::once(id))
.map(|it| it.into_iter().next())
{
index
} else {
format!("internal docid={id}")
}
},
value: vectors.clone(),
},
)
.into());
};
for (embedder_name, embeddings) in embeddings {
let user_provided = embedding_configs
.iter()
.find(|conf| conf.name == embedder_name)
.is_some_and(|conf| conf.user_provided.contains(id));
let embeddings = ExplicitVectors {
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
embeddings,
)),
regenerate: !user_provided,
};
vectors
.insert(embedder_name, serde_json::to_value(embeddings).unwrap());
}
}
}
serde_json::to_writer(&mut stdout, &document)?;
}
stdout.flush()?;
} else {
eprintln!("Found index {uid} but it's not the right index...");
}
}
Ok(())
}

View File

@@ -44,9 +44,9 @@ impl OfflineUpgrade {
}
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7";
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";
let upgrade_list = [
(
@@ -73,7 +73,7 @@ impl OfflineUpgrade {
("1", "10", _) => 1,
("1", "11", _) => 2,
("1", "12", "0" | "1" | "2") => 3,
("1", "12", "3" | "4" | "5") => no_upgrade,
("1", "12", "3" | "4" | "5" | "6" | "7") => no_upgrade,
_ => {
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
@@ -87,7 +87,7 @@ impl OfflineUpgrade {
("1", "10", _) => 0,
("1", "11", _) => 1,
("1", "12", "0" | "1" | "2") => 2,
("1", "12", "3" | "4" | "5") => 3,
("1", "12", "3" | "4" | "5" | "6" | "7") => 3,
(major, _, _) if major.starts_with('v') => {
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
}

View File

@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -9,6 +9,8 @@ use thiserror::Error;
#[derive(Debug)]
pub struct ThreadPoolNoAbort {
thread_pool: ThreadPool,
/// The number of active operations.
active_operations: AtomicUsize,
/// Set to true if the thread pool catched a panic.
pool_catched_panic: Arc<AtomicBool>,
}
@@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
OP: FnOnce() -> R + Send,
R: Send,
{
self.active_operations.fetch_add(1, Ordering::Relaxed);
let output = self.thread_pool.install(op);
self.active_operations.fetch_sub(1, Ordering::Relaxed);
// While reseting the pool panic catcher we return an error if we catched one.
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
Err(PanicCatched)
@@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
pub fn current_num_threads(&self) -> usize {
self.thread_pool.current_num_threads()
}
/// The number of active operations.
pub fn active_operations(&self) -> usize {
self.active_operations.load(Ordering::Relaxed)
}
}
#[derive(Error, Debug)]
@@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
let catched_panic = pool_catched_panic.clone();
move |_result| catched_panic.store(true, Ordering::SeqCst)
});
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
Ok(ThreadPoolNoAbort {
thread_pool: self.0.build()?,
active_operations: AtomicUsize::new(0),
pool_catched_panic,
})
}
}

View File

@@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding;
use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};
/// Note that the FrameProducer requires up to 9 bytes to
/// encode the length, the max grant has been computed accordingly.
///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
const MAX_FRAME_HEADER_SIZE: usize = 9;
/// Creates a tuple of senders/receiver to be used by
/// the extractors and the writer loop.
///
@@ -53,8 +59,9 @@ pub fn extractor_writer_bbqueue(
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
let capacity = bbbuffers.first().unwrap().capacity();
// Read the field description to understand this
let capacity = capacity.checked_sub(9).unwrap();
// 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message.
// 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here.
let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();
let producers = ThreadLocal::with_capacity(bbbuffers.len());
let consumers = rayon::broadcast(|bi| {
@@ -65,7 +72,7 @@ pub fn extractor_writer_bbqueue(
});
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, capacity };
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
@@ -81,13 +88,10 @@ pub struct ExtractorBbqueueSender<'a> {
/// A memory buffer, one by thread, is used to serialize
/// the entries directly in this shared, lock-free space.
producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
/// The capacity of this frame producer, will never be able to store more than that.
///
/// Note that the FrameProducer requires up to 9 bytes to encode the length,
/// the capacity has been shrunk accordingly.
///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
capacity: usize,
/// The maximum frame grant that a producer can reserve.
/// It will never be able to store more than that as the
/// buffer cannot split data into two parts.
max_grant: usize,
}
pub struct WriterBbqueueReceiver<'a> {
@@ -443,14 +447,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
let total_length = EntryHeader::total_delete_vector_size();
if total_length > capacity {
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
if total_length > max_grant {
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
}
// Spin loop to have a frame the size we requested.
@@ -468,7 +472,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
embedder_id: u8,
embeddings: &[Vec<f32>],
) -> crate::Result<()> {
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
@@ -479,7 +483,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
if total_length > capacity {
if total_length > max_grant {
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
for embedding in embeddings {
let mut embedding_bytes = bytemuck::cast_slice(embedding);
@@ -540,14 +544,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
where
F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
{
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let operation = DbOperation { database, key_length: Some(key_length) };
let payload_header = EntryHeader::DbOperation(operation);
let total_length = EntryHeader::total_key_value_size(key_length, value_length);
if total_length > capacity {
if total_length > max_grant {
let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
let value_file = tempfile::tempfile()?;
value_file.set_len(value_length.try_into().unwrap())?;
@@ -601,7 +605,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
@@ -610,8 +614,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
let operation = DbOperation { database, key_length: None };
let payload_header = EntryHeader::DbOperation(operation);
let total_length = EntryHeader::total_key_size(key_length);
if total_length > capacity {
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
if total_length > max_grant {
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
}
// Spin loop to have a frame the size we requested.

View File

@@ -93,17 +93,25 @@ where
..grenad_parameters
};
// We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
// 5% percent of the allocated memory for the extractors, or min 100MiB
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
// 2% of the indexing memory
let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(
max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
),
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)

View File

@@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::Embedding;
use crate::ThreadPoolNoAbort;
@@ -98,14 +98,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -113,20 +119,32 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {

View File

@@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, NewEmbedderError};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::error::EmbedErrorKind;
use crate::vector::Embedding;
@@ -255,14 +255,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -270,20 +276,31 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {

View File

@@ -188,14 +188,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -203,20 +209,32 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Embedding>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {