mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-08 05:35:42 +00:00
Compare commits
2 Commits
v1.12.6
...
tmp-increm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79111230ee | ||
|
|
d6957a8e5d |
34
Cargo.lock
generated
34
Cargo.lock
generated
@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "benchmarks"
|
name = "benchmarks"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bumpalo",
|
"bumpalo",
|
||||||
@@ -689,7 +689,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "build-info"
|
name = "build-info"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"time",
|
"time",
|
||||||
@@ -1664,7 +1664,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "dump"
|
name = "dump"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"big_s",
|
"big_s",
|
||||||
@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "file-store"
|
name = "file-store"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
@@ -1898,7 +1898,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "filter-parser"
|
name = "filter-parser"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"insta",
|
"insta",
|
||||||
"nom",
|
"nom",
|
||||||
@@ -1918,7 +1918,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flatten-serde-json"
|
name = "flatten-serde-json"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion",
|
"criterion",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -2057,7 +2057,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fuzzers"
|
name = "fuzzers"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arbitrary",
|
"arbitrary",
|
||||||
"bumpalo",
|
"bumpalo",
|
||||||
@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "index-scheduler"
|
name = "index-scheduler"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@@ -2822,7 +2822,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "json-depth-checker"
|
name = "json-depth-checker"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion",
|
"criterion",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meili-snap"
|
name = "meili-snap"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"insta",
|
"insta",
|
||||||
"md5",
|
"md5",
|
||||||
@@ -3450,7 +3450,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch"
|
name = "meilisearch"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-cors",
|
"actix-cors",
|
||||||
"actix-http",
|
"actix-http",
|
||||||
@@ -3540,7 +3540,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch-auth"
|
name = "meilisearch-auth"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.22.1",
|
"base64 0.22.1",
|
||||||
"enum-iterator",
|
"enum-iterator",
|
||||||
@@ -3559,7 +3559,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilisearch-types"
|
name = "meilisearch-types"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-web",
|
"actix-web",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@@ -3592,7 +3592,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "meilitool"
|
name = "meilitool"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
|
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
|
||||||
@@ -3627,7 +3627,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "milli"
|
name = "milli"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "permissive-json-pointer"
|
name = "permissive-json-pointer"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"big_s",
|
"big_s",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -6486,7 +6486,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "xtask"
|
name = "xtask"
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"build-info",
|
"build-info",
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ members = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "1.12.6"
|
version = "1.12.1"
|
||||||
authors = [
|
authors = [
|
||||||
"Quentin de Quelen <quentin@dequelen.me>",
|
"Quentin de Quelen <quentin@dequelen.me>",
|
||||||
"Clément Renault <clement@meilisearch.com>",
|
"Clément Renault <clement@meilisearch.com>",
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ use bumpalo::Bump;
|
|||||||
use dump::IndexMetadata;
|
use dump::IndexMetadata;
|
||||||
use meilisearch_types::batches::BatchId;
|
use meilisearch_types::batches::BatchId;
|
||||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::documents::PrimaryKey;
|
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
||||||
use meilisearch_types::milli::heed::CompactionOption;
|
use meilisearch_types::milli::heed::CompactionOption;
|
||||||
use meilisearch_types::milli::progress::Progress;
|
use meilisearch_types::milli::progress::Progress;
|
||||||
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
||||||
@@ -819,13 +819,6 @@ impl IndexScheduler {
|
|||||||
t.started_at = Some(started_at);
|
t.started_at = Some(started_at);
|
||||||
t.finished_at = Some(finished_at);
|
t.finished_at = Some(finished_at);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
|
|
||||||
// This prevents referencing *future* batches that are not actually associated with the task.
|
|
||||||
//
|
|
||||||
// See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
|
|
||||||
t.batch_uid = None;
|
|
||||||
|
|
||||||
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
|
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
|
||||||
|
|
||||||
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
|
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
|
||||||
@@ -836,18 +829,21 @@ impl IndexScheduler {
|
|||||||
if status == Status::Enqueued {
|
if status == Status::Enqueued {
|
||||||
let content_file = self.file_store.get_update(content_file)?;
|
let content_file = self.file_store.get_update(content_file)?;
|
||||||
|
|
||||||
for document in
|
let reader = DocumentsBatchReader::from_reader(content_file)
|
||||||
serde_json::de::Deserializer::from_reader(content_file).into_iter()
|
.map_err(|e| Error::from_milli(e.into(), None))?;
|
||||||
{
|
|
||||||
let document = document.map_err(|e| {
|
|
||||||
Error::from_milli(
|
|
||||||
milli::InternalError::SerdeJson(e).into(),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
dump_content_file.push_document(&document)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
let (mut cursor, documents_batch_index) =
|
||||||
|
reader.into_cursor_and_fields_index();
|
||||||
|
|
||||||
|
while let Some(doc) = cursor
|
||||||
|
.next_document()
|
||||||
|
.map_err(|e| Error::from_milli(e.into(), None))?
|
||||||
|
{
|
||||||
|
dump_content_file.push_document(
|
||||||
|
&obkv_to_object(doc, &documents_batch_index)
|
||||||
|
.map_err(|e| Error::from_milli(e, None))?,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
dump_content_file.flush()?;
|
dump_content_file.flush()?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea
|
|||||||
use meilisearch_types::heed::byteorder::BE;
|
use meilisearch_types::heed::byteorder::BE;
|
||||||
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
|
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
|
||||||
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
|
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
|
||||||
|
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||||||
use meilisearch_types::milli::index::IndexEmbeddingConfig;
|
use meilisearch_types::milli::index::IndexEmbeddingConfig;
|
||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
||||||
@@ -2016,19 +2017,14 @@ impl<'a> Dump<'a> {
|
|||||||
task: TaskDump,
|
task: TaskDump,
|
||||||
content_file: Option<Box<UpdateFile>>,
|
content_file: Option<Box<UpdateFile>>,
|
||||||
) -> Result<Task> {
|
) -> Result<Task> {
|
||||||
let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
|
|
||||||
|
|
||||||
let content_uuid = match content_file {
|
let content_uuid = match content_file {
|
||||||
Some(content_file) if task.status == Status::Enqueued => {
|
Some(content_file) if task.status == Status::Enqueued => {
|
||||||
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
|
let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
|
||||||
let mut writer = io::BufWriter::new(file);
|
let mut builder = DocumentsBatchBuilder::new(&mut file);
|
||||||
for doc in content_file {
|
for doc in content_file {
|
||||||
let doc = doc?;
|
builder.append_json_object(&doc?)?;
|
||||||
serde_json::to_writer(&mut writer, &doc).map_err(|e| {
|
|
||||||
Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
|
|
||||||
})?;
|
|
||||||
}
|
}
|
||||||
let file = writer.into_inner().map_err(|e| e.into_error())?;
|
builder.into_inner()?;
|
||||||
file.persist()?;
|
file.persist()?;
|
||||||
|
|
||||||
Some(uuid)
|
Some(uuid)
|
||||||
@@ -2036,12 +2032,6 @@ impl<'a> Dump<'a> {
|
|||||||
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
||||||
// in case we try to open it later.
|
// in case we try to open it later.
|
||||||
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
||||||
None if task.status == Status::Enqueued && task_has_no_docs => {
|
|
||||||
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
|
|
||||||
file.persist()?;
|
|
||||||
|
|
||||||
Some(uuid)
|
|
||||||
}
|
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1746,57 +1746,3 @@ async fn change_attributes_settings() {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Modifying facets with different casing should work correctly
|
|
||||||
#[actix_rt::test]
|
|
||||||
async fn change_facet_casing() {
|
|
||||||
let server = Server::new().await;
|
|
||||||
let index = server.index("test");
|
|
||||||
|
|
||||||
let (response, code) = index
|
|
||||||
.update_settings(json!({
|
|
||||||
"filterableAttributes": ["dog"],
|
|
||||||
}))
|
|
||||||
.await;
|
|
||||||
assert_eq!("202", code.as_str(), "{:?}", response);
|
|
||||||
index.wait_task(response.uid()).await;
|
|
||||||
|
|
||||||
let (response, _code) = index
|
|
||||||
.add_documents(
|
|
||||||
json!([
|
|
||||||
{
|
|
||||||
"id": 1,
|
|
||||||
"dog": "Bouvier Bernois"
|
|
||||||
}
|
|
||||||
]),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
index.wait_task(response.uid()).await;
|
|
||||||
|
|
||||||
let (response, _code) = index
|
|
||||||
.add_documents(
|
|
||||||
json!([
|
|
||||||
{
|
|
||||||
"id": 1,
|
|
||||||
"dog": "bouvier bernois"
|
|
||||||
}
|
|
||||||
]),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
index.wait_task(response.uid()).await;
|
|
||||||
|
|
||||||
index
|
|
||||||
.search(json!({ "facets": ["dog"] }), |response, code| {
|
|
||||||
meili_snap::snapshot!(code, @"200 OK");
|
|
||||||
meili_snap::snapshot!(meili_snap::json_string!(response["facetDistribution"]), @r###"
|
|
||||||
{
|
|
||||||
"dog": {
|
|
||||||
"bouvier bernois": 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"###);
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -14,11 +14,11 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar
|
|||||||
clap = { version = "4.5.9", features = ["derive"] }
|
clap = { version = "4.5.9", features = ["derive"] }
|
||||||
dump = { path = "../dump" }
|
dump = { path = "../dump" }
|
||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
indexmap = { version = "2.7.0", features = ["serde"] }
|
indexmap = {version = "2.7.0", features = ["serde"]}
|
||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
serde = { version = "1.0.209", features = ["derive"] }
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
serde_json = { version = "1.0.133", features = ["preserve_order"] }
|
serde_json = {version = "1.0.133", features = ["preserve_order"]}
|
||||||
tempfile = "3.14.0"
|
tempfile = "3.14.0"
|
||||||
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
||||||
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
match command {
|
match command {
|
||||||
Command::ClearTaskQueue => clear_task_queue(db_path),
|
Command::ClearTaskQueue => clear_task_queue(db_path),
|
||||||
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
||||||
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
|
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
|
||||||
}
|
}
|
||||||
Command::OfflineUpgrade { target_version } => {
|
Command::OfflineUpgrade { target_version } => {
|
||||||
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
||||||
@@ -187,7 +187,6 @@ fn export_a_dump(
|
|||||||
db_path: PathBuf,
|
db_path: PathBuf,
|
||||||
dump_dir: PathBuf,
|
dump_dir: PathBuf,
|
||||||
skip_enqueued_tasks: bool,
|
skip_enqueued_tasks: bool,
|
||||||
detected_version: (String, String, String),
|
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
let started_at = OffsetDateTime::now_utc();
|
let started_at = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
@@ -239,6 +238,9 @@ fn export_a_dump(
|
|||||||
if skip_enqueued_tasks {
|
if skip_enqueued_tasks {
|
||||||
eprintln!("Skip dumping the enqueued tasks...");
|
eprintln!("Skip dumping the enqueued tasks...");
|
||||||
} else {
|
} else {
|
||||||
|
eprintln!("Dumping the enqueued tasks...");
|
||||||
|
|
||||||
|
// 3. dump the tasks
|
||||||
let mut dump_tasks = dump.create_tasks_queue()?;
|
let mut dump_tasks = dump.create_tasks_queue()?;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
for ret in all_tasks.iter(&rtxn)? {
|
for ret in all_tasks.iter(&rtxn)? {
|
||||||
@@ -252,39 +254,18 @@ fn export_a_dump(
|
|||||||
if status == Status::Enqueued {
|
if status == Status::Enqueued {
|
||||||
let content_file = file_store.get_update(content_file_uuid)?;
|
let content_file = file_store.get_update(content_file_uuid)?;
|
||||||
|
|
||||||
if (
|
let reader =
|
||||||
detected_version.0.as_str(),
|
DocumentsBatchReader::from_reader(content_file).with_context(|| {
|
||||||
detected_version.1.as_str(),
|
format!("While reading content file {:?}", content_file_uuid)
|
||||||
detected_version.2.as_str(),
|
})?;
|
||||||
) < ("1", "12", "0")
|
|
||||||
{
|
|
||||||
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
|
|
||||||
let reader =
|
|
||||||
DocumentsBatchReader::from_reader(content_file).with_context(|| {
|
|
||||||
format!("While reading content file {:?}", content_file_uuid)
|
|
||||||
})?;
|
|
||||||
let (mut cursor, documents_batch_index) =
|
|
||||||
reader.into_cursor_and_fields_index();
|
|
||||||
while let Some(doc) = cursor.next_document().with_context(|| {
|
|
||||||
format!("While iterating on content file {:?}", content_file_uuid)
|
|
||||||
})? {
|
|
||||||
dump_content_file
|
|
||||||
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
eprintln!(
|
|
||||||
"Dumping the enqueued tasks reading them in JSON stream format..."
|
|
||||||
);
|
|
||||||
for document in
|
|
||||||
serde_json::de::Deserializer::from_reader(content_file).into_iter()
|
|
||||||
{
|
|
||||||
let document = document.with_context(|| {
|
|
||||||
format!("While reading content file {:?}", content_file_uuid)
|
|
||||||
})?;
|
|
||||||
dump_content_file.push_document(&document)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
|
||||||
|
while let Some(doc) = cursor.next_document().with_context(|| {
|
||||||
|
format!("While iterating on content file {:?}", content_file_uuid)
|
||||||
|
})? {
|
||||||
|
dump_content_file
|
||||||
|
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
|
||||||
|
}
|
||||||
dump_content_file.flush()?;
|
dump_content_file.flush()?;
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
|
|||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use meilisearch_types::versioning::create_version_file;
|
use meilisearch_types::versioning::create_version_file;
|
||||||
use v1_10::v1_9_to_v1_10;
|
use v1_10::v1_9_to_v1_10;
|
||||||
use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3};
|
use v1_12::v1_11_to_v1_12;
|
||||||
|
|
||||||
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
||||||
|
|
||||||
@@ -20,48 +20,12 @@ pub struct OfflineUpgrade {
|
|||||||
|
|
||||||
impl OfflineUpgrade {
|
impl OfflineUpgrade {
|
||||||
pub fn upgrade(self) -> anyhow::Result<()> {
|
pub fn upgrade(self) -> anyhow::Result<()> {
|
||||||
// Adding a version?
|
|
||||||
//
|
|
||||||
// 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
|
|
||||||
// 2. Add new version to the upgrade list if necessary
|
|
||||||
// 3. Use `no_upgrade` as index for versions that are compatible.
|
|
||||||
|
|
||||||
if self.current_version == self.target_version {
|
|
||||||
println!("Database is already at the target version. Exiting.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.current_version > self.target_version {
|
|
||||||
bail!(
|
|
||||||
"Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
|
|
||||||
self.current_version.0,
|
|
||||||
self.current_version.1,
|
|
||||||
self.current_version.2,
|
|
||||||
self.target_version.0,
|
|
||||||
self.target_version.1,
|
|
||||||
self.target_version.2
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
|
|
||||||
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
|
|
||||||
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
|
|
||||||
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
|
|
||||||
|
|
||||||
let upgrade_list = [
|
let upgrade_list = [
|
||||||
(
|
(v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
|
||||||
v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
|
|
||||||
"1",
|
|
||||||
"10",
|
|
||||||
"0",
|
|
||||||
),
|
|
||||||
(v1_10_to_v1_11, "1", "11", "0"),
|
(v1_10_to_v1_11, "1", "11", "0"),
|
||||||
(v1_11_to_v1_12, "1", "12", "0"),
|
(v1_11_to_v1_12, "1", "12", "0"),
|
||||||
(v1_12_to_v1_12_3, "1", "12", "3"),
|
|
||||||
];
|
];
|
||||||
|
|
||||||
let no_upgrade: usize = upgrade_list.len();
|
|
||||||
|
|
||||||
let (current_major, current_minor, current_patch) = &self.current_version;
|
let (current_major, current_minor, current_patch) = &self.current_version;
|
||||||
|
|
||||||
let start_at = match (
|
let start_at = match (
|
||||||
@@ -72,12 +36,8 @@ impl OfflineUpgrade {
|
|||||||
("1", "9", _) => 0,
|
("1", "9", _) => 0,
|
||||||
("1", "10", _) => 1,
|
("1", "10", _) => 1,
|
||||||
("1", "11", _) => 2,
|
("1", "11", _) => 2,
|
||||||
("1", "12", "0" | "1" | "2") => 3,
|
|
||||||
("1", "12", "3" | "4" | "5") => no_upgrade,
|
|
||||||
_ => {
|
_ => {
|
||||||
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
|
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
|
||||||
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
|
|
||||||
LAST_SUPPORTED_UPGRADE_FROM_VERSION);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -86,32 +46,21 @@ impl OfflineUpgrade {
|
|||||||
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
||||||
("1", "10", _) => 0,
|
("1", "10", _) => 0,
|
||||||
("1", "11", _) => 1,
|
("1", "11", _) => 1,
|
||||||
("1", "12", "0" | "1" | "2") => 2,
|
("1", "12", _) => 2,
|
||||||
("1", "12", "3" | "4" | "5") => 3,
|
|
||||||
(major, _, _) if major.starts_with('v') => {
|
(major, _, _) if major.starts_with('v') => {
|
||||||
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
|
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
|
||||||
FIRST_SUPPORTED_UPGRADE_TO_VERSION,
|
|
||||||
LAST_SUPPORTED_UPGRADE_TO_VERSION);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
|
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
|
||||||
|
|
||||||
if start_at == no_upgrade {
|
|
||||||
println!("No upgrade operation to perform, writing VERSION file");
|
|
||||||
create_version_file(&self.db_path, target_major, target_minor, target_patch)
|
|
||||||
.context("while writing VERSION file after the upgrade")?;
|
|
||||||
println!("Success");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::needless_range_loop)]
|
#[allow(clippy::needless_range_loop)]
|
||||||
for index in start_at..=ends_at {
|
for index in start_at..=ends_at {
|
||||||
let (func, major, minor, patch) = upgrade_list[index];
|
let (func, major, minor, patch) = upgrade_list[index];
|
||||||
(func)(&self.db_path, current_major, current_minor, current_patch)?;
|
(func)(&self.db_path)?;
|
||||||
println!("Done");
|
println!("Done");
|
||||||
// We're writing the version file just in case an issue arises _while_ upgrading.
|
// We're writing the version file just in case an issue arises _while_ upgrading.
|
||||||
// We don't want the DB to fail in an unknown state.
|
// We don't want the DB to fail in an unknown state.
|
||||||
|
|||||||
@@ -151,12 +151,7 @@ fn date_round_trip(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn v1_9_to_v1_10(
|
pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> {
|
||||||
db_path: &Path,
|
|
||||||
_origin_major: &str,
|
|
||||||
_origin_minor: &str,
|
|
||||||
_origin_patch: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
println!("Upgrading from v1.9.0 to v1.10.0");
|
println!("Upgrading from v1.9.0 to v1.10.0");
|
||||||
// 2 changes here
|
// 2 changes here
|
||||||
|
|
||||||
|
|||||||
@@ -14,12 +14,7 @@ use meilisearch_types::milli::index::db_name;
|
|||||||
use crate::uuid_codec::UuidCodec;
|
use crate::uuid_codec::UuidCodec;
|
||||||
use crate::{try_opening_database, try_opening_poly_database};
|
use crate::{try_opening_database, try_opening_poly_database};
|
||||||
|
|
||||||
pub fn v1_10_to_v1_11(
|
pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> {
|
||||||
db_path: &Path,
|
|
||||||
_origin_major: &str,
|
|
||||||
_origin_minor: &str,
|
|
||||||
_origin_patch: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
println!("Upgrading from v1.10.0 to v1.11.0");
|
println!("Upgrading from v1.10.0 to v1.11.0");
|
||||||
|
|
||||||
let index_scheduler_path = db_path.join("tasks");
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
|||||||
@@ -1,34 +1,17 @@
|
|||||||
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
||||||
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
||||||
|
|
||||||
use std::borrow::Cow;
|
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::atomic::AtomicBool;
|
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
||||||
use meilisearch_types::milli::heed::types::{SerdeJson, Str};
|
|
||||||
use meilisearch_types::milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
|
|
||||||
use meilisearch_types::milli::progress::Step;
|
|
||||||
use meilisearch_types::milli::{FieldDistribution, Index};
|
|
||||||
use serde::Serialize;
|
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
use time::OffsetDateTime;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::try_opening_database;
|
pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
|
||||||
use crate::uuid_codec::UuidCodec;
|
|
||||||
|
|
||||||
pub fn v1_11_to_v1_12(
|
|
||||||
db_path: &Path,
|
|
||||||
_origin_major: &str,
|
|
||||||
_origin_minor: &str,
|
|
||||||
_origin_patch: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
println!("Upgrading from v1.11.0 to v1.12.0");
|
println!("Upgrading from v1.11.0 to v1.12.0");
|
||||||
|
|
||||||
convert_update_files(db_path)?;
|
convert_update_files(db_path)?;
|
||||||
@@ -36,23 +19,6 @@ pub fn v1_11_to_v1_12(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn v1_12_to_v1_12_3(
|
|
||||||
db_path: &Path,
|
|
||||||
origin_major: &str,
|
|
||||||
origin_minor: &str,
|
|
||||||
origin_patch: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3");
|
|
||||||
|
|
||||||
if origin_minor == "12" {
|
|
||||||
rebuild_field_distribution(db_path)?;
|
|
||||||
} else {
|
|
||||||
println!("Not rebuilding field distribution as it wasn't corrupted coming from v{origin_major}.{origin_minor}.{origin_patch}");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Convert the update files from OBKV to ndjson format.
|
/// Convert the update files from OBKV to ndjson format.
|
||||||
///
|
///
|
||||||
/// 1) List all the update files using the file store.
|
/// 1) List all the update files using the file store.
|
||||||
@@ -111,188 +77,3 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
|
|
||||||
fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
|
||||||
let index_scheduler_path = db_path.join("tasks");
|
|
||||||
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
|
||||||
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
|
||||||
|
|
||||||
let mut sched_wtxn = env.write_txn()?;
|
|
||||||
|
|
||||||
let index_mapping: Database<Str, UuidCodec> =
|
|
||||||
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
|
|
||||||
let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
|
|
||||||
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
|
|
||||||
format!("While trying to open {:?}", index_scheduler_path.display())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let index_count =
|
|
||||||
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
|
|
||||||
|
|
||||||
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
|
|
||||||
// 1. immutably for the iteration
|
|
||||||
// 2. mutably for updating index stats
|
|
||||||
let indexes: Vec<_> = index_mapping
|
|
||||||
.iter(&sched_wtxn)?
|
|
||||||
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let progress = meilisearch_types::milli::progress::Progress::default();
|
|
||||||
let finished = AtomicBool::new(false);
|
|
||||||
|
|
||||||
std::thread::scope(|scope| {
|
|
||||||
let display_progress = std::thread::Builder::new()
|
|
||||||
.name("display_progress".into())
|
|
||||||
.spawn_scoped(scope, || {
|
|
||||||
while !finished.load(std::sync::atomic::Ordering::Relaxed) {
|
|
||||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
|
||||||
let view = progress.as_progress_view();
|
|
||||||
let Ok(view) = serde_json::to_string(&view) else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
println!("{view}");
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
for (index_index, result) in indexes.into_iter().enumerate() {
|
|
||||||
let (uid, uuid) = result?;
|
|
||||||
progress.update_progress(VariableNameStep::new(
|
|
||||||
&uid,
|
|
||||||
index_index as u32,
|
|
||||||
index_count as u32,
|
|
||||||
));
|
|
||||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"[{}/{index_count}]Updating index `{uid}` at `{}`",
|
|
||||||
index_index + 1,
|
|
||||||
index_path.display()
|
|
||||||
);
|
|
||||||
|
|
||||||
println!("\t- Rebuilding field distribution");
|
|
||||||
|
|
||||||
let index = meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path)
|
|
||||||
.with_context(|| {
|
|
||||||
format!("while opening index {uid} at '{}'", index_path.display())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut index_txn = index.write_txn()?;
|
|
||||||
|
|
||||||
meilisearch_types::milli::update::new::reindex::field_distribution(
|
|
||||||
&index,
|
|
||||||
&mut index_txn,
|
|
||||||
&progress,
|
|
||||||
)
|
|
||||||
.context("while rebuilding field distribution")?;
|
|
||||||
|
|
||||||
let stats = IndexStats::new(&index, &index_txn)
|
|
||||||
.with_context(|| format!("computing stats for index `{uid}`"))?;
|
|
||||||
store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
|
|
||||||
|
|
||||||
index_txn.commit().context("while committing the write txn for the updated index")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
|
|
||||||
|
|
||||||
finished.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
if let Err(panic) = display_progress.join() {
|
|
||||||
let msg = match panic.downcast_ref::<&'static str>() {
|
|
||||||
Some(s) => *s,
|
|
||||||
None => match panic.downcast_ref::<String>() {
|
|
||||||
Some(s) => &s[..],
|
|
||||||
None => "Box<dyn Any>",
|
|
||||||
},
|
|
||||||
};
|
|
||||||
eprintln!("WARN: the display thread panicked with {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Upgrading database succeeded");
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A progress step whose name is chosen at runtime, paired with its position
/// (`current`) out of an overall count (`total`).
pub struct VariableNameStep {
    /// Name reported for this step.
    name: String,
    /// Position of this step.
    current: u32,
    /// Overall number of steps.
    total: u32,
}

impl VariableNameStep {
    /// Builds a step from anything convertible into a `String`, along with its
    /// position and the overall count.
    pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
        let name: String = name.into();
        VariableNameStep { name, current, total }
    }
}
|
|
||||||
|
|
||||||
impl Step for VariableNameStep {
|
|
||||||
fn name(&self) -> Cow<'static, str> {
|
|
||||||
self.name.clone().into()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
|
||||||
self.current
|
|
||||||
}
|
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
|
||||||
self.total
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn store_stats_of(
|
|
||||||
stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
|
|
||||||
index_uuid: Uuid,
|
|
||||||
sched_wtxn: &mut RwTxn,
|
|
||||||
index_uid: &str,
|
|
||||||
stats: &IndexStats,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
stats_db
|
|
||||||
.put(sched_wtxn, &index_uuid, stats)
|
|
||||||
.with_context(|| format!("storing stats for index `{index_uid}`"))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The statistics that can be computed from an `Index` object.
|
|
||||||
#[derive(Serialize, Debug)]
|
|
||||||
pub struct IndexStats {
|
|
||||||
/// Number of documents in the index.
|
|
||||||
pub number_of_documents: u64,
|
|
||||||
/// Size taken up by the index' DB, in bytes.
|
|
||||||
///
|
|
||||||
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
|
|
||||||
/// are not returned to the disk after a deletion, this number is typically larger than
|
|
||||||
/// `used_database_size` that only includes the size of the used pages.
|
|
||||||
pub database_size: u64,
|
|
||||||
/// Size taken by the used pages of the index' DB, in bytes.
|
|
||||||
///
|
|
||||||
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
|
|
||||||
/// this value is typically smaller than `database_size`.
|
|
||||||
pub used_database_size: u64,
|
|
||||||
/// Association of every field name with the number of times it occurs in the documents.
|
|
||||||
pub field_distribution: FieldDistribution,
|
|
||||||
/// Creation date of the index.
|
|
||||||
#[serde(with = "time::serde::rfc3339")]
|
|
||||||
pub created_at: OffsetDateTime,
|
|
||||||
/// Date of the last update of the index.
|
|
||||||
#[serde(with = "time::serde::rfc3339")]
|
|
||||||
pub updated_at: OffsetDateTime,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IndexStats {
|
|
||||||
/// Compute the stats of an index
|
|
||||||
///
|
|
||||||
/// # Parameters
|
|
||||||
///
|
|
||||||
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
|
|
||||||
pub fn new(index: &Index, rtxn: &RoTxn) -> meilisearch_types::milli::Result<Self> {
|
|
||||||
Ok(IndexStats {
|
|
||||||
number_of_documents: index.number_of_documents(rtxn)?,
|
|
||||||
database_size: index.on_disk_size()?,
|
|
||||||
used_database_size: index.used_size()?,
|
|
||||||
field_distribution: index.field_distribution(rtxn)?,
|
|
||||||
created_at: index.created_at(rtxn)?,
|
|
||||||
updated_at: index.updated_at(rtxn)?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -219,19 +219,12 @@ impl<'a> FacetDistribution<'a> {
|
|||||||
let facet_key = StrRefCodec::bytes_decode(facet_key).unwrap();
|
let facet_key = StrRefCodec::bytes_decode(facet_key).unwrap();
|
||||||
|
|
||||||
let key: (FieldId, _, &str) = (field_id, any_docid, facet_key);
|
let key: (FieldId, _, &str) = (field_id, any_docid, facet_key);
|
||||||
let optional_original_string =
|
let original_string = self
|
||||||
self.index.field_id_docid_facet_strings.get(self.rtxn, &key)?;
|
.index
|
||||||
|
.field_id_docid_facet_strings
|
||||||
let original_string = match optional_original_string {
|
.get(self.rtxn, &key)?
|
||||||
Some(original_string) => original_string.to_owned(),
|
.unwrap()
|
||||||
None => {
|
.to_owned();
|
||||||
tracing::error!(
|
|
||||||
"Missing original facet string. Using the normalized facet {} instead",
|
|
||||||
facet_key
|
|
||||||
);
|
|
||||||
facet_key.to_string()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
distribution.insert(original_string, nbr_docids);
|
distribution.insert(original_string, nbr_docids);
|
||||||
if distribution.len() == self.max_values_per_facet {
|
if distribution.len() == self.max_values_per_facet {
|
||||||
|
|||||||
@@ -3334,44 +3334,6 @@ mod tests {
|
|||||||
rtxn.commit().unwrap();
|
rtxn.commit().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds two documents that each carry `id`, `some_field` and `other_field`, then
/// sends partial updates with the `UpdateDocuments` method, and checks that every
/// field is still counted twice in the field distribution afterwards.
#[test]
fn incremental_update_without_changing_facet_distribution() {
    // Every field is expected to be present in both documents, before and after the update.
    let assert_all_fields_counted_twice = |index: &TempIndex| {
        let rtxn = index.read_txn().unwrap();
        // count field distribution
        let results = index.field_distribution(&rtxn).unwrap();
        for field in ["id", "some_field", "other_field"] {
            assert_eq!(Some(&2), results.get(field));
        }
    };

    let mut index = TempIndex::new();
    index
        .add_documents(documents!([
            {"id": 0, "some_field": "aaa", "other_field": "aaa" },
            {"id": 1, "some_field": "bbb", "other_field": "bbb" },
        ]))
        .unwrap();
    assert_all_fields_counted_twice(&index);

    index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
    index
        .add_documents(documents!([
            {"id": 0, "other_field": "bbb" },
            {"id": 1, "some_field": "ccc" },
        ]))
        .unwrap();
    assert_all_fields_counted_twice(&index);
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn delete_words_exact_attributes() {
|
fn delete_words_exact_attributes() {
|
||||||
let index = TempIndex::new();
|
let index = TempIndex::new();
|
||||||
|
|||||||
@@ -89,8 +89,7 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
|||||||
.or_default();
|
.or_default();
|
||||||
*entry -= 1;
|
*entry -= 1;
|
||||||
}
|
}
|
||||||
let content =
|
let content = update.updated();
|
||||||
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
|
||||||
let geo_iter =
|
let geo_iter =
|
||||||
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
|
|||||||
@@ -283,60 +283,42 @@ impl FacetedDocidsExtractor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct DelAddFacetValue<'doc> {
|
struct DelAddFacetValue<'doc> {
|
||||||
strings: HashMap<
|
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||||
(FieldId, &'doc str),
|
|
||||||
Option<BVec<'doc, u8>>,
|
|
||||||
hashbrown::DefaultHashBuilder,
|
|
||||||
&'doc Bump,
|
|
||||||
>,
|
|
||||||
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||||
doc_alloc: &'doc Bump,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'doc> DelAddFacetValue<'doc> {
|
impl<'doc> DelAddFacetValue<'doc> {
|
||||||
fn new(doc_alloc: &'doc Bump) -> Self {
|
fn new(doc_alloc: &'doc Bump) -> Self {
|
||||||
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc), doc_alloc }
|
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
||||||
match kind {
|
let cache = match kind {
|
||||||
FacetKind::Number => {
|
FacetKind::String => &mut self.strings,
|
||||||
let key = (fid, value);
|
FacetKind::Number => &mut self.f64s,
|
||||||
if let Some(DelAdd::Deletion) = self.f64s.get(&key) {
|
_ => return,
|
||||||
self.f64s.remove(&key);
|
};
|
||||||
} else {
|
|
||||||
self.f64s.insert(key, DelAdd::Addition);
|
let key = (fid, value);
|
||||||
}
|
if let Some(DelAdd::Deletion) = cache.get(&key) {
|
||||||
}
|
cache.remove(&key);
|
||||||
FacetKind::String => {
|
} else {
|
||||||
if let Ok(s) = std::str::from_utf8(&value) {
|
cache.insert(key, DelAdd::Addition);
|
||||||
let normalized = crate::normalize_facet(s);
|
|
||||||
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
|
|
||||||
self.strings.insert((fid, truncated), Some(value));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
||||||
match kind {
|
let cache = match kind {
|
||||||
FacetKind::Number => {
|
FacetKind::String => &mut self.strings,
|
||||||
let key = (fid, value);
|
FacetKind::Number => &mut self.f64s,
|
||||||
if let Some(DelAdd::Addition) = self.f64s.get(&key) {
|
_ => return,
|
||||||
self.f64s.remove(&key);
|
};
|
||||||
} else {
|
|
||||||
self.f64s.insert(key, DelAdd::Deletion);
|
let key = (fid, value);
|
||||||
}
|
if let Some(DelAdd::Addition) = cache.get(&key) {
|
||||||
}
|
cache.remove(&key);
|
||||||
FacetKind::String => {
|
} else {
|
||||||
if let Ok(s) = std::str::from_utf8(&value) {
|
cache.insert(key, DelAdd::Deletion);
|
||||||
let normalized = crate::normalize_facet(s);
|
|
||||||
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
|
|
||||||
self.strings.insert((fid, truncated), None);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -347,14 +329,18 @@ impl<'doc> DelAddFacetValue<'doc> {
|
|||||||
doc_alloc: &Bump,
|
doc_alloc: &Bump,
|
||||||
) -> crate::Result<()> {
|
) -> crate::Result<()> {
|
||||||
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
|
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||||
for ((fid, truncated), value) in self.strings {
|
for ((fid, value), deladd) in self.strings {
|
||||||
buffer.clear();
|
if let Ok(s) = std::str::from_utf8(&value) {
|
||||||
buffer.extend_from_slice(&fid.to_be_bytes());
|
buffer.clear();
|
||||||
buffer.extend_from_slice(&docid.to_be_bytes());
|
buffer.extend_from_slice(&fid.to_be_bytes());
|
||||||
buffer.extend_from_slice(truncated.as_bytes());
|
buffer.extend_from_slice(&docid.to_be_bytes());
|
||||||
match &value {
|
let normalized = crate::normalize_facet(s);
|
||||||
Some(value) => sender.write_facet_string(&buffer, value)?,
|
let truncated = truncate_str(&normalized);
|
||||||
None => sender.delete_facet_string(&buffer)?,
|
buffer.extend_from_slice(truncated.as_bytes());
|
||||||
|
match deladd {
|
||||||
|
DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
|
||||||
|
DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ use std::sync::{OnceLock, RwLock};
|
|||||||
use std::thread::{self, Builder};
|
use std::thread::{self, Builder};
|
||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
use bstr::ByteSlice as _;
|
|
||||||
use bumparaw_collections::RawMap;
|
use bumparaw_collections::RawMap;
|
||||||
use document_changes::{extract, DocumentChanges, IndexingContext};
|
use document_changes::{extract, DocumentChanges, IndexingContext};
|
||||||
pub use document_deletion::DocumentDeletion;
|
pub use document_deletion::DocumentDeletion;
|
||||||
@@ -110,7 +109,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let (extractor_sender, writer_receiver) = pool
|
let (extractor_sender, mut writer_receiver) = pool
|
||||||
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@@ -425,7 +424,6 @@ where
|
|||||||
let mut arroy_writers = arroy_writers?;
|
let mut arroy_writers = arroy_writers?;
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut writer_receiver = writer_receiver;
|
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
@@ -538,6 +536,9 @@ where
|
|||||||
drop(fields_ids_map_store);
|
drop(fields_ids_map_store);
|
||||||
|
|
||||||
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
||||||
|
for (fid, name, metadata) in new_fields_ids_map.iter() {
|
||||||
|
tracing::debug!("{fid}:{name},{metadata:?}");
|
||||||
|
}
|
||||||
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
|
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
|
||||||
|
|
||||||
if let Some(new_primary_key) = new_primary_key {
|
if let Some(new_primary_key) = new_primary_key {
|
||||||
@@ -585,12 +586,7 @@ fn write_from_bbqueue(
|
|||||||
}
|
}
|
||||||
(key, None) => match database.delete(wtxn, key) {
|
(key, None) => match database.delete(wtxn, key) {
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
tracing::error!(
|
unreachable!("We tried to delete an unknown key: {key:?}")
|
||||||
database_name,
|
|
||||||
key_bytes = ?key,
|
|
||||||
formatted_key = ?key.as_bstr(),
|
|
||||||
"Attempt to delete an unknown key"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
|||||||
@@ -269,7 +269,8 @@ impl FacetFieldIdsDelta {
|
|||||||
pub fn consume_facet_string_delta(
|
pub fn consume_facet_string_delta(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
||||||
self.modified_facet_string_ids.drain()
|
None.into_iter()
|
||||||
|
// self.modified_facet_string_ids.drain()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn consume_facet_number_delta(
|
pub fn consume_facet_number_delta(
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ pub mod indexer;
|
|||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
mod ref_cell_ext;
|
mod ref_cell_ext;
|
||||||
pub mod reindex;
|
|
||||||
pub(crate) mod steps;
|
pub(crate) mod steps;
|
||||||
pub(crate) mod thread_local;
|
pub(crate) mod thread_local;
|
||||||
pub mod vector_document;
|
pub mod vector_document;
|
||||||
|
|||||||
@@ -1,38 +0,0 @@
|
|||||||
use heed::RwTxn;
|
|
||||||
|
|
||||||
use super::document::{Document, DocumentFromDb};
|
|
||||||
use crate::progress::{self, AtomicSubStep, Progress};
|
|
||||||
use crate::{FieldDistribution, Index, Result};
|
|
||||||
|
|
||||||
pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> {
|
|
||||||
let mut distribution = FieldDistribution::new();
|
|
||||||
|
|
||||||
let document_count = index.number_of_documents(wtxn)?;
|
|
||||||
let field_id_map = index.fields_ids_map(wtxn)?;
|
|
||||||
|
|
||||||
let (update_document_count, sub_step) =
|
|
||||||
AtomicSubStep::<progress::Document>::new(document_count as u32);
|
|
||||||
progress.update_progress(sub_step);
|
|
||||||
|
|
||||||
let docids = index.documents_ids(wtxn)?;
|
|
||||||
|
|
||||||
for docid in docids {
|
|
||||||
update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
|
||||||
for res in document.iter_top_level_fields().chain(geo_iter) {
|
|
||||||
let (field_name, _) = res?;
|
|
||||||
if let Some(count) = distribution.get_mut(field_name) {
|
|
||||||
*count += 1;
|
|
||||||
} else {
|
|
||||||
distribution.insert(field_name.to_owned(), 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
index.put_field_distribution(wtxn, &distribution)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user