Compare commits

...

7 Commits

Author SHA1 Message Date
Kerollmops
1bafd9a63c Create a new export documents meilitool subcommand based on v1.12 2025-02-04 12:35:11 +01:00
meili-bors[bot]
876084d480 Merge #5294
5294: Accept the max readers param by env var and reduce rayon tasks r=dureuill a=Kerollmops

This PR fixes a customer's issue with the maximum number of readers. You can find more info on [this support issue](https://github.com/meilisearch/meilisearch-support/issues/643) and [this Slack channel](https://meilisearch.slack.com/archives/C03T1T47TUG/p1737966988042699). It allows configuring the max readers via the `MEILI_INDEX_MAX_READERS` environment variable, logs API calls, reduces rayon operations to prevent the MDB_READERS_FULL errors, and is aimed for inclusion in version 1.12.8 with an experimental variable for adjusting max readers.

The prototype is named `prototype-accept-env-var-max-reader-4` and [has been built in the CI](https://github.com/meilisearch/meilisearch/actions/runs/13028049950)  but there is the new one `prototype-accept-env-var-max-reader-5` that is [being built in this CI](https://github.com/meilisearch/meilisearch/actions/runs/13035529978).

## Before Merging in v1.12.8
- [x] Remove the commit that unwraps everywhere.
- [ ] (optional) Fix the `internal: unexpected end of hex escape at line 1 column 5149` error.

Co-authored-by: Kerollmops <clement@meilisearch.com>
2025-01-30 10:52:23 +00:00
Kerollmops
350093baa3 Add a link to the experimental feature GitHub discussion 2025-01-30 11:43:01 +01:00
Kerollmops
24e0919d15 Better document the rayon limitation condition 2025-01-30 11:29:41 +01:00
Kerollmops
4b488b2baf Do not create too many rayon tasks when processing the settings 2025-01-30 11:24:49 +01:00
meili-bors[bot]
9bcb271f00 Merge #5297
5297: Update version for the next release (v1.12.8) in Cargo.toml r=Kerollmops a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: Kerollmops <Kerollmops@users.noreply.github.com>
2025-01-29 10:06:49 +00:00
Kerollmops
9f5ac967a0 Update version for the next release (v1.12.8) in Cargo.toml 2025-01-29 10:01:28 +00:00
7 changed files with 184 additions and 44 deletions

34
Cargo.lock generated
View File

@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"bumpalo",
@@ -689,7 +689,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"time",
@@ -1664,7 +1664,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"big_s",
@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "file-store"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"tempfile",
"thiserror",
@@ -1898,7 +1898,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"insta",
"nom",
@@ -1918,7 +1918,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"criterion",
"serde_json",
@@ -2057,7 +2057,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
[[package]]
name = "index-scheduler"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2822,7 +2822,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"criterion",
"serde_json",
@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"insta",
"md5",
@@ -3450,7 +3450,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"actix-cors",
"actix-http",
@@ -3540,7 +3540,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -3559,7 +3559,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"actix-web",
"anyhow",
@@ -3592,7 +3592,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
@@ -3627,7 +3627,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"big_s",
"serde_json",
@@ -6486,7 +6486,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.12.7"
version = "1.12.8"
dependencies = [
"anyhow",
"build-info",

View File

@@ -22,7 +22,7 @@ members = [
]
[workspace.package]
version = "1.12.7"
version = "1.12.8"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -305,6 +305,9 @@ fn create_or_open_index(
let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(map_size));
// You can find more details about this experimental
// environment variable on the following GitHub discussion:
// <https://github.com/orgs/meilisearch/discussions/806>
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
Ok(value) => u32::from_str(&value).unwrap(),
Err(VarError::NotPresent) => 1024,

View File

@@ -1,5 +1,5 @@
use std::fs::{read_dir, read_to_string, remove_file, File};
use std::io::BufWriter;
use std::io::{BufWriter, Write as _};
use std::path::PathBuf;
use anyhow::Context;
@@ -9,11 +9,15 @@ use file_store::FileStore;
use meilisearch_auth::AuthController;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::{obkv_to_json, BEU32};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::versioning::{get_version, parse_version};
use meilisearch_types::Index;
use milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use serde_json::Value::Object;
use time::macros::format_description;
use time::OffsetDateTime;
use upgrade::OfflineUpgrade;
@@ -65,6 +69,20 @@ enum Command {
skip_enqueued_tasks: bool,
},
/// Exports the documents of an index in NDJSON format from a Meilisearch index to stdout.
///
/// This command can be executed on a running Meilisearch database. However, please note that
/// it will maintain a read-only transaction for the duration of the extraction process.
ExportDocuments {
/// The index name to export the documents from.
#[arg(long)]
index_name: String,
/// Do not export vectors with the documents.
#[arg(long)]
ignore_vectors: bool,
},
/// Attempts to upgrade from one major version to the next without a dump.
///
/// Make sure to run this commmand when Meilisearch is not running!
@@ -90,6 +108,9 @@ fn main() -> anyhow::Result<()> {
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
}
Command::ExportDocuments { index_name, ignore_vectors } => {
export_documents(db_path, index_name, ignore_vectors)
}
Command::OfflineUpgrade { target_version } => {
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
@@ -188,7 +209,7 @@ fn export_a_dump(
dump_dir: PathBuf,
skip_enqueued_tasks: bool,
detected_version: (String, String, String),
) -> Result<(), anyhow::Error> {
) -> anyhow::Result<()> {
let started_at = OffsetDateTime::now_utc();
// 1. Extracts the instance UID from disk
@@ -351,3 +372,95 @@ fn export_a_dump(
Ok(())
}
fn export_documents(
db_path: PathBuf,
index_name: String,
ignore_vectors: bool,
) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let rtxn = env.read_txn()?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &rtxn, "index-mapping")?;
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
if uid == index_name {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let embedding_configs = index.embedding_configs(&rtxn)?;
let mut stdout = BufWriter::new(std::io::stdout());
for ret in index.all_documents(&rtxn)? {
let (id, doc) = ret?;
let mut document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
if !ignore_vectors {
'inject_vectors: {
let embeddings = index.embeddings(&rtxn, id)?;
if embeddings.is_empty() {
break 'inject_vectors;
}
let vectors = document
.entry(RESERVED_VECTORS_FIELD_NAME)
.or_insert(Object(Default::default()));
let Object(vectors) = vectors else {
return Err(meilisearch_types::milli::Error::UserError(
meilisearch_types::milli::UserError::InvalidVectorsMapType {
document_id: {
if let Ok(Some(Ok(index))) = index
.external_id_of(&rtxn, std::iter::once(id))
.map(|it| it.into_iter().next())
{
index
} else {
format!("internal docid={id}")
}
},
value: vectors.clone(),
},
)
.into());
};
for (embedder_name, embeddings) in embeddings {
let user_provided = embedding_configs
.iter()
.find(|conf| conf.name == embedder_name)
.is_some_and(|conf| conf.user_provided.contains(id));
let embeddings = ExplicitVectors {
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
embeddings,
)),
regenerate: !user_provided,
};
vectors
.insert(embedder_name, serde_json::to_value(embeddings).unwrap());
}
}
}
serde_json::to_writer(&mut stdout, &document)?;
}
stdout.flush()?;
} else {
eprintln!("Found index {uid} but it's not the right index...");
}
}
Ok(())
}

View File

@@ -98,14 +98,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -113,6 +119,8 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())

View File

@@ -255,14 +255,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -270,6 +276,8 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())

View File

@@ -188,14 +188,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@@ -203,6 +209,8 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Embedding>, EmbedError> {
// This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())