mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-21 05:41:01 +00:00
Compare commits
54 Commits
lazy-word-
...
v1.12.5
Author | SHA1 | Date | |
---|---|---|---|
1c78447226 | |||
c55891f73b | |||
40f8c0d840 | |||
34d8c1a903 | |||
3c9483b6e0 | |||
8c789b3c7a | |||
3403eae9ee | |||
11458eefd9 | |||
289eb92bef | |||
cea0c89212 | |||
1cadab9ad8 | |||
6383f8f19e | |||
8a9f952bda | |||
a5c44b4d79 | |||
8c35744848 | |||
c0d414fc3c | |||
b56358f606 | |||
b84c0a5390 | |||
ce621e447e | |||
aee74f47aa | |||
be2717edbd | |||
c66841626e | |||
d0bc8c755a | |||
031abfd281 | |||
27169bc7b4 | |||
181a01f8d8 | |||
1d153c1867 | |||
5fde2a3ee1 | |||
4465a1a3c9 | |||
e342ae1b46 | |||
dcb4c49cf2 | |||
e83c021755 | |||
7ec7200378 | |||
6a577254fa | |||
fd88c834c3 | |||
b4005593f4 | |||
8ee3793259 | |||
3648abbfd5 | |||
4d2433de12 | |||
28cc6df7a3 | |||
7b14cb10a1 | |||
34f4602ae8 | |||
12e21a177b | |||
7a9290aaae | |||
5d219587b8 | |||
6e9aa49893 | |||
6b3a2c7281 | |||
5908aec6cb | |||
19f48c15fb | |||
47b484c07c | |||
7d5e28b475 | |||
0648e06aa2 | |||
33921747b7 | |||
970a489dcc |
34
Cargo.lock
generated
34
Cargo.lock
generated
@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "benchmarks"
|
name = "benchmarks"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bumpalo",
|
"bumpalo",
|
||||||
@ -689,7 +689,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "build-info"
|
name = "build-info"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"time",
|
"time",
|
||||||
@ -1664,7 +1664,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "dump"
|
name = "dump"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"big_s",
|
"big_s",
|
||||||
@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "file-store"
|
name = "file-store"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
@ -1898,7 +1898,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "filter-parser"
|
name = "filter-parser"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"insta",
|
"insta",
|
||||||
"nom",
|
"nom",
|
||||||
@ -1918,7 +1918,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flatten-serde-json"
|
name = "flatten-serde-json"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion",
|
"criterion",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@ -2057,7 +2057,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fuzzers"
|
name = "fuzzers"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arbitrary",
|
"arbitrary",
|
||||||
"bumpalo",
|
"bumpalo",
|
||||||
@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "index-scheduler"
|
name = "index-scheduler"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@ -2822,7 +2822,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "json-depth-checker"
|
name = "json-depth-checker"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion",
|
"criterion",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meili-snap"
|
name = "meili-snap"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"insta",
|
"insta",
|
||||||
"md5",
|
"md5",
|
||||||
@ -3450,7 +3450,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch"
|
name = "meilisearch"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-cors",
|
"actix-cors",
|
||||||
"actix-http",
|
"actix-http",
|
||||||
@ -3540,7 +3540,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch-auth"
|
name = "meilisearch-auth"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.22.1",
|
"base64 0.22.1",
|
||||||
"enum-iterator",
|
"enum-iterator",
|
||||||
@ -3559,7 +3559,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch-types"
|
name = "meilisearch-types"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-web",
|
"actix-web",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@ -3592,7 +3592,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilitool"
|
name = "meilitool"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
|
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
|
||||||
@ -3627,7 +3627,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "milli"
|
name = "milli"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "permissive-json-pointer"
|
name = "permissive-json-pointer"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"big_s",
|
"big_s",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@ -6486,7 +6486,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "xtask"
|
name = "xtask"
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"build-info",
|
"build-info",
|
||||||
|
@ -22,7 +22,7 @@ members = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "1.12.0"
|
version = "1.12.5"
|
||||||
authors = [
|
authors = [
|
||||||
"Quentin de Quelen <quentin@dequelen.me>",
|
"Quentin de Quelen <quentin@dequelen.me>",
|
||||||
"Clément Renault <clement@meilisearch.com>",
|
"Clément Renault <clement@meilisearch.com>",
|
||||||
|
@ -29,7 +29,7 @@ use bumpalo::Bump;
|
|||||||
use dump::IndexMetadata;
|
use dump::IndexMetadata;
|
||||||
use meilisearch_types::batches::BatchId;
|
use meilisearch_types::batches::BatchId;
|
||||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
use meilisearch_types::milli::documents::PrimaryKey;
|
||||||
use meilisearch_types::milli::heed::CompactionOption;
|
use meilisearch_types::milli::heed::CompactionOption;
|
||||||
use meilisearch_types::milli::progress::Progress;
|
use meilisearch_types::milli::progress::Progress;
|
||||||
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
||||||
@ -819,6 +819,13 @@ impl IndexScheduler {
|
|||||||
t.started_at = Some(started_at);
|
t.started_at = Some(started_at);
|
||||||
t.finished_at = Some(finished_at);
|
t.finished_at = Some(finished_at);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
|
||||||
|
// This prevent from referencing *future* batches not actually associated with the task.
|
||||||
|
//
|
||||||
|
// See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
|
||||||
|
t.batch_uid = None;
|
||||||
|
|
||||||
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
|
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
|
||||||
|
|
||||||
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
|
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
|
||||||
@ -829,21 +836,18 @@ impl IndexScheduler {
|
|||||||
if status == Status::Enqueued {
|
if status == Status::Enqueued {
|
||||||
let content_file = self.file_store.get_update(content_file)?;
|
let content_file = self.file_store.get_update(content_file)?;
|
||||||
|
|
||||||
let reader = DocumentsBatchReader::from_reader(content_file)
|
for document in
|
||||||
.map_err(|e| Error::from_milli(e.into(), None))?;
|
serde_json::de::Deserializer::from_reader(content_file).into_iter()
|
||||||
|
|
||||||
let (mut cursor, documents_batch_index) =
|
|
||||||
reader.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
while let Some(doc) = cursor
|
|
||||||
.next_document()
|
|
||||||
.map_err(|e| Error::from_milli(e.into(), None))?
|
|
||||||
{
|
{
|
||||||
dump_content_file.push_document(
|
let document = document.map_err(|e| {
|
||||||
&obkv_to_object(doc, &documents_batch_index)
|
Error::from_milli(
|
||||||
.map_err(|e| Error::from_milli(e, None))?,
|
milli::InternalError::SerdeJson(e).into(),
|
||||||
)?;
|
None,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
dump_content_file.push_document(&document)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
dump_content_file.flush()?;
|
dump_content_file.flush()?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1312,9 +1316,7 @@ impl IndexScheduler {
|
|||||||
if let DocumentOperation::Add(content_uuid) = operation {
|
if let DocumentOperation::Add(content_uuid) = operation {
|
||||||
let content_file = self.file_store.get_update(*content_uuid)?;
|
let content_file = self.file_store.get_update(*content_uuid)?;
|
||||||
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
|
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
|
||||||
if !mmap.is_empty() {
|
content_files.push(mmap);
|
||||||
content_files.push(mmap);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,7 +55,6 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea
|
|||||||
use meilisearch_types::heed::byteorder::BE;
|
use meilisearch_types::heed::byteorder::BE;
|
||||||
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
|
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
|
||||||
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
|
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
|
||||||
use meilisearch_types::milli::index::IndexEmbeddingConfig;
|
use meilisearch_types::milli::index::IndexEmbeddingConfig;
|
||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
||||||
@ -2017,14 +2016,19 @@ impl<'a> Dump<'a> {
|
|||||||
task: TaskDump,
|
task: TaskDump,
|
||||||
content_file: Option<Box<UpdateFile>>,
|
content_file: Option<Box<UpdateFile>>,
|
||||||
) -> Result<Task> {
|
) -> Result<Task> {
|
||||||
|
let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
|
||||||
|
|
||||||
let content_uuid = match content_file {
|
let content_uuid = match content_file {
|
||||||
Some(content_file) if task.status == Status::Enqueued => {
|
Some(content_file) if task.status == Status::Enqueued => {
|
||||||
let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
|
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
|
||||||
let mut builder = DocumentsBatchBuilder::new(&mut file);
|
let mut writer = io::BufWriter::new(file);
|
||||||
for doc in content_file {
|
for doc in content_file {
|
||||||
builder.append_json_object(&doc?)?;
|
let doc = doc?;
|
||||||
|
serde_json::to_writer(&mut writer, &doc).map_err(|e| {
|
||||||
|
Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
builder.into_inner()?;
|
let file = writer.into_inner().map_err(|e| e.into_error())?;
|
||||||
file.persist()?;
|
file.persist()?;
|
||||||
|
|
||||||
Some(uuid)
|
Some(uuid)
|
||||||
@ -2032,6 +2036,12 @@ impl<'a> Dump<'a> {
|
|||||||
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
||||||
// in case we try to open it later.
|
// in case we try to open it later.
|
||||||
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
||||||
|
None if task.status == Status::Enqueued && task_has_no_docs => {
|
||||||
|
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
|
||||||
|
file.persist()?;
|
||||||
|
|
||||||
|
Some(uuid)
|
||||||
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -291,7 +291,10 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
debug_assert!(old_task != *task);
|
debug_assert!(old_task != *task);
|
||||||
debug_assert_eq!(old_task.uid, task.uid);
|
debug_assert_eq!(old_task.uid, task.uid);
|
||||||
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
|
debug_assert!(
|
||||||
|
old_task.batch_uid.is_none() && task.batch_uid.is_some(),
|
||||||
|
"\n==> old: {old_task:?}\n==> new: {task:?}"
|
||||||
|
);
|
||||||
|
|
||||||
if old_task.status != task.status {
|
if old_task.status != task.status {
|
||||||
self.update_status(wtxn, old_task.status, |bitmap| {
|
self.update_status(wtxn, old_task.status, |bitmap| {
|
||||||
|
@ -1220,9 +1220,89 @@ async fn replace_document() {
|
|||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn add_no_documents() {
|
async fn add_no_documents() {
|
||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("kefir");
|
||||||
let (_response, code) = index.add_documents(json!([]), None).await;
|
let (task, code) = index.add_documents(json!([]), None).await;
|
||||||
snapshot!(code, @"202 Accepted");
|
snapshot!(code, @"202 Accepted");
|
||||||
|
let task = server.wait_task(task.uid()).await;
|
||||||
|
let task = task.succeeded();
|
||||||
|
snapshot!(task, @r#"
|
||||||
|
{
|
||||||
|
"uid": "[uid]",
|
||||||
|
"batchUid": "[batch_uid]",
|
||||||
|
"indexUid": "kefir",
|
||||||
|
"status": "succeeded",
|
||||||
|
"type": "documentAdditionOrUpdate",
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"receivedDocuments": 0,
|
||||||
|
"indexedDocuments": 0
|
||||||
|
},
|
||||||
|
"error": null,
|
||||||
|
"duration": "[duration]",
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]"
|
||||||
|
}
|
||||||
|
"#);
|
||||||
|
|
||||||
|
let (task, _code) = index.add_documents(json!([]), Some("kefkef")).await;
|
||||||
|
let task = server.wait_task(task.uid()).await;
|
||||||
|
let task = task.succeeded();
|
||||||
|
snapshot!(task, @r#"
|
||||||
|
{
|
||||||
|
"uid": "[uid]",
|
||||||
|
"batchUid": "[batch_uid]",
|
||||||
|
"indexUid": "kefir",
|
||||||
|
"status": "succeeded",
|
||||||
|
"type": "documentAdditionOrUpdate",
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"receivedDocuments": 0,
|
||||||
|
"indexedDocuments": 0
|
||||||
|
},
|
||||||
|
"error": null,
|
||||||
|
"duration": "[duration]",
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]"
|
||||||
|
}
|
||||||
|
"#);
|
||||||
|
|
||||||
|
let (task, _code) = index.add_documents(json!([{ "kefkef": 1 }]), None).await;
|
||||||
|
let task = server.wait_task(task.uid()).await;
|
||||||
|
let task = task.succeeded();
|
||||||
|
snapshot!(task, @r#"
|
||||||
|
{
|
||||||
|
"uid": "[uid]",
|
||||||
|
"batchUid": "[batch_uid]",
|
||||||
|
"indexUid": "kefir",
|
||||||
|
"status": "succeeded",
|
||||||
|
"type": "documentAdditionOrUpdate",
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"receivedDocuments": 1,
|
||||||
|
"indexedDocuments": 1
|
||||||
|
},
|
||||||
|
"error": null,
|
||||||
|
"duration": "[duration]",
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]"
|
||||||
|
}
|
||||||
|
"#);
|
||||||
|
let (documents, _status) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
|
snapshot!(documents, @r#"
|
||||||
|
{
|
||||||
|
"results": [
|
||||||
|
{
|
||||||
|
"kefkef": 1
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"offset": 0,
|
||||||
|
"limit": 20,
|
||||||
|
"total": 1
|
||||||
|
}
|
||||||
|
"#);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
|
@ -14,11 +14,11 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar
|
|||||||
clap = { version = "4.5.9", features = ["derive"] }
|
clap = { version = "4.5.9", features = ["derive"] }
|
||||||
dump = { path = "../dump" }
|
dump = { path = "../dump" }
|
||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
indexmap = {version = "2.7.0", features = ["serde"]}
|
indexmap = { version = "2.7.0", features = ["serde"] }
|
||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
serde = { version = "1.0.209", features = ["derive"] }
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
serde_json = {version = "1.0.133", features = ["preserve_order"]}
|
serde_json = { version = "1.0.133", features = ["preserve_order"] }
|
||||||
tempfile = "3.14.0"
|
tempfile = "3.14.0"
|
||||||
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
||||||
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
||||||
|
@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
match command {
|
match command {
|
||||||
Command::ClearTaskQueue => clear_task_queue(db_path),
|
Command::ClearTaskQueue => clear_task_queue(db_path),
|
||||||
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
||||||
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
|
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
|
||||||
}
|
}
|
||||||
Command::OfflineUpgrade { target_version } => {
|
Command::OfflineUpgrade { target_version } => {
|
||||||
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
||||||
@ -187,6 +187,7 @@ fn export_a_dump(
|
|||||||
db_path: PathBuf,
|
db_path: PathBuf,
|
||||||
dump_dir: PathBuf,
|
dump_dir: PathBuf,
|
||||||
skip_enqueued_tasks: bool,
|
skip_enqueued_tasks: bool,
|
||||||
|
detected_version: (String, String, String),
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
let started_at = OffsetDateTime::now_utc();
|
let started_at = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
@ -238,9 +239,6 @@ fn export_a_dump(
|
|||||||
if skip_enqueued_tasks {
|
if skip_enqueued_tasks {
|
||||||
eprintln!("Skip dumping the enqueued tasks...");
|
eprintln!("Skip dumping the enqueued tasks...");
|
||||||
} else {
|
} else {
|
||||||
eprintln!("Dumping the enqueued tasks...");
|
|
||||||
|
|
||||||
// 3. dump the tasks
|
|
||||||
let mut dump_tasks = dump.create_tasks_queue()?;
|
let mut dump_tasks = dump.create_tasks_queue()?;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
for ret in all_tasks.iter(&rtxn)? {
|
for ret in all_tasks.iter(&rtxn)? {
|
||||||
@ -254,18 +252,39 @@ fn export_a_dump(
|
|||||||
if status == Status::Enqueued {
|
if status == Status::Enqueued {
|
||||||
let content_file = file_store.get_update(content_file_uuid)?;
|
let content_file = file_store.get_update(content_file_uuid)?;
|
||||||
|
|
||||||
let reader =
|
if (
|
||||||
DocumentsBatchReader::from_reader(content_file).with_context(|| {
|
detected_version.0.as_str(),
|
||||||
format!("While reading content file {:?}", content_file_uuid)
|
detected_version.1.as_str(),
|
||||||
})?;
|
detected_version.2.as_str(),
|
||||||
|
) < ("1", "12", "0")
|
||||||
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
|
{
|
||||||
while let Some(doc) = cursor.next_document().with_context(|| {
|
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
|
||||||
format!("While iterating on content file {:?}", content_file_uuid)
|
let reader =
|
||||||
})? {
|
DocumentsBatchReader::from_reader(content_file).with_context(|| {
|
||||||
dump_content_file
|
format!("While reading content file {:?}", content_file_uuid)
|
||||||
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
|
})?;
|
||||||
|
let (mut cursor, documents_batch_index) =
|
||||||
|
reader.into_cursor_and_fields_index();
|
||||||
|
while let Some(doc) = cursor.next_document().with_context(|| {
|
||||||
|
format!("While iterating on content file {:?}", content_file_uuid)
|
||||||
|
})? {
|
||||||
|
dump_content_file
|
||||||
|
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
eprintln!(
|
||||||
|
"Dumping the enqueued tasks reading them in JSON stream format..."
|
||||||
|
);
|
||||||
|
for document in
|
||||||
|
serde_json::de::Deserializer::from_reader(content_file).into_iter()
|
||||||
|
{
|
||||||
|
let document = document.with_context(|| {
|
||||||
|
format!("While reading content file {:?}", content_file_uuid)
|
||||||
|
})?;
|
||||||
|
dump_content_file.push_document(&document)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dump_content_file.flush()?;
|
dump_content_file.flush()?;
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
|
|||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use meilisearch_types::versioning::create_version_file;
|
use meilisearch_types::versioning::create_version_file;
|
||||||
use v1_10::v1_9_to_v1_10;
|
use v1_10::v1_9_to_v1_10;
|
||||||
use v1_12::v1_11_to_v1_12;
|
use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3};
|
||||||
|
|
||||||
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
||||||
|
|
||||||
@ -20,12 +20,48 @@ pub struct OfflineUpgrade {
|
|||||||
|
|
||||||
impl OfflineUpgrade {
|
impl OfflineUpgrade {
|
||||||
pub fn upgrade(self) -> anyhow::Result<()> {
|
pub fn upgrade(self) -> anyhow::Result<()> {
|
||||||
|
// Adding a version?
|
||||||
|
//
|
||||||
|
// 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
|
||||||
|
// 2. Add new version to the upgrade list if necessary
|
||||||
|
// 3. Use `no_upgrade` as index for versions that are compatible.
|
||||||
|
|
||||||
|
if self.current_version == self.target_version {
|
||||||
|
println!("Database is already at the target version. Exiting.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.current_version > self.target_version {
|
||||||
|
bail!(
|
||||||
|
"Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
|
||||||
|
self.current_version.0,
|
||||||
|
self.current_version.1,
|
||||||
|
self.current_version.2,
|
||||||
|
self.target_version.0,
|
||||||
|
self.target_version.1,
|
||||||
|
self.target_version.2
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
|
||||||
|
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
|
||||||
|
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
|
||||||
|
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
|
||||||
|
|
||||||
let upgrade_list = [
|
let upgrade_list = [
|
||||||
(v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
|
(
|
||||||
|
v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
|
||||||
|
"1",
|
||||||
|
"10",
|
||||||
|
"0",
|
||||||
|
),
|
||||||
(v1_10_to_v1_11, "1", "11", "0"),
|
(v1_10_to_v1_11, "1", "11", "0"),
|
||||||
(v1_11_to_v1_12, "1", "12", "0"),
|
(v1_11_to_v1_12, "1", "12", "0"),
|
||||||
|
(v1_12_to_v1_12_3, "1", "12", "3"),
|
||||||
];
|
];
|
||||||
|
|
||||||
|
let no_upgrade: usize = upgrade_list.len();
|
||||||
|
|
||||||
let (current_major, current_minor, current_patch) = &self.current_version;
|
let (current_major, current_minor, current_patch) = &self.current_version;
|
||||||
|
|
||||||
let start_at = match (
|
let start_at = match (
|
||||||
@ -36,8 +72,12 @@ impl OfflineUpgrade {
|
|||||||
("1", "9", _) => 0,
|
("1", "9", _) => 0,
|
||||||
("1", "10", _) => 1,
|
("1", "10", _) => 1,
|
||||||
("1", "11", _) => 2,
|
("1", "11", _) => 2,
|
||||||
|
("1", "12", "0" | "1" | "2") => 3,
|
||||||
|
("1", "12", "3" | "4" | "5") => no_upgrade,
|
||||||
_ => {
|
_ => {
|
||||||
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
|
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
|
||||||
|
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
|
||||||
|
LAST_SUPPORTED_UPGRADE_FROM_VERSION);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -46,21 +86,32 @@ impl OfflineUpgrade {
|
|||||||
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
||||||
("1", "10", _) => 0,
|
("1", "10", _) => 0,
|
||||||
("1", "11", _) => 1,
|
("1", "11", _) => 1,
|
||||||
("1", "12", _) => 2,
|
("1", "12", "0" | "1" | "2") => 2,
|
||||||
|
("1", "12", "3" | "4" | "5") => 3,
|
||||||
(major, _, _) if major.starts_with('v') => {
|
(major, _, _) if major.starts_with('v') => {
|
||||||
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
|
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
|
||||||
|
FIRST_SUPPORTED_UPGRADE_TO_VERSION,
|
||||||
|
LAST_SUPPORTED_UPGRADE_TO_VERSION);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
|
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
|
||||||
|
|
||||||
|
if start_at == no_upgrade {
|
||||||
|
println!("No upgrade operation to perform, writing VERSION file");
|
||||||
|
create_version_file(&self.db_path, target_major, target_minor, target_patch)
|
||||||
|
.context("while writing VERSION file after the upgrade")?;
|
||||||
|
println!("Success");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::needless_range_loop)]
|
#[allow(clippy::needless_range_loop)]
|
||||||
for index in start_at..=ends_at {
|
for index in start_at..=ends_at {
|
||||||
let (func, major, minor, patch) = upgrade_list[index];
|
let (func, major, minor, patch) = upgrade_list[index];
|
||||||
(func)(&self.db_path)?;
|
(func)(&self.db_path, current_major, current_minor, current_patch)?;
|
||||||
println!("Done");
|
println!("Done");
|
||||||
// We're writing the version file just in case an issue arise _while_ upgrading.
|
// We're writing the version file just in case an issue arise _while_ upgrading.
|
||||||
// We don't want the DB to fail in an unknown state.
|
// We don't want the DB to fail in an unknown state.
|
||||||
|
@ -151,7 +151,12 @@ fn date_round_trip(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> {
|
pub fn v1_9_to_v1_10(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.9.0 to v1.10.0");
|
println!("Upgrading from v1.9.0 to v1.10.0");
|
||||||
// 2 changes here
|
// 2 changes here
|
||||||
|
|
||||||
|
@ -14,7 +14,12 @@ use meilisearch_types::milli::index::db_name;
|
|||||||
use crate::uuid_codec::UuidCodec;
|
use crate::uuid_codec::UuidCodec;
|
||||||
use crate::{try_opening_database, try_opening_poly_database};
|
use crate::{try_opening_database, try_opening_poly_database};
|
||||||
|
|
||||||
pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> {
|
pub fn v1_10_to_v1_11(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.10.0 to v1.11.0");
|
println!("Upgrading from v1.10.0 to v1.11.0");
|
||||||
|
|
||||||
let index_scheduler_path = db_path.join("tasks");
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
@ -1,17 +1,34 @@
|
|||||||
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
||||||
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
||||||
|
|
||||||
|
use std::borrow::Cow;
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
||||||
|
use meilisearch_types::milli::heed::types::{SerdeJson, Str};
|
||||||
|
use meilisearch_types::milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
|
||||||
|
use meilisearch_types::milli::progress::Step;
|
||||||
|
use meilisearch_types::milli::{FieldDistribution, Index};
|
||||||
|
use serde::Serialize;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
|
use crate::try_opening_database;
|
||||||
|
use crate::uuid_codec::UuidCodec;
|
||||||
|
|
||||||
|
pub fn v1_11_to_v1_12(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.11.0 to v1.12.0");
|
println!("Upgrading from v1.11.0 to v1.12.0");
|
||||||
|
|
||||||
convert_update_files(db_path)?;
|
convert_update_files(db_path)?;
|
||||||
@ -19,6 +36,23 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn v1_12_to_v1_12_3(
|
||||||
|
db_path: &Path,
|
||||||
|
origin_major: &str,
|
||||||
|
origin_minor: &str,
|
||||||
|
origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3");
|
||||||
|
|
||||||
|
if origin_minor == "12" {
|
||||||
|
rebuild_field_distribution(db_path)?;
|
||||||
|
} else {
|
||||||
|
println!("Not rebuilding field distribution as it wasn't corrupted coming from v{origin_major}.{origin_minor}.{origin_patch}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Convert the update files from OBKV to ndjson format.
|
/// Convert the update files from OBKV to ndjson format.
|
||||||
///
|
///
|
||||||
/// 1) List all the update files using the file store.
|
/// 1) List all the update files using the file store.
|
||||||
@ -77,3 +111,188 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
|
||||||
|
fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
||||||
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
||||||
|
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
||||||
|
|
||||||
|
let mut sched_wtxn = env.write_txn()?;
|
||||||
|
|
||||||
|
let index_mapping: Database<Str, UuidCodec> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
|
||||||
|
let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
|
||||||
|
format!("While trying to open {:?}", index_scheduler_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let index_count =
|
||||||
|
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
|
||||||
|
|
||||||
|
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
|
||||||
|
// 1. immutably for the iteration
|
||||||
|
// 2. mutably for updating index stats
|
||||||
|
let indexes: Vec<_> = index_mapping
|
||||||
|
.iter(&sched_wtxn)?
|
||||||
|
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let progress = meilisearch_types::milli::progress::Progress::default();
|
||||||
|
let finished = AtomicBool::new(false);
|
||||||
|
|
||||||
|
std::thread::scope(|scope| {
|
||||||
|
let display_progress = std::thread::Builder::new()
|
||||||
|
.name("display_progress".into())
|
||||||
|
.spawn_scoped(scope, || {
|
||||||
|
while !finished.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||||
|
let view = progress.as_progress_view();
|
||||||
|
let Ok(view) = serde_json::to_string(&view) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
println!("{view}");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
for (index_index, result) in indexes.into_iter().enumerate() {
|
||||||
|
let (uid, uuid) = result?;
|
||||||
|
progress.update_progress(VariableNameStep::new(
|
||||||
|
&uid,
|
||||||
|
index_index as u32,
|
||||||
|
index_count as u32,
|
||||||
|
));
|
||||||
|
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}/{index_count}]Updating index `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
|
index_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("\t- Rebuilding field distribution");
|
||||||
|
|
||||||
|
let index = meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path)
|
||||||
|
.with_context(|| {
|
||||||
|
format!("while opening index {uid} at '{}'", index_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut index_txn = index.write_txn()?;
|
||||||
|
|
||||||
|
meilisearch_types::milli::update::new::reindex::field_distribution(
|
||||||
|
&index,
|
||||||
|
&mut index_txn,
|
||||||
|
&progress,
|
||||||
|
)
|
||||||
|
.context("while rebuilding field distribution")?;
|
||||||
|
|
||||||
|
let stats = IndexStats::new(&index, &index_txn)
|
||||||
|
.with_context(|| format!("computing stats for index `{uid}`"))?;
|
||||||
|
store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
|
||||||
|
|
||||||
|
index_txn.commit().context("while committing the write txn for the updated index")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
|
||||||
|
|
||||||
|
finished.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
if let Err(panic) = display_progress.join() {
|
||||||
|
let msg = match panic.downcast_ref::<&'static str>() {
|
||||||
|
Some(s) => *s,
|
||||||
|
None => match panic.downcast_ref::<String>() {
|
||||||
|
Some(s) => &s[..],
|
||||||
|
None => "Box<dyn Any>",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
eprintln!("WARN: the display thread panicked with {msg}");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Upgrading database succeeded");
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct VariableNameStep {
|
||||||
|
name: String,
|
||||||
|
current: u32,
|
||||||
|
total: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VariableNameStep {
|
||||||
|
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||||
|
Self { name: name.into(), current, total }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Step for VariableNameStep {
|
||||||
|
fn name(&self) -> Cow<'static, str> {
|
||||||
|
self.name.clone().into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current(&self) -> u32 {
|
||||||
|
self.current
|
||||||
|
}
|
||||||
|
|
||||||
|
fn total(&self) -> u32 {
|
||||||
|
self.total
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn store_stats_of(
|
||||||
|
stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
|
||||||
|
index_uuid: Uuid,
|
||||||
|
sched_wtxn: &mut RwTxn,
|
||||||
|
index_uid: &str,
|
||||||
|
stats: &IndexStats,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
stats_db
|
||||||
|
.put(sched_wtxn, &index_uuid, stats)
|
||||||
|
.with_context(|| format!("storing stats for index `{index_uid}`"))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The statistics that can be computed from an `Index` object.
|
||||||
|
#[derive(Serialize, Debug)]
|
||||||
|
pub struct IndexStats {
|
||||||
|
/// Number of documents in the index.
|
||||||
|
pub number_of_documents: u64,
|
||||||
|
/// Size taken up by the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
|
||||||
|
/// are not returned to the disk after a deletion, this number is typically larger than
|
||||||
|
/// `used_database_size` that only includes the size of the used pages.
|
||||||
|
pub database_size: u64,
|
||||||
|
/// Size taken by the used pages of the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
|
||||||
|
/// this value is typically smaller than `database_size`.
|
||||||
|
pub used_database_size: u64,
|
||||||
|
/// Association of every field name with the number of times it occurs in the documents.
|
||||||
|
pub field_distribution: FieldDistribution,
|
||||||
|
/// Creation date of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub created_at: OffsetDateTime,
|
||||||
|
/// Date of the last update of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub updated_at: OffsetDateTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexStats {
|
||||||
|
/// Compute the stats of an index
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
|
||||||
|
pub fn new(index: &Index, rtxn: &RoTxn) -> meilisearch_types::milli::Result<Self> {
|
||||||
|
Ok(IndexStats {
|
||||||
|
number_of_documents: index.number_of_documents(rtxn)?,
|
||||||
|
database_size: index.on_disk_size()?,
|
||||||
|
used_database_size: index.used_size()?,
|
||||||
|
field_distribution: index.field_distribution(rtxn)?,
|
||||||
|
created_at: index.created_at(rtxn)?,
|
||||||
|
updated_at: index.updated_at(rtxn)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -79,22 +79,29 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
|
|||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
|
use std::ops::Bound;
|
||||||
|
|
||||||
use grenad::Merger;
|
use grenad::Merger;
|
||||||
use heed::types::{Bytes, DecodeIgnore};
|
use heed::types::{Bytes, DecodeIgnore};
|
||||||
|
use heed::BytesDecode as _;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use self::incremental::FacetsUpdateIncremental;
|
use self::incremental::FacetsUpdateIncremental;
|
||||||
use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps};
|
use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps};
|
||||||
use crate::facet::FacetType;
|
use crate::facet::FacetType;
|
||||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
|
use crate::heed_codec::facet::{
|
||||||
|
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec,
|
||||||
|
};
|
||||||
use crate::heed_codec::BytesRefCodec;
|
use crate::heed_codec::BytesRefCodec;
|
||||||
|
use crate::search::facet::get_highest_level;
|
||||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||||
use crate::{try_split_array_at, FieldId, Index, Result};
|
use crate::{try_split_array_at, FieldId, Index, Result};
|
||||||
|
|
||||||
pub mod bulk;
|
pub mod bulk;
|
||||||
pub mod incremental;
|
pub mod incremental;
|
||||||
|
pub mod new_incremental;
|
||||||
|
|
||||||
/// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases.
|
/// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases.
|
||||||
///
|
///
|
||||||
@ -646,3 +653,194 @@ mod comparison_bench {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run sanity checks on the specified fid tree
|
||||||
|
///
|
||||||
|
/// 1. No "orphan" child value, any child value has a parent
|
||||||
|
/// 2. Any docid in the child appears in the parent
|
||||||
|
/// 3. No docid in the parent is missing from all its children
|
||||||
|
/// 4. no group is bigger than max_group_size
|
||||||
|
/// 5. Less than 50% of groups are bigger than group_size
|
||||||
|
/// 6. group size matches the number of children
|
||||||
|
/// 7. max_level is < 255
|
||||||
|
pub(crate) fn sanity_checks(
|
||||||
|
index: &Index,
|
||||||
|
rtxn: &heed::RoTxn,
|
||||||
|
field_id: FieldId,
|
||||||
|
facet_type: FacetType,
|
||||||
|
group_size: usize,
|
||||||
|
_min_level_size: usize, // might add a check on level size later
|
||||||
|
max_group_size: usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
tracing::info!(%field_id, ?facet_type, "performing sanity checks");
|
||||||
|
let database = match facet_type {
|
||||||
|
FacetType::String => {
|
||||||
|
index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||||
|
}
|
||||||
|
FacetType::Number => {
|
||||||
|
index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let leaf_prefix: FacetGroupKey<&[u8]> = FacetGroupKey { field_id, level: 0, left_bound: &[] };
|
||||||
|
|
||||||
|
let leaf_it = database.prefix_iter(rtxn, &leaf_prefix)?;
|
||||||
|
|
||||||
|
let max_level = get_highest_level(rtxn, database, field_id)?;
|
||||||
|
if max_level == u8::MAX {
|
||||||
|
panic!("max_level == 255");
|
||||||
|
}
|
||||||
|
|
||||||
|
for leaf in leaf_it {
|
||||||
|
let (leaf_facet_value, leaf_docids) = leaf?;
|
||||||
|
let mut current_level = 0;
|
||||||
|
|
||||||
|
let mut current_parent_facet_value: Option<FacetGroupKey<&[u8]>> = None;
|
||||||
|
let mut current_parent_docids: Option<crate::heed_codec::facet::FacetGroupValue> = None;
|
||||||
|
loop {
|
||||||
|
current_level += 1;
|
||||||
|
if current_level >= max_level {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let parent_key_right_bound = FacetGroupKey {
|
||||||
|
field_id,
|
||||||
|
level: current_level,
|
||||||
|
left_bound: leaf_facet_value.left_bound,
|
||||||
|
};
|
||||||
|
let (parent_facet_value, parent_docids) = database
|
||||||
|
.get_lower_than_or_equal_to(rtxn, &parent_key_right_bound)?
|
||||||
|
.expect("no parent found");
|
||||||
|
if parent_facet_value.level != current_level {
|
||||||
|
panic!(
|
||||||
|
"wrong parent level, found_level={}, expected_level={}",
|
||||||
|
parent_facet_value.level, current_level
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if parent_facet_value.field_id != field_id {
|
||||||
|
panic!("wrong parent fid");
|
||||||
|
}
|
||||||
|
if parent_facet_value.left_bound > leaf_facet_value.left_bound {
|
||||||
|
panic!("wrong parent left bound");
|
||||||
|
}
|
||||||
|
|
||||||
|
if !leaf_docids.bitmap.is_subset(&parent_docids.bitmap) {
|
||||||
|
panic!(
|
||||||
|
"missing docids from leaf in parent, current_level={}, parent={}, child={}, missing={missing:?}, child_len={}, child={:?}",
|
||||||
|
current_level,
|
||||||
|
facet_to_string(parent_facet_value.left_bound, facet_type),
|
||||||
|
facet_to_string(leaf_facet_value.left_bound, facet_type),
|
||||||
|
leaf_docids.bitmap.len(),
|
||||||
|
leaf_docids.bitmap.clone(),
|
||||||
|
missing=leaf_docids.bitmap - parent_docids.bitmap,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(current_parent_facet_value) = current_parent_facet_value {
|
||||||
|
if current_parent_facet_value.field_id != parent_facet_value.field_id {
|
||||||
|
panic!("wrong parent parent fid");
|
||||||
|
}
|
||||||
|
if current_parent_facet_value.level + 1 != parent_facet_value.level {
|
||||||
|
panic!("wrong parent parent level");
|
||||||
|
}
|
||||||
|
if current_parent_facet_value.left_bound < parent_facet_value.left_bound {
|
||||||
|
panic!("wrong parent parent left bound");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(current_parent_docids) = current_parent_docids {
|
||||||
|
if !current_parent_docids.bitmap.is_subset(&parent_docids.bitmap) {
|
||||||
|
panic!("missing docids from intermediate node in parent, parent_level={}, parent={}, intermediate={}, missing={missing:?}, intermediate={:?}",
|
||||||
|
parent_facet_value.level,
|
||||||
|
facet_to_string(parent_facet_value.left_bound, facet_type),
|
||||||
|
facet_to_string(current_parent_facet_value.unwrap().left_bound, facet_type),
|
||||||
|
current_parent_docids.bitmap.clone(),
|
||||||
|
missing=current_parent_docids.bitmap - parent_docids.bitmap,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
current_parent_facet_value = Some(parent_facet_value);
|
||||||
|
current_parent_docids = Some(parent_docids);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracing::info!(%field_id, ?facet_type, "checked all leaves");
|
||||||
|
|
||||||
|
let mut current_level = max_level;
|
||||||
|
let mut greater_than_group = 0usize;
|
||||||
|
let mut total = 0usize;
|
||||||
|
loop {
|
||||||
|
if current_level == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let child_level = current_level - 1;
|
||||||
|
tracing::info!(%field_id, ?facet_type, %current_level, "checked groups for level");
|
||||||
|
let level_groups_prefix: FacetGroupKey<&[u8]> =
|
||||||
|
FacetGroupKey { field_id, level: current_level, left_bound: &[] };
|
||||||
|
let mut level_groups_it = database.prefix_iter(rtxn, &level_groups_prefix)?.peekable();
|
||||||
|
|
||||||
|
'group_it: loop {
|
||||||
|
let Some(group) = level_groups_it.next() else { break 'group_it };
|
||||||
|
|
||||||
|
let (group_facet_value, group_docids) = group?;
|
||||||
|
let child_left_bound = group_facet_value.left_bound.to_owned();
|
||||||
|
let mut expected_docids = RoaringBitmap::new();
|
||||||
|
let mut expected_size = 0usize;
|
||||||
|
let right_bound = level_groups_it
|
||||||
|
.peek()
|
||||||
|
.and_then(|res| res.as_ref().ok())
|
||||||
|
.map(|(key, _)| key.left_bound);
|
||||||
|
let child_left_bound = FacetGroupKey {
|
||||||
|
field_id,
|
||||||
|
level: child_level,
|
||||||
|
left_bound: child_left_bound.as_slice(),
|
||||||
|
};
|
||||||
|
let child_left_bound = Bound::Included(&child_left_bound);
|
||||||
|
let child_right_bound;
|
||||||
|
let child_right_bound = if let Some(right_bound) = right_bound {
|
||||||
|
child_right_bound =
|
||||||
|
FacetGroupKey { field_id, level: child_level, left_bound: right_bound };
|
||||||
|
Bound::Excluded(&child_right_bound)
|
||||||
|
} else {
|
||||||
|
Bound::Unbounded
|
||||||
|
};
|
||||||
|
let children = database.range(rtxn, &(child_left_bound, child_right_bound))?;
|
||||||
|
for child in children {
|
||||||
|
let (child_facet_value, child_docids) = child?;
|
||||||
|
if child_facet_value.field_id != field_id {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if child_facet_value.level != child_level {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
expected_size += 1;
|
||||||
|
expected_docids |= &child_docids.bitmap;
|
||||||
|
}
|
||||||
|
assert_eq!(expected_size, group_docids.size as usize);
|
||||||
|
assert!(expected_size <= max_group_size);
|
||||||
|
assert_eq!(expected_docids, group_docids.bitmap);
|
||||||
|
total += 1;
|
||||||
|
if expected_size > group_size {
|
||||||
|
greater_than_group += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
current_level -= 1;
|
||||||
|
}
|
||||||
|
if greater_than_group * 2 > total {
|
||||||
|
panic!("too many groups have a size > group_size");
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("sanity checks OK");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn facet_to_string(facet_value: &[u8], facet_type: FacetType) -> String {
|
||||||
|
match facet_type {
|
||||||
|
FacetType::String => bstr::BStr::new(facet_value).to_string(),
|
||||||
|
FacetType::Number => match OrderedF64Codec::bytes_decode(facet_value) {
|
||||||
|
Ok(value) => value.to_string(),
|
||||||
|
Err(e) => format!("error: {e} (bytes: {facet_value:?}"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
498
crates/milli/src/update/facet/new_incremental.rs
Normal file
498
crates/milli/src/update/facet/new_incremental.rs
Normal file
@ -0,0 +1,498 @@
|
|||||||
|
use std::ops::Bound;
|
||||||
|
|
||||||
|
use heed::types::{Bytes, DecodeIgnore};
|
||||||
|
use heed::{BytesDecode as _, Database, RwTxn};
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
|
use crate::facet::FacetType;
|
||||||
|
use crate::heed_codec::facet::{
|
||||||
|
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
|
||||||
|
};
|
||||||
|
use crate::heed_codec::BytesRefCodec;
|
||||||
|
use crate::search::facet::get_highest_level;
|
||||||
|
use crate::update::valid_facet_value;
|
||||||
|
use crate::{FieldId, Index, Result};
|
||||||
|
|
||||||
|
pub struct FacetsUpdateIncremental {
|
||||||
|
inner: FacetsUpdateIncrementalInner,
|
||||||
|
delta_data: Vec<FacetFieldIdChange>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FacetsUpdateIncrementalInner {
|
||||||
|
db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
|
||||||
|
field_id: FieldId,
|
||||||
|
group_size: u8,
|
||||||
|
min_level_size: u8,
|
||||||
|
max_group_size: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FacetsUpdateIncremental {
|
||||||
|
pub fn new(
|
||||||
|
index: &Index,
|
||||||
|
facet_type: FacetType,
|
||||||
|
field_id: FieldId,
|
||||||
|
delta_data: Vec<FacetFieldIdChange>,
|
||||||
|
group_size: u8,
|
||||||
|
min_level_size: u8,
|
||||||
|
max_group_size: u8,
|
||||||
|
) -> Self {
|
||||||
|
FacetsUpdateIncremental {
|
||||||
|
inner: FacetsUpdateIncrementalInner {
|
||||||
|
db: match facet_type {
|
||||||
|
FacetType::String => index
|
||||||
|
.facet_id_string_docids
|
||||||
|
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
|
||||||
|
FacetType::Number => index
|
||||||
|
.facet_id_f64_docids
|
||||||
|
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
|
||||||
|
},
|
||||||
|
field_id,
|
||||||
|
group_size,
|
||||||
|
min_level_size,
|
||||||
|
max_group_size,
|
||||||
|
},
|
||||||
|
|
||||||
|
delta_data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")]
|
||||||
|
pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> {
|
||||||
|
if self.delta_data.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.delta_data.sort_unstable_by(
|
||||||
|
|FacetFieldIdChange { facet_value: left, .. },
|
||||||
|
FacetFieldIdChange { facet_value: right, .. }| {
|
||||||
|
left.cmp(right)
|
||||||
|
// sort in **reverse** lexicographic order
|
||||||
|
.reverse()
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
self.inner.find_changed_parents(wtxn, self.delta_data)?;
|
||||||
|
|
||||||
|
self.inner.add_or_delete_level(wtxn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FacetsUpdateIncrementalInner {
|
||||||
|
/// WARNING: `changed_children` must be sorted in **reverse** lexicographic order.
|
||||||
|
fn find_changed_parents(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
mut changed_children: Vec<FacetFieldIdChange>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut changed_parents = vec![];
|
||||||
|
for child_level in 0u8..u8::MAX {
|
||||||
|
// child_level < u8::MAX by construction
|
||||||
|
let parent_level = child_level + 1;
|
||||||
|
let parent_level_left_bound: FacetGroupKey<&[u8]> =
|
||||||
|
FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] };
|
||||||
|
|
||||||
|
let mut last_parent: Option<Box<[u8]>> = None;
|
||||||
|
let mut child_it = changed_children
|
||||||
|
// drain all changed children
|
||||||
|
.drain(..)
|
||||||
|
// keep only children whose value is valid in the LMDB sense
|
||||||
|
.filter(|child| valid_facet_value(&child.facet_value));
|
||||||
|
// `while let` rather than `for` because we advance `child_it` inside of the loop
|
||||||
|
'current_level: while let Some(child) = child_it.next() {
|
||||||
|
if let Some(last_parent) = &last_parent {
|
||||||
|
if &child.facet_value >= last_parent {
|
||||||
|
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
|
||||||
|
continue 'current_level;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// need to find a new parent
|
||||||
|
let parent_key_prefix = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: parent_level,
|
||||||
|
left_bound: &*child.facet_value,
|
||||||
|
};
|
||||||
|
|
||||||
|
let parent = self
|
||||||
|
.db
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.rev_range(
|
||||||
|
wtxn,
|
||||||
|
&(
|
||||||
|
Bound::Excluded(&parent_level_left_bound),
|
||||||
|
Bound::Included(&parent_key_prefix),
|
||||||
|
),
|
||||||
|
)?
|
||||||
|
.next();
|
||||||
|
|
||||||
|
match parent {
|
||||||
|
Some(Ok((parent_key, _parent_value))) => {
|
||||||
|
// found parent, cache it for next keys
|
||||||
|
last_parent = Some(parent_key.left_bound.to_owned().into_boxed_slice());
|
||||||
|
|
||||||
|
// add to modified list for parent level
|
||||||
|
changed_parents.push(FacetFieldIdChange {
|
||||||
|
facet_value: parent_key.left_bound.to_owned().into_boxed_slice(),
|
||||||
|
});
|
||||||
|
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
|
||||||
|
}
|
||||||
|
Some(Err(err)) => return Err(err.into()),
|
||||||
|
None => {
|
||||||
|
// no parent for that key
|
||||||
|
let mut parent_it = self
|
||||||
|
.db
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.prefix_iter_mut(wtxn, &parent_level_left_bound)?;
|
||||||
|
match parent_it.next() {
|
||||||
|
// 1. left of the current left bound, or
|
||||||
|
Some(Ok((first_key, _first_value))) => {
|
||||||
|
// make sure we don't spill on the neighboring fid (level also included defensively)
|
||||||
|
if first_key.field_id != self.field_id
|
||||||
|
|| first_key.level != parent_level
|
||||||
|
{
|
||||||
|
// max level reached, exit
|
||||||
|
drop(parent_it);
|
||||||
|
self.compute_parent_group(
|
||||||
|
wtxn,
|
||||||
|
child_level,
|
||||||
|
child.facet_value,
|
||||||
|
)?;
|
||||||
|
for child in child_it.by_ref() {
|
||||||
|
self.compute_parent_group(
|
||||||
|
wtxn,
|
||||||
|
child_level,
|
||||||
|
child.facet_value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
// remove old left bound
|
||||||
|
unsafe { parent_it.del_current()? };
|
||||||
|
drop(parent_it);
|
||||||
|
changed_parents.push(FacetFieldIdChange {
|
||||||
|
facet_value: child.facet_value.clone(),
|
||||||
|
});
|
||||||
|
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
|
||||||
|
// pop all elements in order to visit the new left bound
|
||||||
|
let new_left_bound =
|
||||||
|
&mut changed_parents.last_mut().unwrap().facet_value;
|
||||||
|
for child in child_it.by_ref() {
|
||||||
|
new_left_bound.clone_from(&child.facet_value);
|
||||||
|
|
||||||
|
self.compute_parent_group(
|
||||||
|
wtxn,
|
||||||
|
child_level,
|
||||||
|
child.facet_value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Err(err)) => return Err(err.into()),
|
||||||
|
// 2. max level reached, exit
|
||||||
|
None => {
|
||||||
|
drop(parent_it);
|
||||||
|
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
|
||||||
|
for child in child_it.by_ref() {
|
||||||
|
self.compute_parent_group(
|
||||||
|
wtxn,
|
||||||
|
child_level,
|
||||||
|
child.facet_value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if changed_parents.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
drop(child_it);
|
||||||
|
std::mem::swap(&mut changed_children, &mut changed_parents);
|
||||||
|
// changed_parents is now empty because changed_children was emptied by the drain
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compute_parent_group(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
parent_level: u8,
|
||||||
|
parent_left_bound: Box<[u8]>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut range_left_bound: Vec<u8> = parent_left_bound.into();
|
||||||
|
if parent_level == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let child_level = parent_level - 1;
|
||||||
|
|
||||||
|
let parent_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: parent_level,
|
||||||
|
left_bound: &*range_left_bound,
|
||||||
|
};
|
||||||
|
let child_right_bound = self
|
||||||
|
.db
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.get_greater_than(wtxn, &parent_key)?
|
||||||
|
.and_then(
|
||||||
|
|(
|
||||||
|
FacetGroupKey {
|
||||||
|
level: right_level,
|
||||||
|
field_id: right_fid,
|
||||||
|
left_bound: right_bound,
|
||||||
|
},
|
||||||
|
_,
|
||||||
|
)| {
|
||||||
|
if parent_level != right_level || self.field_id != right_fid {
|
||||||
|
// there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(right_bound.to_owned())
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let child_right_bound = match &child_right_bound {
|
||||||
|
Some(right_bound) => Bound::Excluded(FacetGroupKey {
|
||||||
|
left_bound: right_bound.as_slice(),
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: child_level,
|
||||||
|
}),
|
||||||
|
None => Bound::Unbounded,
|
||||||
|
};
|
||||||
|
|
||||||
|
let child_left_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: child_level,
|
||||||
|
left_bound: &*range_left_bound,
|
||||||
|
};
|
||||||
|
let mut child_left_bound = Bound::Included(child_left_key);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// do a first pass on the range to find the number of children
|
||||||
|
let child_count = self
|
||||||
|
.db
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.range(wtxn, &(child_left_bound, child_right_bound))?
|
||||||
|
.take(self.max_group_size as usize * 2)
|
||||||
|
.count();
|
||||||
|
let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?;
|
||||||
|
|
||||||
|
// pick the right group_size depending on the number of children
|
||||||
|
let group_size = if child_count >= self.max_group_size as usize * 2 {
|
||||||
|
// more than twice the max_group_size => there will be space for at least 2 groups of max_group_size
|
||||||
|
self.max_group_size as usize
|
||||||
|
} else if child_count >= self.group_size as usize {
|
||||||
|
// size in [group_size, max_group_size * 2[
|
||||||
|
// divided by 2 it is between [group_size / 2, max_group_size[
|
||||||
|
// this ensures that the tree is balanced
|
||||||
|
child_count / 2
|
||||||
|
} else {
|
||||||
|
// take everything
|
||||||
|
child_count
|
||||||
|
};
|
||||||
|
|
||||||
|
let res: Result<_> = child_it
|
||||||
|
.by_ref()
|
||||||
|
.take(group_size)
|
||||||
|
// stop if we go to the next level or field id
|
||||||
|
.take_while(|res| match res {
|
||||||
|
Ok((child_key, _)) => {
|
||||||
|
child_key.field_id == self.field_id && child_key.level == child_level
|
||||||
|
}
|
||||||
|
Err(_) => true,
|
||||||
|
})
|
||||||
|
.try_fold(
|
||||||
|
(None, FacetGroupValue { size: 0, bitmap: Default::default() }),
|
||||||
|
|(bounds, mut group_value), child_res| {
|
||||||
|
let (child_key, child_value) = child_res?;
|
||||||
|
let bounds = match bounds {
|
||||||
|
Some((left_bound, _)) => Some((left_bound, child_key.left_bound)),
|
||||||
|
None => Some((child_key.left_bound, child_key.left_bound)),
|
||||||
|
};
|
||||||
|
// max_group_size <= u8::MAX
|
||||||
|
group_value.size += 1;
|
||||||
|
group_value.bitmap |= &child_value.bitmap;
|
||||||
|
Ok((bounds, group_value))
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let (bounds, group_value) = res?;
|
||||||
|
|
||||||
|
let Some((group_left_bound, right_bound)) = bounds else {
|
||||||
|
let update_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: parent_level,
|
||||||
|
left_bound: &*range_left_bound,
|
||||||
|
};
|
||||||
|
drop(child_it);
|
||||||
|
if let Bound::Included(_) = child_left_bound {
|
||||||
|
self.db.delete(wtxn, &update_key)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(child_it);
|
||||||
|
let current_left_bound = group_left_bound.to_owned();
|
||||||
|
|
||||||
|
let delete_old_bound = match child_left_bound {
|
||||||
|
Bound::Included(bound) => {
|
||||||
|
if bound.left_bound != current_left_bound {
|
||||||
|
Some(range_left_bound.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
range_left_bound.clear();
|
||||||
|
range_left_bound.extend_from_slice(right_bound);
|
||||||
|
let child_left_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: child_level,
|
||||||
|
left_bound: range_left_bound.as_slice(),
|
||||||
|
};
|
||||||
|
child_left_bound = Bound::Excluded(child_left_key);
|
||||||
|
|
||||||
|
if let Some(old_bound) = delete_old_bound {
|
||||||
|
let update_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: parent_level,
|
||||||
|
left_bound: old_bound.as_slice(),
|
||||||
|
};
|
||||||
|
self.db.delete(wtxn, &update_key)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let update_key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: parent_level,
|
||||||
|
left_bound: current_left_bound.as_slice(),
|
||||||
|
};
|
||||||
|
if group_value.bitmap.is_empty() {
|
||||||
|
self.db.delete(wtxn, &update_key)?;
|
||||||
|
} else {
|
||||||
|
self.db.put(wtxn, &update_key, &group_value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether the highest level has exceeded `min_level_size` * `self.group_size`.
|
||||||
|
/// If it has, we must build an addition level above it.
|
||||||
|
/// Then check whether the highest level is under `min_level_size`.
|
||||||
|
/// If it has, we must remove the complete level.
|
||||||
|
pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> {
|
||||||
|
let highest_level = get_highest_level(txn, self.db, self.field_id)?;
|
||||||
|
let mut highest_level_prefix = vec![];
|
||||||
|
highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes());
|
||||||
|
highest_level_prefix.push(highest_level);
|
||||||
|
|
||||||
|
let size_highest_level =
|
||||||
|
self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, &highest_level_prefix)?.count();
|
||||||
|
|
||||||
|
if size_highest_level >= self.group_size as usize * self.min_level_size as usize {
|
||||||
|
self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level)
|
||||||
|
} else if size_highest_level < self.min_level_size as usize && highest_level != 0 {
|
||||||
|
self.delete_level(txn, &highest_level_prefix)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete a level.
|
||||||
|
fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> {
|
||||||
|
let mut to_delete = vec![];
|
||||||
|
let mut iter =
|
||||||
|
self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, highest_level_prefix)?;
|
||||||
|
for el in iter.by_ref() {
|
||||||
|
let (k, _) = el?;
|
||||||
|
to_delete.push(
|
||||||
|
FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(k)
|
||||||
|
.map_err(heed::Error::Encoding)?
|
||||||
|
.into_owned(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
drop(iter);
|
||||||
|
for k in to_delete {
|
||||||
|
self.db.delete(txn, &k.as_ref())?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build an additional level for the field id.
|
||||||
|
fn add_level(
|
||||||
|
&self,
|
||||||
|
txn: &mut RwTxn<'_>,
|
||||||
|
highest_level: u8,
|
||||||
|
highest_level_prefix: &[u8],
|
||||||
|
size_highest_level: usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut groups_iter = self
|
||||||
|
.db
|
||||||
|
.remap_types::<Bytes, FacetGroupValueCodec>()
|
||||||
|
.prefix_iter(txn, highest_level_prefix)?;
|
||||||
|
|
||||||
|
let nbr_new_groups = size_highest_level / self.group_size as usize;
|
||||||
|
let nbr_leftover_elements = size_highest_level % self.group_size as usize;
|
||||||
|
|
||||||
|
let mut to_add = vec![];
|
||||||
|
for _ in 0..nbr_new_groups {
|
||||||
|
let mut first_key = None;
|
||||||
|
let mut values = RoaringBitmap::new();
|
||||||
|
for _ in 0..self.group_size {
|
||||||
|
let (key_bytes, value_i) = groups_iter.next().unwrap()?;
|
||||||
|
let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
|
||||||
|
.map_err(heed::Error::Encoding)?;
|
||||||
|
|
||||||
|
if first_key.is_none() {
|
||||||
|
first_key = Some(key_i);
|
||||||
|
}
|
||||||
|
values |= value_i.bitmap;
|
||||||
|
}
|
||||||
|
let key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: highest_level + 1,
|
||||||
|
left_bound: first_key.unwrap().left_bound,
|
||||||
|
};
|
||||||
|
let value = FacetGroupValue { size: self.group_size, bitmap: values };
|
||||||
|
to_add.push((key.into_owned(), value));
|
||||||
|
}
|
||||||
|
// now we add the rest of the level, in case its size is > group_size * min_level_size
|
||||||
|
// this can indeed happen if the min_level_size parameter changes between two calls to `insert`
|
||||||
|
if nbr_leftover_elements > 0 {
|
||||||
|
let mut first_key = None;
|
||||||
|
let mut values = RoaringBitmap::new();
|
||||||
|
for _ in 0..nbr_leftover_elements {
|
||||||
|
let (key_bytes, value_i) = groups_iter.next().unwrap()?;
|
||||||
|
let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
|
||||||
|
.map_err(heed::Error::Encoding)?;
|
||||||
|
|
||||||
|
if first_key.is_none() {
|
||||||
|
first_key = Some(key_i);
|
||||||
|
}
|
||||||
|
values |= value_i.bitmap;
|
||||||
|
}
|
||||||
|
let key = FacetGroupKey {
|
||||||
|
field_id: self.field_id,
|
||||||
|
level: highest_level + 1,
|
||||||
|
left_bound: first_key.unwrap().left_bound,
|
||||||
|
};
|
||||||
|
// Note: nbr_leftover_elements can be casted to a u8 since it is bounded by `max_group_size`
|
||||||
|
// when it is created above.
|
||||||
|
let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values };
|
||||||
|
to_add.push((key.into_owned(), value));
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(groups_iter);
|
||||||
|
for (key, value) in to_add {
|
||||||
|
self.db.put(txn, &key.as_ref(), &value)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FacetFieldIdChange {
|
||||||
|
pub facet_value: Box<[u8]>,
|
||||||
|
}
|
@ -10,10 +10,14 @@ use fst::{IntoStreamer, Streamer};
|
|||||||
pub use grenad_helpers::*;
|
pub use grenad_helpers::*;
|
||||||
pub use merge_functions::*;
|
pub use merge_functions::*;
|
||||||
|
|
||||||
use crate::MAX_WORD_LENGTH;
|
use crate::MAX_LMDB_KEY_LENGTH;
|
||||||
|
|
||||||
pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool {
|
pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool {
|
||||||
key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty()
|
key.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !key.as_ref().is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool {
|
||||||
|
facet_value.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !facet_value.as_ref().is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Divides one slice into two at an index, returns `None` if mid is out of bounds.
|
/// Divides one slice into two at an index, returns `None` if mid is out of bounds.
|
||||||
|
@ -3334,6 +3334,44 @@ mod tests {
|
|||||||
rtxn.commit().unwrap();
|
rtxn.commit().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn incremental_update_without_changing_facet_distribution() {
|
||||||
|
let index = TempIndex::new();
|
||||||
|
index
|
||||||
|
.add_documents(documents!([
|
||||||
|
{"id": 0, "some_field": "aaa", "other_field": "aaa" },
|
||||||
|
{"id": 1, "some_field": "bbb", "other_field": "bbb" },
|
||||||
|
]))
|
||||||
|
.unwrap();
|
||||||
|
{
|
||||||
|
let rtxn = index.read_txn().unwrap();
|
||||||
|
// count field distribution
|
||||||
|
let results = index.field_distribution(&rtxn).unwrap();
|
||||||
|
assert_eq!(Some(&2), results.get("id"));
|
||||||
|
assert_eq!(Some(&2), results.get("some_field"));
|
||||||
|
assert_eq!(Some(&2), results.get("other_field"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut index = index;
|
||||||
|
index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
|
||||||
|
|
||||||
|
index
|
||||||
|
.add_documents(documents!([
|
||||||
|
{"id": 0, "other_field": "bbb" },
|
||||||
|
{"id": 1, "some_field": "ccc" },
|
||||||
|
]))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
{
|
||||||
|
let rtxn = index.read_txn().unwrap();
|
||||||
|
// count field distribution
|
||||||
|
let results = index.field_distribution(&rtxn).unwrap();
|
||||||
|
assert_eq!(Some(&2), results.get("id"));
|
||||||
|
assert_eq!(Some(&2), results.get("some_field"));
|
||||||
|
assert_eq!(Some(&2), results.get("other_field"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn delete_words_exact_attributes() {
|
fn delete_words_exact_attributes() {
|
||||||
let index = TempIndex::new();
|
let index = TempIndex::new();
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
---
|
---
|
||||||
source: milli/src/update/index_documents/mod.rs
|
source: crates/milli/src/update/index_documents/mod.rs
|
||||||
---
|
---
|
||||||
3 0 48.9021 1 [19, ]
|
3 0 48.9021 1 [19, ]
|
||||||
3 0 49.9314 1 [17, ]
|
3 0 49.9314 1 [17, ]
|
||||||
@ -15,6 +15,11 @@ source: milli/src/update/index_documents/mod.rs
|
|||||||
3 0 50.7453 1 [7, ]
|
3 0 50.7453 1 [7, ]
|
||||||
3 0 50.8466 1 [10, ]
|
3 0 50.8466 1 [10, ]
|
||||||
3 0 51.0537 1 [9, ]
|
3 0 51.0537 1 [9, ]
|
||||||
|
3 1 48.9021 2 [17, 19, ]
|
||||||
|
3 1 50.1793 3 [13, 14, 15, ]
|
||||||
|
3 1 50.4502 4 [0, 3, 8, 12, ]
|
||||||
|
3 1 50.6312 2 [1, 2, ]
|
||||||
|
3 1 50.7453 3 [7, 9, 10, ]
|
||||||
4 0 2.271 1 [17, ]
|
4 0 2.271 1 [17, ]
|
||||||
4 0 2.3708 1 [19, ]
|
4 0 2.3708 1 [19, ]
|
||||||
4 0 2.7637 1 [14, ]
|
4 0 2.7637 1 [14, ]
|
||||||
@ -28,4 +33,3 @@ source: milli/src/update/index_documents/mod.rs
|
|||||||
4 0 3.6957 1 [9, ]
|
4 0 3.6957 1 [9, ]
|
||||||
4 0 3.9623 1 [12, ]
|
4 0 3.9623 1 [12, ]
|
||||||
4 0 4.337 1 [10, ]
|
4 0 4.337 1 [10, ]
|
||||||
|
|
||||||
|
@ -89,7 +89,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
|||||||
.or_default();
|
.or_default();
|
||||||
*entry -= 1;
|
*entry -= 1;
|
||||||
}
|
}
|
||||||
let content = update.updated();
|
let content =
|
||||||
|
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||||
let geo_iter =
|
let geo_iter =
|
||||||
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
|
@ -252,6 +252,24 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
|
|||||||
previous_offset = iter.byte_offset();
|
previous_offset = iter.byte_offset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if payload.is_empty() {
|
||||||
|
let result = retrieve_or_guess_primary_key(
|
||||||
|
rtxn,
|
||||||
|
index,
|
||||||
|
new_fields_ids_map,
|
||||||
|
primary_key_from_op,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
match result {
|
||||||
|
Ok(Ok((pk, _))) => {
|
||||||
|
primary_key.get_or_insert(pk);
|
||||||
|
}
|
||||||
|
Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
|
||||||
|
Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
|
||||||
|
Err(error) => return Err(error),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
Ok(new_docids_version_offsets)
|
Ok(new_docids_version_offsets)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ use std::sync::{OnceLock, RwLock};
|
|||||||
use std::thread::{self, Builder};
|
use std::thread::{self, Builder};
|
||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
|
use bstr::ByteSlice as _;
|
||||||
use bumparaw_collections::RawMap;
|
use bumparaw_collections::RawMap;
|
||||||
use document_changes::{extract, DocumentChanges, IndexingContext};
|
use document_changes::{extract, DocumentChanges, IndexingContext};
|
||||||
pub use document_deletion::DocumentDeletion;
|
pub use document_deletion::DocumentDeletion;
|
||||||
@ -36,6 +37,8 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
|
|||||||
use crate::progress::Progress;
|
use crate::progress::Progress;
|
||||||
use crate::proximity::ProximityPrecision;
|
use crate::proximity::ProximityPrecision;
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
|
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
|
||||||
|
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
||||||
use crate::update::new::extract::EmbeddingExtractor;
|
use crate::update::new::extract::EmbeddingExtractor;
|
||||||
use crate::update::new::merger::merge_and_send_rtree;
|
use crate::update::new::merger::merge_and_send_rtree;
|
||||||
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
|
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
|
||||||
@ -107,7 +110,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let (extractor_sender, mut writer_receiver) = pool
|
let (extractor_sender, writer_receiver) = pool
|
||||||
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -203,6 +206,7 @@ where
|
|||||||
caches,
|
caches,
|
||||||
FacetDatabases::new(index),
|
FacetDatabases::new(index),
|
||||||
index,
|
index,
|
||||||
|
&rtxn,
|
||||||
extractor_sender.facet_docids(),
|
extractor_sender.facet_docids(),
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@ -421,6 +425,7 @@ where
|
|||||||
let mut arroy_writers = arroy_writers?;
|
let mut arroy_writers = arroy_writers?;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
let mut writer_receiver = writer_receiver;
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
@ -580,7 +585,12 @@ fn write_from_bbqueue(
|
|||||||
}
|
}
|
||||||
(key, None) => match database.delete(wtxn, key) {
|
(key, None) => match database.delete(wtxn, key) {
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
unreachable!("We tried to delete an unknown key: {key:?}")
|
tracing::error!(
|
||||||
|
database_name,
|
||||||
|
key_bytes = ?key,
|
||||||
|
formatted_key = ?key.as_bstr(),
|
||||||
|
"Attempt to delete an unknown key"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
@ -735,27 +745,66 @@ fn compute_facet_search_database(
|
|||||||
fn compute_facet_level_database(
|
fn compute_facet_level_database(
|
||||||
index: &Index,
|
index: &Index,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
facet_field_ids_delta: FacetFieldIdsDelta,
|
mut facet_field_ids_delta: FacetFieldIdsDelta,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
|
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
|
||||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
|
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
FacetsUpdateBulk::new_not_updating_level_0(
|
match delta {
|
||||||
index,
|
super::merger::FacetFieldIdDelta::Bulk => {
|
||||||
modified_facet_string_ids,
|
tracing::debug!(%fid, "bulk string facet processing");
|
||||||
FacetType::String,
|
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||||
)
|
.execute(wtxn)?
|
||||||
.execute(wtxn)?;
|
}
|
||||||
|
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
|
||||||
|
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
|
||||||
|
FacetsUpdateIncremental::new(
|
||||||
|
index,
|
||||||
|
FacetType::String,
|
||||||
|
fid,
|
||||||
|
delta_data,
|
||||||
|
FACET_GROUP_SIZE,
|
||||||
|
FACET_MIN_LEVEL_SIZE,
|
||||||
|
FACET_MAX_GROUP_SIZE,
|
||||||
|
)
|
||||||
|
.execute(wtxn)?
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
|
|
||||||
|
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
|
||||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
|
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
FacetsUpdateBulk::new_not_updating_level_0(
|
match delta {
|
||||||
|
super::merger::FacetFieldIdDelta::Bulk => {
|
||||||
|
tracing::debug!(%fid, "bulk number facet processing");
|
||||||
|
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
|
||||||
|
.execute(wtxn)?
|
||||||
|
}
|
||||||
|
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
|
||||||
|
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
|
||||||
|
FacetsUpdateIncremental::new(
|
||||||
|
index,
|
||||||
|
FacetType::Number,
|
||||||
|
fid,
|
||||||
|
delta_data,
|
||||||
|
FACET_GROUP_SIZE,
|
||||||
|
FACET_MIN_LEVEL_SIZE,
|
||||||
|
FACET_MAX_GROUP_SIZE,
|
||||||
|
)
|
||||||
|
.execute(wtxn)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug_assert!(crate::update::facet::sanity_checks(
|
||||||
index,
|
index,
|
||||||
modified_facet_number_ids,
|
wtxn,
|
||||||
|
fid,
|
||||||
FacetType::Number,
|
FacetType::Number,
|
||||||
|
FACET_GROUP_SIZE as usize,
|
||||||
|
FACET_MIN_LEVEL_SIZE as usize,
|
||||||
|
FACET_MAX_GROUP_SIZE as usize,
|
||||||
)
|
)
|
||||||
.execute(wtxn)?;
|
.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
|
||||||
use hashbrown::HashSet;
|
use hashbrown::HashMap;
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
use heed::{Database, RoTxn};
|
use heed::{Database, RoTxn};
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
@ -12,6 +12,7 @@ use super::extract::{
|
|||||||
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
|
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
|
||||||
FacetKind, GeoExtractorData,
|
FacetKind, GeoExtractorData,
|
||||||
};
|
};
|
||||||
|
use crate::update::facet::new_incremental::FacetFieldIdChange;
|
||||||
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
|
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
|
||||||
@ -100,12 +101,18 @@ pub fn merge_and_send_facet_docids<'extractor>(
|
|||||||
mut caches: Vec<BalancedCaches<'extractor>>,
|
mut caches: Vec<BalancedCaches<'extractor>>,
|
||||||
database: FacetDatabases,
|
database: FacetDatabases,
|
||||||
index: &Index,
|
index: &Index,
|
||||||
|
rtxn: &RoTxn,
|
||||||
docids_sender: FacetDocidsSender,
|
docids_sender: FacetDocidsSender,
|
||||||
) -> Result<FacetFieldIdsDelta> {
|
) -> Result<FacetFieldIdsDelta> {
|
||||||
|
let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
|
||||||
|
let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
|
||||||
|
let max_string_count = max_string_count.clamp(1000, 100_000);
|
||||||
|
let max_number_count = max_number_count.clamp(1000, 100_000);
|
||||||
transpose_and_freeze_caches(&mut caches)?
|
transpose_and_freeze_caches(&mut caches)?
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|frozen| {
|
.map(|frozen| {
|
||||||
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
|
let mut facet_field_ids_delta =
|
||||||
|
FacetFieldIdsDelta::new(max_string_count, max_number_count);
|
||||||
let rtxn = index.read_txn()?;
|
let rtxn = index.read_txn()?;
|
||||||
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||||
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
|
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
|
||||||
@ -126,7 +133,10 @@ pub fn merge_and_send_facet_docids<'extractor>(
|
|||||||
|
|
||||||
Ok(facet_field_ids_delta)
|
Ok(facet_field_ids_delta)
|
||||||
})
|
})
|
||||||
.reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?)))
|
.reduce(
|
||||||
|
|| Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
|
||||||
|
|lhs, rhs| Ok(lhs?.merge(rhs?)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FacetDatabases<'a> {
|
pub struct FacetDatabases<'a> {
|
||||||
@ -155,60 +165,131 @@ impl<'a> FacetDatabases<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug)]
|
||||||
|
pub enum FacetFieldIdDelta {
|
||||||
|
Bulk,
|
||||||
|
Incremental(Vec<FacetFieldIdChange>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FacetFieldIdDelta {
|
||||||
|
fn push(&mut self, facet_value: &[u8], max_count: usize) {
|
||||||
|
*self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
|
||||||
|
FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
|
||||||
|
FacetFieldIdDelta::Incremental(mut v) => {
|
||||||
|
if v.len() >= max_count {
|
||||||
|
FacetFieldIdDelta::Bulk
|
||||||
|
} else {
|
||||||
|
v.push(FacetFieldIdChange { facet_value: facet_value.into() });
|
||||||
|
FacetFieldIdDelta::Incremental(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
|
||||||
|
let Some(rhs) = rhs else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
*self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
|
||||||
|
(FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
|
||||||
|
(
|
||||||
|
FacetFieldIdDelta::Incremental(mut left),
|
||||||
|
FacetFieldIdDelta::Incremental(mut right),
|
||||||
|
) => {
|
||||||
|
if left.len() + right.len() >= max_count {
|
||||||
|
FacetFieldIdDelta::Bulk
|
||||||
|
} else {
|
||||||
|
left.append(&mut right);
|
||||||
|
FacetFieldIdDelta::Incremental(left)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct FacetFieldIdsDelta {
|
pub struct FacetFieldIdsDelta {
|
||||||
/// The field ids that have been modified
|
/// The field ids that have been modified
|
||||||
modified_facet_string_ids: HashSet<FieldId>,
|
modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
|
||||||
modified_facet_number_ids: HashSet<FieldId>,
|
modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
|
||||||
|
max_string_count: usize,
|
||||||
|
max_number_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FacetFieldIdsDelta {
|
impl FacetFieldIdsDelta {
|
||||||
fn register_facet_string_id(&mut self, field_id: FieldId) {
|
pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
|
||||||
self.modified_facet_string_ids.insert(field_id);
|
Self {
|
||||||
|
max_string_count,
|
||||||
|
max_number_count,
|
||||||
|
modified_facet_string_ids: Default::default(),
|
||||||
|
modified_facet_number_ids: Default::default(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_facet_number_id(&mut self, field_id: FieldId) {
|
fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
|
||||||
self.modified_facet_number_ids.insert(field_id);
|
self.modified_facet_string_ids
|
||||||
|
.entry(field_id)
|
||||||
|
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
|
||||||
|
.push(facet_value, self.max_string_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
|
||||||
|
self.modified_facet_number_ids
|
||||||
|
.entry(field_id)
|
||||||
|
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
|
||||||
|
.push(facet_value, self.max_number_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_from_key(&mut self, key: &[u8]) {
|
fn register_from_key(&mut self, key: &[u8]) {
|
||||||
let (facet_kind, field_id) = self.extract_key_data(key);
|
let (facet_kind, field_id, facet_value) = self.extract_key_data(key);
|
||||||
match facet_kind {
|
match (facet_kind, facet_value) {
|
||||||
FacetKind::Number => self.register_facet_number_id(field_id),
|
(FacetKind::Number, Some(facet_value)) => {
|
||||||
FacetKind::String => self.register_facet_string_id(field_id),
|
self.register_facet_number_id(field_id, facet_value)
|
||||||
|
}
|
||||||
|
(FacetKind::String, Some(facet_value)) => {
|
||||||
|
self.register_facet_string_id(field_id, facet_value)
|
||||||
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) {
|
fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) {
|
||||||
let facet_kind = FacetKind::from(key[0]);
|
let facet_kind = FacetKind::from(key[0]);
|
||||||
let field_id = FieldId::from_be_bytes([key[1], key[2]]);
|
let field_id = FieldId::from_be_bytes([key[1], key[2]]);
|
||||||
(facet_kind, field_id)
|
let facet_value = if key.len() >= 4 {
|
||||||
|
// level is also stored in the key at [3] (always 0)
|
||||||
|
Some(&key[4..])
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
(facet_kind, field_id, facet_value)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn modified_facet_string_ids(&self) -> Option<Vec<FieldId>> {
|
pub fn consume_facet_string_delta(
|
||||||
if self.modified_facet_string_ids.is_empty() {
|
&mut self,
|
||||||
None
|
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
||||||
} else {
|
self.modified_facet_string_ids.drain()
|
||||||
Some(self.modified_facet_string_ids.iter().copied().collect())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn modified_facet_number_ids(&self) -> Option<Vec<FieldId>> {
|
pub fn consume_facet_number_delta(
|
||||||
if self.modified_facet_number_ids.is_empty() {
|
&mut self,
|
||||||
None
|
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
||||||
} else {
|
self.modified_facet_number_ids.drain()
|
||||||
Some(self.modified_facet_number_ids.iter().copied().collect())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge(mut self, rhs: Self) -> Self {
|
pub fn merge(mut self, rhs: Self) -> Self {
|
||||||
let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs;
|
// rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused
|
||||||
modified_facet_number_ids.into_iter().for_each(|fid| {
|
let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
|
||||||
self.modified_facet_number_ids.insert(fid);
|
modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
|
||||||
|
let old_delta = self.modified_facet_number_ids.remove(&fid);
|
||||||
|
delta.merge(old_delta, self.max_number_count);
|
||||||
|
self.modified_facet_number_ids.insert(fid, delta);
|
||||||
});
|
});
|
||||||
modified_facet_string_ids.into_iter().for_each(|fid| {
|
modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
|
||||||
self.modified_facet_string_ids.insert(fid);
|
let old_delta = self.modified_facet_string_ids.remove(&fid);
|
||||||
|
delta.merge(old_delta, self.max_string_count);
|
||||||
|
self.modified_facet_string_ids.insert(fid, delta);
|
||||||
});
|
});
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ pub mod indexer;
|
|||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
mod ref_cell_ext;
|
mod ref_cell_ext;
|
||||||
|
pub mod reindex;
|
||||||
pub(crate) mod steps;
|
pub(crate) mod steps;
|
||||||
pub(crate) mod thread_local;
|
pub(crate) mod thread_local;
|
||||||
pub mod vector_document;
|
pub mod vector_document;
|
||||||
|
38
crates/milli/src/update/new/reindex.rs
Normal file
38
crates/milli/src/update/new/reindex.rs
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
use heed::RwTxn;
|
||||||
|
|
||||||
|
use super::document::{Document, DocumentFromDb};
|
||||||
|
use crate::progress::{self, AtomicSubStep, Progress};
|
||||||
|
use crate::{FieldDistribution, Index, Result};
|
||||||
|
|
||||||
|
pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> {
|
||||||
|
let mut distribution = FieldDistribution::new();
|
||||||
|
|
||||||
|
let document_count = index.number_of_documents(wtxn)?;
|
||||||
|
let field_id_map = index.fields_ids_map(wtxn)?;
|
||||||
|
|
||||||
|
let (update_document_count, sub_step) =
|
||||||
|
AtomicSubStep::<progress::Document>::new(document_count as u32);
|
||||||
|
progress.update_progress(sub_step);
|
||||||
|
|
||||||
|
let docids = index.documents_ids(wtxn)?;
|
||||||
|
|
||||||
|
for docid in docids {
|
||||||
|
update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
|
for res in document.iter_top_level_fields().chain(geo_iter) {
|
||||||
|
let (field_name, _) = res?;
|
||||||
|
if let Some(count) = distribution.get_mut(field_name) {
|
||||||
|
*count += 1;
|
||||||
|
} else {
|
||||||
|
distribution.insert(field_name.to_owned(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
index.put_field_distribution(wtxn, &distribution)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
Reference in New Issue
Block a user