Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-12-04 19:55:43 +00:00)
Compare commits: update-ver...prototype- (30 commits)
| SHA1 |
|---|
| fc81559748 |
| 0a242f4c4a |
| f7e0a7a388 |
| ff16dc2643 |
| c81b991bd4 |
| 0025421471 |
| 151450b5b6 |
| 11084d5fce |
| 45a7dd24c3 |
| c1bacd53a7 |
| d161a0d7b4 |
| 2a01504ba0 |
| dee31c279f |
| 3c143139bd |
| 811be520c0 |
| ad77aaff20 |
| 0760a506eb |
| 5780197ab5 |
| 59b9266ae0 |
| 69aa3a9976 |
| 636c072bf4 |
| f7fbdbcc88 |
| 367ec3e967 |
| 72a8e46495 |
| f9d0d384eb |
| 63442516b7 |
| 61249887c0 |
| 032e02057c |
| 31d6075777 |
| cd74ed300c |
Cargo.lock (generated): 923 lines changed; file diff suppressed because it is too large.
@@ -23,7 +23,7 @@ members = [
 ]
 
 [workspace.package]
-version = "1.17.1"
+version = "1.18.0"
 authors = [
     "Quentin de Quelen <quentin@dequelen.me>",
     "Clément Renault <clement@meilisearch.com>",
@@ -143,10 +143,10 @@ impl IndexStats {
     ///
     /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
     pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result<Self> {
-        let arroy_stats = index.arroy_stats(rtxn)?;
+        let hannoy_stats = index.hannoy_stats(rtxn)?;
         Ok(IndexStats {
-            number_of_embeddings: Some(arroy_stats.number_of_embeddings),
-            number_of_embedded_documents: Some(arroy_stats.documents.len()),
+            number_of_embeddings: Some(hannoy_stats.number_of_embeddings),
+            number_of_embedded_documents: Some(hannoy_stats.documents.len()),
             documents_database_stats: index.documents_stats(rtxn)?.unwrap_or_default(),
             number_of_documents: None,
             database_size: index.on_disk_size()?,
@@ -146,7 +146,6 @@ impl IndexScheduler {
         };
 
         let mut index_wtxn = index.write_txn()?;
-
         let index_version = index.get_version(&index_wtxn)?.unwrap_or((1, 12, 0));
         let package_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH);
         if index_version != package_version {
@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
 3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}

@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
 [timestamp] [4,]
 ----------------------------------------------------------------------
 ### All Batches:
-0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.1"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
+0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.18.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
 1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
 2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
 3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }
@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 ----------------------------------------------------------------------
 ### Status:
 enqueued [0,]

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 ----------------------------------------------------------------------
 ### Status:
@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 ----------------------------------------------------------------------
 ### Status:

@@ -37,7 +37,7 @@ catto [1,]
 [timestamp] [0,]
 ----------------------------------------------------------------------
 ### All Batches:
-0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.1"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
+0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.18.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
 ----------------------------------------------------------------------

@@ -40,7 +40,7 @@ doggo [2,]
 [timestamp] [0,]
 ----------------------------------------------------------------------
 ### All Batches:
-0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.1"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
+0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.18.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 17, 1) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
+0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 18, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
 1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
 3 {uid: 3, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}

@@ -43,7 +43,7 @@ doggo [2,3,]
 [timestamp] [0,]
 ----------------------------------------------------------------------
 ### All Batches:
-0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.1"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
+0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.18.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
@@ -43,7 +43,7 @@ async fn version_too_old() {
     std::fs::write(db_path.join("VERSION"), "1.11.9999").unwrap();
     let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
     let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
-    snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.17.1");
+    snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.18.0");
 }
 
 #[actix_rt::test]
@@ -58,7 +58,7 @@ async fn version_requires_downgrade() {
     std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
     let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
     let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
-    snapshot!(err, @"Database version 1.17.2 is higher than the Meilisearch version 1.17.1. Downgrade is not supported");
+    snapshot!(err, @"Database version 1.18.1 is higher than the Meilisearch version 1.18.0. Downgrade is not supported");
 }
 
 #[actix_rt::test]
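Both assertions above are inline snapshots: the expected string lives in the test source after the `@`, which is why a version bump rewrites the expectation in place. A minimal sketch of the same pattern with the insta crate (meilisearch's `snapshot!` macro is assumed to behave like insta's; the `downgrade_error` helper is hypothetical):

```rust
use insta::assert_snapshot;

// Hypothetical helper mirroring the error message checked above.
fn downgrade_error(db: &str, engine: &str) -> String {
    format!("Database version {db} is higher than the Meilisearch version {engine}. Downgrade is not supported")
}

#[test]
fn version_requires_downgrade() {
    // The string after `@` is the stored expectation; tooling such as
    // `cargo insta review` rewrites it when the output changes.
    assert_snapshot!(downgrade_error("1.18.1", "1.18.0"), @"Database version 1.18.1 is higher than the Meilisearch version 1.18.0. Downgrade is not supported");
}
```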
@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "progress": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "stats": {
     "totalNbTasks": 1,

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "progress": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "stats": {
     "totalNbTasks": 1,

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "progress": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "stats": {
     "totalNbTasks": 1,
@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "canceledBy": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "error": null,
   "duration": "[duration]",

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "canceledBy": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "error": null,
   "duration": "[duration]",

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "canceledBy": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "error": null,
   "duration": "[duration]",
@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "progress": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "stats": {
     "totalNbTasks": 1,

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
   "canceledBy": null,
   "details": {
     "upgradeFrom": "v1.12.0",
-    "upgradeTo": "v1.17.1"
+    "upgradeTo": "v1.18.0"
   },
   "error": null,
   "duration": "[duration]",
@@ -104,8 +104,8 @@ async fn binary_quantize_before_sending_documents() {
         "manual": {
           "embeddings": [
             [
-              -1.0,
-              -1.0,
+              0.0,
+              0.0,
               1.0
             ]
           ],

@@ -122,7 +122,7 @@ async fn binary_quantize_before_sending_documents() {
             [
               1.0,
               1.0,
-              -1.0
+              0.0
             ]
           ],
           "regenerate": false
@@ -191,8 +191,8 @@ async fn binary_quantize_after_sending_documents() {
         "manual": {
          "embeddings": [
            [
-              -1.0,
-              -1.0,
+              0.0,
+              0.0,
               1.0
             ]
           ],

@@ -209,7 +209,7 @@ async fn binary_quantize_after_sending_documents() {
             [
               1.0,
               1.0,
-              -1.0
+              0.0
             ]
           ],
           "regenerate": false
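Across the four hunks above, the stored binary-quantized embeddings change from the {-1.0, 1.0} pair to {0.0, 1.0}: non-positive coordinates now quantize to 0.0 instead of -1.0. A sketch of the mapping the snapshots imply (illustrative only, not milli's actual quantizer):

```rust
// Positive coordinates quantize to 1.0, everything else to 0.0, matching the
// snapshot change from [1.0, 1.0, -1.0] to [1.0, 1.0, 0.0] above.
fn binary_quantize(embedding: &[f32]) -> Vec<f32> {
    embedding.iter().map(|&v| if v > 0.0 { 1.0 } else { 0.0 }).collect()
}

fn main() {
    assert_eq!(binary_quantize(&[2.5, 0.7, -3.1]), vec![1.0, 1.0, 0.0]);
}
```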
@@ -320,7 +320,7 @@ async fn binary_quantize_clear_documents() {
     }
     "###);
 
-    // Make sure the arroy DB has been cleared
+    // Make sure the hannoy DB has been cleared
     let (documents, _code) =
         index.search_post(json!({ "hybrid": { "embedder": "manual" }, "vector": [1, 1, 1] })).await;
     snapshot!(documents, @r#"

@@ -684,7 +684,7 @@ async fn clear_documents() {
     }
     "###);
 
-    // Make sure the arroy DB has been cleared
+    // Make sure the hannoy DB has been cleared
     let (documents, _code) =
         index.search_post(json!({ "vector": [1, 1, 1], "hybrid": {"embedder": "manual"} })).await;
     snapshot!(documents, @r#"

@@ -236,7 +236,7 @@ async fn reset_embedder_documents() {
     }
     "###);
 
-    // Make sure the arroy DB has been cleared
+    // Make sure the hannoy DB has been cleared
     let (documents, _code) =
         index.search_post(json!({ "vector": [1, 1, 1], "hybrid": {"embedder": "default"} })).await;
     snapshot!(json_string!(documents), @r###"
@@ -142,8 +142,8 @@ enum Command {
 
 #[derive(Clone, ValueEnum)]
 enum IndexPart {
-    /// Will make the arroy index hot.
-    Arroy,
+    /// Will make the hannoy index hot.
+    Hannoy,
 }
 
 fn main() -> anyhow::Result<()> {

@@ -658,12 +658,12 @@ fn hair_dryer(
         let rtxn = index.read_txn()?;
         for part in index_parts {
             match part {
-                IndexPart::Arroy => {
+                IndexPart::Hannoy => {
                     let mut count = 0;
-                    let total = index.vector_arroy.len(&rtxn)?;
-                    eprintln!("Hair drying arroy for {uid}...");
+                    let total = index.vector_store.len(&rtxn)?;
+                    eprintln!("Hair drying hannoy for {uid}...");
                     for (i, result) in index
-                        .vector_arroy
+                        .vector_store
                         .remap_types::<Bytes, Bytes>()
                         .iter(&rtxn)?
                         .enumerate()
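The `hair_dryer` subcommand warms the OS page cache by touching every entry of the vector database. The heart of that loop, reduced to a self-contained sketch with heed (the environment path and database layout here are assumptions, not the tool's real setup):

```rust
use heed::types::Bytes;
use heed::{Database, EnvOpenOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed path; the real tool opens each index's existing environment.
    let env = unsafe { EnvOpenOptions::new().open("index.mdb")? };
    let rtxn = env.read_txn()?;
    let db: Database<Bytes, Bytes> = env.open_database(&rtxn, None)?.expect("missing database");

    let mut touched = 0u64;
    for result in db.iter(&rtxn)? {
        let (key, value) = result?;
        // Reading the bytes faults the LMDB pages into memory.
        touched += (key.len() + value.len()) as u64;
    }
    eprintln!("touched {touched} bytes");
    Ok(())
}
```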
@@ -68,7 +68,7 @@ pub fn v1_10_to_v1_11(
         )
     })?;
     let index_read_database =
-        try_opening_poly_database(&index_env, &index_rtxn, db_name::VECTOR_ARROY)
+        try_opening_poly_database(&index_env, &index_rtxn, db_name::VECTOR_STORE)
             .with_context(|| format!("while updating date format for index `{uid}`"))?;
 
     let mut index_wtxn = index_env.write_txn().with_context(|| {

@@ -79,7 +79,7 @@ pub fn v1_10_to_v1_11(
     })?;
 
     let index_write_database =
-        try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_ARROY)
+        try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_STORE)
             .with_context(|| format!("while updating date format for index `{uid}`"))?;
 
     meilisearch_types::milli::arroy::upgrade::cosine_from_0_4_to_0_5(
@@ -88,6 +88,7 @@ rhai = { version = "1.22.2", features = [
     "sync",
 ] }
 arroy = "0.6.1"
+hannoy = "0.0.4"
 rand = "0.8.5"
 tracing = "0.1.41"
 ureq = { version = "2.12.1", features = ["json"] }

@@ -95,6 +96,7 @@ url = "2.5.4"
 hashbrown = "0.15.4"
 bumpalo = "3.18.1"
 bumparaw-collections = "0.1.4"
+steppe = { version = "0.4.0", default-features = false }
 thread_local = "1.1.9"
 allocator-api2 = "0.3.0"
 rustc-hash = "2.1.1"
@@ -1,17 +1,13 @@
-use crate::{
-    distance_between_two_points,
-    heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec},
-    lat_lng_to_xyz,
-    search::new::{facet_string_values, facet_values_prefix_key},
-    GeoPoint, Index,
-};
-use heed::{
-    types::{Bytes, Unit},
-    RoPrefix, RoTxn,
-};
-use std::collections::VecDeque;
+use heed::types::{Bytes, Unit};
+use heed::{RoPrefix, RoTxn};
 use roaring::RoaringBitmap;
 use rstar::RTree;
+use std::collections::VecDeque;
+
+use crate::heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec};
+use crate::search::new::{facet_string_values, facet_values_prefix_key};
+use crate::{distance_between_two_points, lat_lng_to_xyz, GeoPoint, Index};
 
 #[derive(Debug, Clone, Copy)]
 pub struct GeoSortParameter {
@@ -1,19 +1,16 @@
 use std::collections::{BTreeSet, VecDeque};
 
-use crate::{
-    constants::RESERVED_GEO_FIELD_NAME,
-    documents::{geo_sort::next_bucket, GeoSortParameter},
-    heed_codec::{
-        facet::{FacetGroupKeyCodec, FacetGroupValueCodec},
-        BytesRefCodec,
-    },
-    is_faceted,
-    search::facet::{ascending_facet_sort, descending_facet_sort},
-    AscDesc, DocumentId, Member, UserError,
-};
 use heed::Database;
 use roaring::RoaringBitmap;
 
+use crate::constants::RESERVED_GEO_FIELD_NAME;
+use crate::documents::geo_sort::next_bucket;
+use crate::documents::GeoSortParameter;
+use crate::heed_codec::facet::{FacetGroupKeyCodec, FacetGroupValueCodec};
+use crate::heed_codec::BytesRefCodec;
+use crate::search::facet::{ascending_facet_sort, descending_facet_sort};
+use crate::{is_faceted, AscDesc, DocumentId, Member, UserError};
 
 #[derive(Debug, Clone, Copy)]
 enum AscDescId {
     Facet { field_id: u16, ascending: bool },
@@ -78,6 +78,8 @@ pub enum InternalError {
     #[error(transparent)]
     ArroyError(#[from] arroy::Error),
     #[error(transparent)]
+    HannoyError(#[from] hannoy::Error),
+    #[error(transparent)]
     VectorEmbeddingError(#[from] crate::vector::Error),
 }

@@ -441,6 +443,29 @@ impl From<arroy::Error> for Error {
     }
 }
 
+impl From<hannoy::Error> for Error {
+    fn from(value: hannoy::Error) -> Self {
+        match value {
+            hannoy::Error::Heed(heed) => heed.into(),
+            hannoy::Error::Io(io) => io.into(),
+            hannoy::Error::InvalidVecDimension { expected, received } => {
+                Error::UserError(UserError::InvalidVectorDimensions { expected, found: received })
+            }
+            hannoy::Error::BuildCancelled => Error::InternalError(InternalError::AbortedIndexation),
+            hannoy::Error::DatabaseFull
+            | hannoy::Error::InvalidItemAppend
+            | hannoy::Error::UnmatchingDistance { .. }
+            | hannoy::Error::NeedBuild(_)
+            | hannoy::Error::MissingKey { .. }
+            | hannoy::Error::MissingMetadata(_)
+            | hannoy::Error::UnknownVersion { .. }
+            | hannoy::Error::CannotDecodeKeyMode { .. } => {
+                Error::InternalError(InternalError::HannoyError(value))
+            }
+        }
+    }
+}
+
 #[derive(Error, Debug)]
 pub enum GeoError {
     #[error("The `_geo` field in the document with the id: `{document_id}` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `{value}`.")]
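The new `From<hannoy::Error>` impl routes recoverable backend errors to user-facing variants and wraps everything else as internal, so call sites can keep using `?`. The strategy in a self-contained form (toy types standing in for the hannoy and milli error enums):

```rust
#[derive(Debug)]
enum BackendError {
    InvalidVecDimension { expected: usize, received: usize },
    DatabaseFull,
}

#[derive(Debug)]
enum Error {
    User(String),
    Internal(BackendError),
}

impl From<BackendError> for Error {
    fn from(value: BackendError) -> Self {
        match value {
            // A dimension mismatch is the caller's mistake: surface it.
            BackendError::InvalidVecDimension { expected, received } => {
                Error::User(format!("expected {expected} dimensions, got {received}"))
            }
            // Everything else is an engine bug or resource problem: keep it internal.
            other => Error::Internal(other),
        }
    }
}

fn add_vector() -> Result<(), Error> {
    let result: Result<(), BackendError> = Err(BackendError::DatabaseFull);
    result?; // `?` converts BackendError into Error through the From impl above
    Ok(())
}

fn main() {
    assert!(matches!(add_vector(), Err(Error::Internal(BackendError::DatabaseFull))));
}
```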
@@ -31,7 +31,7 @@ use crate::prompt::PromptData;
 use crate::proximity::ProximityPrecision;
 use crate::update::new::StdResult;
 use crate::vector::db::IndexEmbeddingConfigs;
-use crate::vector::{ArroyStats, ArroyWrapper, Embedding};
+use crate::vector::{Embedding, HannoyStats, VectorStore};
 use crate::{
     default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
     FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec,

@@ -113,7 +113,7 @@ pub mod db_name {
     pub const FIELD_ID_DOCID_FACET_F64S: &str = "field-id-docid-facet-f64s";
     pub const FIELD_ID_DOCID_FACET_STRINGS: &str = "field-id-docid-facet-strings";
     pub const VECTOR_EMBEDDER_CATEGORY_ID: &str = "vector-embedder-category-id";
-    pub const VECTOR_ARROY: &str = "vector-arroy";
+    pub const VECTOR_STORE: &str = "vector-arroy";
     pub const DOCUMENTS: &str = "documents";
 }
 const NUMBER_OF_DBS: u32 = 25;
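Only the Rust-side constant is renamed here: `VECTOR_STORE` keeps the literal value `"vector-arroy"`, so existing indexes still open the same LMDB sub-database by name after the upgrade (presumably a deliberate on-disk compatibility choice):

```rust
// Before and after the rename, the on-disk database name is identical.
pub const VECTOR_ARROY: &str = "vector-arroy"; // old constant
pub const VECTOR_STORE: &str = "vector-arroy"; // new constant, same value

fn main() {
    assert_eq!(VECTOR_ARROY, VECTOR_STORE);
}
```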
@@ -177,10 +177,10 @@ pub struct Index {
     /// Maps the document id, the facet field id and the strings.
     pub field_id_docid_facet_strings: Database<FieldDocIdFacetStringCodec, Str>,
 
-    /// Maps an embedder name to its id in the arroy store.
+    /// Maps an embedder name to its id in the hannoy store.
     pub(crate) embedder_category_id: Database<Unspecified, Unspecified>,
-    /// Vector store based on arroy™.
-    pub vector_arroy: arroy::Database<Unspecified>,
+    /// Vector store based on hannoy™.
+    pub vector_store: hannoy::Database<Unspecified>,
 
     /// Maps the document id to the document as an obkv store.
     pub(crate) documents: Database<BEU32, ObkvCodec>,

@@ -237,7 +237,7 @@ impl Index {
         // vector stuff
         let embedder_category_id =
             env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
-        let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
+        let vector_store = env.create_database(&mut wtxn, Some(VECTOR_STORE))?;
 
         let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;

@@ -264,7 +264,7 @@ impl Index {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_arroy,
+            vector_store,
             embedder_category_id,
             documents,
         };
@@ -1769,11 +1769,13 @@ impl Index {
     ) -> Result<BTreeMap<String, EmbeddingsWithMetadata>> {
         let mut res = BTreeMap::new();
         let embedders = self.embedding_configs();
+        let index_version = self.get_version(rtxn)?.unwrap();
         for config in embedders.embedding_configs(rtxn)? {
             let embedder_info = embedders.embedder_info(rtxn, &config.name)?.unwrap();
             let has_fragments = config.config.embedder_options.has_fragments();
-            let reader = ArroyWrapper::new(
-                self.vector_arroy,
+            let reader = VectorStore::new(
+                index_version,
+                self.vector_store,
                 embedder_info.embedder_id,
                 config.config.quantized(),
             );

@@ -1792,13 +1794,18 @@ impl Index {
         Ok(PrefixSettings { compute_prefixes, max_prefix_length: 4, prefix_count_threshold: 100 })
     }
 
-    pub fn arroy_stats(&self, rtxn: &RoTxn<'_>) -> Result<ArroyStats> {
-        let mut stats = ArroyStats::default();
+    pub fn hannoy_stats(&self, rtxn: &RoTxn<'_>) -> Result<HannoyStats> {
+        let mut stats = HannoyStats::default();
         let embedding_configs = self.embedding_configs();
+        let index_version = self.get_version(rtxn)?.unwrap();
         for config in embedding_configs.embedding_configs(rtxn)? {
             let embedder_id = embedding_configs.embedder_id(rtxn, &config.name)?.unwrap();
-            let reader =
-                ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized());
+            let reader = VectorStore::new(
+                index_version,
+                self.vector_store,
+                embedder_id,
+                config.config.quantized(),
+            );
             reader.aggregate_stats(rtxn, &mut stats)?;
         }
         Ok(stats)
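Every call site in this diff now passes the index's stored version as the first argument of `VectorStore::new`. The wrapper itself isn't shown, but the signature suggests it dispatches on that version, plausibly so that indexes created before the migration keep being read as arroy data until they are rebuilt. A speculative sketch of such a dispatch (names and the 1.18.0 cutoff are assumptions, not taken from the PR):

```rust
type Version = (u32, u32, u32);

enum Backend {
    Arroy,  // format written by pre-migration indexes (assumed)
    Hannoy, // format written from the migration onward (assumed)
}

struct VectorStore {
    backend: Backend,
    embedder_id: u8,
    quantized: bool,
}

impl VectorStore {
    fn new(index_version: Version, embedder_id: u8, quantized: bool) -> Self {
        // Hypothetical: choose the backend from the version the index was built with.
        let backend =
            if index_version < (1, 18, 0) { Backend::Arroy } else { Backend::Hannoy };
        VectorStore { backend, embedder_id, quantized }
    }
}

fn main() {
    let _reads_arroy = VectorStore::new((1, 12, 0), 0, false);
    let _reads_hannoy = VectorStore::new((1, 18, 0), 0, false);
}
```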
@@ -1842,7 +1849,7 @@ impl Index {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_arroy,
+            vector_store: vector_hannoy,
             embedder_category_id,
             documents,
         } = self;

@@ -1913,7 +1920,7 @@ impl Index {
             "field_id_docid_facet_strings",
             field_id_docid_facet_strings.stat(rtxn).map(compute_size)?,
         );
-        sizes.insert("vector_arroy", vector_arroy.stat(rtxn).map(compute_size)?);
+        sizes.insert("vector_hannoy", vector_hannoy.stat(rtxn).map(compute_size)?);
         sizes.insert("embedder_category_id", embedder_category_id.stat(rtxn).map(compute_size)?);
         sizes.insert("documents", documents.stat(rtxn).map(compute_size)?);
@@ -53,7 +53,7 @@ pub use search::new::{
 };
 use serde_json::Value;
 pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
-pub use {arroy, charabia as tokenizer, heed, rhai};
+pub use {arroy, charabia as tokenizer, hannoy, heed, rhai};
 
 pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
 pub use self::attribute_patterns::{AttributePatterns, PatternMatch};
@@ -5,7 +5,6 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
-use enum_iterator::Sequence;
 use indexmap::IndexMap;
 use itertools::Itertools;
 use serde::Serialize;

@@ -96,14 +95,6 @@ impl Progress {
 
         durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
     }
-
-    // TODO: ideally we should expose the progress in a way that let arroy use it directly
-    pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
-        self.update_progress(progress.main);
-        if let Some(sub) = progress.sub {
-            self.update_progress(sub);
-        }
-    }
 }
 
 /// Generate the names associated with the durations and push them.

@@ -277,43 +268,26 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
     }
 }
 
-impl Step for arroy::MainStep {
-    fn name(&self) -> Cow<'static, str> {
-        match self {
-            arroy::MainStep::PreProcessingTheItems => "pre processing the items",
-            arroy::MainStep::WritingTheDescendantsAndMetadata => {
-                "writing the descendants and metadata"
-            }
-            arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
-            arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
-            arroy::MainStep::UpdatingTheTrees => "updating the trees",
-            arroy::MainStep::CreateNewTrees => "create new trees",
-            arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
-            arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
-            arroy::MainStep::WriteTheMetadata => "write the metadata",
-        }
-        .into()
-    }
+// Integration with steppe
 
-    fn current(&self) -> u32 {
-        *self as u32
-    }
-
-    fn total(&self) -> u32 {
-        Self::CARDINALITY as u32
+impl steppe::Progress for Progress {
+    fn update(&self, sub_progress: impl steppe::Step) {
+        self.update_progress(Compat(sub_progress));
     }
 }
 
-impl Step for arroy::SubStep {
+struct Compat<T: steppe::Step>(T);
+
+impl<T: steppe::Step> Step for Compat<T> {
     fn name(&self) -> Cow<'static, str> {
-        self.unit.into()
+        self.0.name()
     }
 
     fn current(&self) -> u32 {
-        self.current.load(Ordering::Relaxed)
+        self.0.current().try_into().unwrap_or(u32::MAX)
     }
 
     fn total(&self) -> u32 {
-        self.max
+        self.0.total().try_into().unwrap_or(u32::MAX)
     }
 }
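The hand-written `Step` impls for arroy's progress types are gone; instead `Progress` implements `steppe::Progress` (the progress trait used by hannoy's stack) and the `Compat` newtype forwards any `steppe::Step` into milli's own `Step` trait. The same bridging pattern in a self-contained form, with toy traits standing in for the real ones:

```rust
use std::borrow::Cow;

// Stand-in for the external crate's step trait (e.g. steppe::Step).
trait ExternalStep {
    fn name(&self) -> Cow<'static, str>;
    fn current(&self) -> u64;
    fn total(&self) -> u64;
}

// Stand-in for the local progress trait (e.g. milli's Step).
trait LocalStep {
    fn name(&self) -> Cow<'static, str>;
    fn current(&self) -> u32;
    fn total(&self) -> u32;
}

// Newtype adapter: neither crate needs to know about the other's trait.
struct Compat<T: ExternalStep>(T);

impl<T: ExternalStep> LocalStep for Compat<T> {
    fn name(&self) -> Cow<'static, str> {
        self.0.name()
    }
    fn current(&self) -> u32 {
        // Saturate rather than panic if the external counter exceeds u32,
        // mirroring the `try_into().unwrap_or(u32::MAX)` in the diff.
        self.0.current().try_into().unwrap_or(u32::MAX)
    }
    fn total(&self) -> u32 {
        self.0.total().try_into().unwrap_or(u32::MAX)
    }
}
```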
@@ -3,7 +3,7 @@ use roaring::{MultiOps, RoaringBitmap};
 
 use crate::error::{DidYouMean, Error};
 use crate::vector::db::IndexEmbeddingConfig;
-use crate::vector::{ArroyStats, ArroyWrapper};
+use crate::vector::{HannoyStats, VectorStore};
 use crate::Index;
 
 #[derive(Debug, thiserror::Error)]

@@ -82,6 +82,7 @@ fn evaluate_inner(
     embedding_configs: &[IndexEmbeddingConfig],
     filter: &VectorFilter<'_>,
 ) -> crate::Result<RoaringBitmap> {
+    let index_version = index.get_version(rtxn)?.unwrap();
     let embedder_name = embedder.value();
     let available_embedders =
         || embedding_configs.iter().map(|c| c.name.clone()).collect::<Vec<_>>();

@@ -96,8 +97,9 @@ fn evaluate_inner(
         .embedder_info(rtxn, embedder_name)?
         .ok_or_else(|| EmbedderDoesNotExist { embedder, available: available_embedders() })?;
 
-    let arroy_wrapper = ArroyWrapper::new(
-        index.vector_arroy,
+    let vector_store = VectorStore::new(
+        index_version,
+        index.vector_store,
         embedder_info.embedder_id,
         embedding_config.config.quantized(),
     );

@@ -122,7 +124,7 @@ fn evaluate_inner(
             })?;
 
             let user_provided_docids = embedder_info.embedding_status.user_provided_docids();
-            arroy_wrapper.items_in_store(rtxn, fragment_config.id, |bitmap| {
+            vector_store.items_in_store(rtxn, fragment_config.id, |bitmap| {
                 bitmap.clone() - user_provided_docids
             })?
         }

@@ -132,8 +134,8 @@ fn evaluate_inner(
             }
 
             let user_provided_docids = embedder_info.embedding_status.user_provided_docids();
-            let mut stats = ArroyStats::default();
-            arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
+            let mut stats = HannoyStats::default();
+            vector_store.aggregate_stats(rtxn, &mut stats)?;
             stats.documents - user_provided_docids.clone()
         }
         VectorFilter::UserProvided => {

@@ -141,14 +143,14 @@ fn evaluate_inner(
             user_provided_docids.clone()
         }
         VectorFilter::Regenerate => {
-            let mut stats = ArroyStats::default();
-            arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
+            let mut stats = HannoyStats::default();
+            vector_store.aggregate_stats(rtxn, &mut stats)?;
             let skip_regenerate = embedder_info.embedding_status.skip_regenerate_docids();
             stats.documents - skip_regenerate
         }
         VectorFilter::None => {
-            let mut stats = ArroyStats::default();
-            arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
+            let mut stats = HannoyStats::default();
+            vector_store.aggregate_stats(rtxn, &mut stats)?;
             stats.documents
         }
     };
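Each arm of the vector filter above reduces to set arithmetic on document-id bitmaps: start from the documents the stats report as embedded, then subtract the user-provided or skip-regenerate sets. A runnable illustration with the roaring crate, the same bitmap type used in the diff:

```rust
use roaring::RoaringBitmap;

fn main() {
    // Documents that have embeddings in the vector store (stats.documents).
    let embedded: RoaringBitmap = (0u32..10).collect();
    // Documents whose vectors were supplied by the user.
    let user_provided: RoaringBitmap = (0u32..3).collect();

    // VectorFilter::Regenerate-style arm: embedded documents minus the skipped set.
    let regenerated = &embedded - &user_provided;
    assert_eq!(regenerated.len(), 7);
}
```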
@@ -6,7 +6,7 @@ use roaring::RoaringBitmap;
 use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
 use super::VectorStoreStats;
 use crate::score_details::{self, ScoreDetails};
-use crate::vector::{ArroyWrapper, DistributionShift, Embedder};
+use crate::vector::{DistributionShift, Embedder, VectorStore};
 use crate::{DocumentId, Result, SearchContext, SearchLogger};
 
 pub struct VectorSort<Q: RankingRuleQueryTrait> {

@@ -56,7 +56,12 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
         let target = &self.target;
 
         let before = Instant::now();
-        let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
+        let reader = VectorStore::new(
+            ctx.index.get_version(ctx.txn)?.unwrap(),
+            ctx.index.vector_store,
+            self.embedder_index,
+            self.quantized,
+        );
         let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
         self.cached_sorted_docids = results.into_iter();
         *ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use roaring::RoaringBitmap;
 
 use crate::score_details::{self, ScoreDetails};
-use crate::vector::{ArroyWrapper, Embedder};
+use crate::vector::{Embedder, VectorStore};
 use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult};
 
 pub struct Similar<'a> {

@@ -72,7 +72,12 @@ impl<'a> Similar<'a> {
             crate::UserError::InvalidSimilarEmbedder(self.embedder_name.to_owned())
         })?;
 
-        let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized);
+        let reader = VectorStore::new(
+            self.index.get_version(self.rtxn)?.unwrap(),
+            self.index.vector_store,
+            embedder_index,
+            self.quantized,
+        );
         let results = reader.nns_by_item(
             self.rtxn,
             self.id,
@@ -2,7 +2,8 @@ use heed::RwTxn;
 use roaring::RoaringBitmap;
 use time::OffsetDateTime;
 
-use crate::{database_stats::DatabaseStats, FieldDistribution, Index, Result};
+use crate::database_stats::DatabaseStats;
+use crate::{FieldDistribution, Index, Result};
 
 pub struct ClearDocuments<'t, 'i> {
     wtxn: &'t mut RwTxn<'i>,

@@ -45,7 +46,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_arroy,
+            vector_store,
             embedder_category_id: _,
             documents,
         } = self.index;

@@ -88,7 +89,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         field_id_docid_facet_f64s.clear(self.wtxn)?;
         field_id_docid_facet_strings.clear(self.wtxn)?;
         // vector
-        vector_arroy.clear(self.wtxn)?;
+        vector_store.clear(self.wtxn)?;
 
         documents.clear(self.wtxn)?;
@@ -2,9 +2,8 @@ use std::collections::BTreeSet;
 use std::fs::File;
 use std::io::{self, BufReader};
 
-use heed::{BytesDecode, BytesEncode};
+use heed::BytesDecode;
 use obkv::KvReaderU16;
-use roaring::RoaringBitmap;
 
 use super::helpers::{
     create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters,

@@ -16,7 +15,7 @@ use crate::index::db_name::DOCID_WORD_POSITIONS;
 use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::index_documents::helpers::sorter_into_reader;
 use crate::update::settings::InnerIndexSettingsDiff;
-use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};
+use crate::{DocumentId, FieldId, Result};
 
 /// Extracts the word and the documents ids where this word appear.
 ///

@@ -201,45 +200,3 @@ fn words_into_sorter(
 
     Ok(())
 }
-
-#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
-fn docids_into_writers<W>(
-    word: &str,
-    deletions: &RoaringBitmap,
-    additions: &RoaringBitmap,
-    writer: &mut grenad::Writer<W>,
-) -> Result<()>
-where
-    W: std::io::Write,
-{
-    if deletions == additions {
-        // if the same value is deleted and added, do nothing.
-        return Ok(());
-    }
-
-    // Write each value in the same KvDelAdd before inserting it in the final writer.
-    let mut obkv = KvWriterDelAdd::memory();
-    // deletions:
-    if !deletions.is_empty() && !deletions.is_subset(additions) {
-        obkv.insert(
-            DelAdd::Deletion,
-            CboRoaringBitmapCodec::bytes_encode(deletions).map_err(|_| {
-                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
-            })?,
-        )?;
-    }
-    // additions:
-    if !additions.is_empty() {
-        obkv.insert(
-            DelAdd::Addition,
-            CboRoaringBitmapCodec::bytes_encode(additions).map_err(|_| {
-                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
-            })?,
-        )?;
-    }
-
-    // insert everything in the same writer.
-    writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;
-
-    Ok(())
-}
@@ -39,7 +39,7 @@ use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
 };
 use crate::vector::db::EmbedderInfo;
-use crate::vector::{ArroyWrapper, RuntimeEmbedders};
+use crate::vector::{RuntimeEmbedders, VectorStore};
 use crate::{CboRoaringBitmapCodec, Index, Result, UserError};
 
 static MERGED_DATABASE_COUNT: usize = 7;

@@ -485,6 +485,7 @@ where
 
         // If an embedder wasn't used in the typedchunk but must be binary quantized
        // we should insert it in `dimension`
+        let index_version = self.index.get_version(self.wtxn)?.unwrap();
         for (name, action) in settings_diff.embedding_config_updates.iter() {
             if action.is_being_quantized && !dimension.contains_key(name.as_str()) {
                 let index = self.index.embedding_configs().embedder_id(self.wtxn, name)?.ok_or(

@@ -493,8 +494,12 @@ where
                         key: None,
                     },
                 )?;
-                let reader =
-                    ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized);
+                let reader = VectorStore::new(
+                    index_version,
+                    self.index.vector_store,
+                    index,
+                    action.was_quantized,
+                );
                 let Some(dim) = reader.dimensions(self.wtxn)? else {
                     continue;
                 };

@@ -504,7 +509,7 @@ where
 
         for (embedder_name, dimension) in dimension {
             let wtxn = &mut *self.wtxn;
-            let vector_arroy = self.index.vector_arroy;
+            let vector_hannoy = self.index.vector_store;
             let cancel = &self.should_abort;
 
             let embedder_index =

@@ -523,11 +528,12 @@ where
             let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized);
 
             pool.install(|| {
-                let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
+                let mut writer =
+                    VectorStore::new(index_version, vector_hannoy, embedder_index, was_quantized);
                 writer.build_and_quantize(
                     wtxn,
                     // In the settings we don't have any progress to share
-                    &Progress::default(),
+                    Progress::default(),
                     &mut rng,
                     dimension,
                     is_quantizing,
@@ -32,7 +32,7 @@ use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
 use crate::update::{AvailableIds, UpdateIndexingStep};
 use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
 use crate::vector::settings::{RemoveFragments, WriteBackToDocuments};
-use crate::vector::ArroyWrapper;
+use crate::vector::VectorStore;
 use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, Index, Result};
 
 pub struct TransformOutput {

@@ -834,15 +834,17 @@ impl<'a, 'i> Transform<'a, 'i> {
             None
         };
 
-        let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
+        let index_version = self.index.get_version(wtxn)?.unwrap();
+        let readers: BTreeMap<&str, (VectorStore, &RoaringBitmap)> = settings_diff
             .embedding_config_updates
             .iter()
             .filter_map(|(name, action)| {
                 if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
                     action.write_back()
                 {
-                    let reader = ArroyWrapper::new(
-                        self.index.vector_arroy,
+                    let reader = VectorStore::new(
+                        index_version,
+                        self.index.vector_store,
                         *embedder_id,
                         action.was_quantized,
                     );

@@ -882,10 +884,7 @@ impl<'a, 'i> Transform<'a, 'i> {
                 InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
             )?;
 
-            let injected_vectors: std::result::Result<
-                serde_json::Map<String, serde_json::Value>,
-                arroy::Error,
-            > = readers
+            let injected_vectors: crate::Result<_> = readers
                 .iter()
                 .filter_map(|(name, (reader, user_provided))| {
                     if !user_provided.contains(docid) {

@@ -949,9 +948,13 @@ impl<'a, 'i> Transform<'a, 'i> {
             else {
                 continue;
             };
-            let arroy =
-                ArroyWrapper::new(self.index.vector_arroy, infos.embedder_id, was_quantized);
-            let Some(dimensions) = arroy.dimensions(wtxn)? else {
+            let hannoy = VectorStore::new(
+                index_version,
+                self.index.vector_store,
+                infos.embedder_id,
+                was_quantized,
+            );
+            let Some(dimensions) = hannoy.dimensions(wtxn)? else {
                 continue;
             };
             for fragment_id in fragment_ids {

@@ -959,17 +962,17 @@ impl<'a, 'i> Transform<'a, 'i> {
 
                 if infos.embedding_status.user_provided_docids().is_empty() {
                     // no user provided: clear store
-                    arroy.clear_store(wtxn, *fragment_id, dimensions)?;
+                    hannoy.clear_store(wtxn, *fragment_id, dimensions)?;
                     continue;
                 }
 
                 // some user provided, remove only the ids that are not user provided
-                let to_delete = arroy.items_in_store(wtxn, *fragment_id, |items| {
+                let to_delete = hannoy.items_in_store(wtxn, *fragment_id, |items| {
                     items - infos.embedding_status.user_provided_docids()
                 })?;
 
                 for to_delete in to_delete {
-                    arroy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
+                    hannoy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
                 }
             }
         }
@@ -27,7 +27,7 @@ use crate::update::index_documents::helpers::{
 };
 use crate::update::settings::InnerIndexSettingsDiff;
 use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig};
-use crate::vector::ArroyWrapper;
+use crate::vector::VectorStore;
 use crate::{
     lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
     Result, SerializationError, U8StrStrCodec,

@@ -619,6 +619,7 @@ pub(crate) fn write_typed_chunk_into_index(
             let _entered = span.enter();
 
             let embedders = index.embedding_configs();
+            let index_version = index.get_version(wtxn)?.unwrap();
 
             let mut remove_vectors_builder = MergerBuilder::new(KeepFirst);
             let mut manual_vectors_builder = MergerBuilder::new(KeepFirst);

@@ -677,7 +678,12 @@ pub(crate) fn write_typed_chunk_into_index(
                     .get(&embedder_name)
                     .is_some_and(|conf| conf.is_quantized);
                 // FIXME: allow customizing distance
-                let writer = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, binary_quantized);
+                let writer = VectorStore::new(
+                    index_version,
+                    index.vector_store,
+                    infos.embedder_id,
+                    binary_quantized,
+                );
 
                 // remove vectors for docids we want them removed
                 let merger = remove_vectors_builder.build();
@@ -1,7 +1,8 @@
 use grenad::CompressionType;
 
 use super::GrenadParameters;
-use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
+use crate::thread_pool_no_abort::ThreadPoolNoAbort;
+use crate::ThreadPoolNoAbortBuilder;
 
 #[derive(Debug)]
 pub struct IndexerConfig {
@@ -255,9 +255,9 @@ impl<'a> From<FrameGrantR<'a>> for FrameWithHeader<'a> {
 #[repr(u8)]
 pub enum EntryHeader {
     DbOperation(DbOperation),
-    ArroyDeleteVector(ArroyDeleteVector),
-    ArroySetVectors(ArroySetVectors),
-    ArroySetVector(ArroySetVector),
+    HannoyDeleteVector(HannoyDeleteVector),
+    HannoySetVectors(HannoySetVectors),
+    HannoySetVector(HannoySetVector),
 }
 
 impl EntryHeader {

@@ -268,9 +268,9 @@ impl EntryHeader {
     const fn variant_id(&self) -> u8 {
         match self {
             EntryHeader::DbOperation(_) => 0,
-            EntryHeader::ArroyDeleteVector(_) => 1,
-            EntryHeader::ArroySetVectors(_) => 2,
-            EntryHeader::ArroySetVector(_) => 3,
+            EntryHeader::HannoyDeleteVector(_) => 1,
+            EntryHeader::HannoySetVectors(_) => 2,
+            EntryHeader::HannoySetVector(_) => 3,
         }
     }

@@ -286,26 +286,26 @@ impl EntryHeader {
     }
 
     const fn total_delete_vector_size() -> usize {
-        Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
+        Self::variant_size() + mem::size_of::<HannoyDeleteVector>()
     }
 
     /// The `dimensions` corresponds to the number of `f32` in the embedding.
     fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
         let embedding_size = dimensions * mem::size_of::<f32>();
-        Self::variant_size() + mem::size_of::<ArroySetVectors>() + embedding_size * count
+        Self::variant_size() + mem::size_of::<HannoySetVectors>() + embedding_size * count
     }
 
     fn total_set_vector_size(dimensions: usize) -> usize {
         let embedding_size = dimensions * mem::size_of::<f32>();
-        Self::variant_size() + mem::size_of::<ArroySetVector>() + embedding_size
+        Self::variant_size() + mem::size_of::<HannoySetVector>() + embedding_size
     }
 
     fn header_size(&self) -> usize {
         let payload_size = match self {
             EntryHeader::DbOperation(op) => mem::size_of_val(op),
-            EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
-            EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
-            EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
+            EntryHeader::HannoyDeleteVector(adv) => mem::size_of_val(adv),
+            EntryHeader::HannoySetVectors(asvs) => mem::size_of_val(asvs),
+            EntryHeader::HannoySetVector(asv) => mem::size_of_val(asv),
         };
         Self::variant_size() + payload_size
     }

@@ -319,19 +319,19 @@ impl EntryHeader {
                 EntryHeader::DbOperation(header)
             }
             1 => {
-                let header_bytes = &remaining[..mem::size_of::<ArroyDeleteVector>()];
+                let header_bytes = &remaining[..mem::size_of::<HannoyDeleteVector>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::ArroyDeleteVector(header)
+                EntryHeader::HannoyDeleteVector(header)
             }
             2 => {
-                let header_bytes = &remaining[..mem::size_of::<ArroySetVectors>()];
+                let header_bytes = &remaining[..mem::size_of::<HannoySetVectors>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::ArroySetVectors(header)
+                EntryHeader::HannoySetVectors(header)
             }
             3 => {
-                let header_bytes = &remaining[..mem::size_of::<ArroySetVector>()];
+                let header_bytes = &remaining[..mem::size_of::<HannoySetVector>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::ArroySetVector(header)
+                EntryHeader::HannoySetVector(header)
             }
             id => panic!("invalid variant id: {id}"),
         }

@@ -341,9 +341,9 @@ impl EntryHeader {
         let (first, remaining) = header_bytes.split_first_mut().unwrap();
         let payload_bytes = match self {
             EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
-            EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
-            EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
-            EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
+            EntryHeader::HannoyDeleteVector(adv) => bytemuck::bytes_of(adv),
+            EntryHeader::HannoySetVectors(asvs) => bytemuck::bytes_of(asvs),
+            EntryHeader::HannoySetVector(asv) => bytemuck::bytes_of(asv),
         };
         *first = self.variant_id();
         remaining.copy_from_slice(payload_bytes);
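These renames leave the wire format untouched: an entry is one tag byte (`variant_id`) followed by the raw bytes of a plain-old-data header, which is why the ids 1-3 and the `_padding` fields stay exactly as they were. The encode/decode scheme in a standalone sketch (toy struct; the real code uses bytemuck's checked casts instead of manual byte fiddling):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(C)]
struct DeleteVector {
    docid: u32,
}

fn encode(header: DeleteVector, out: &mut Vec<u8>) {
    out.push(1); // tag byte: variant id 1 = "delete vector", unchanged by the rename
    out.extend_from_slice(&header.docid.to_ne_bytes());
}

fn decode(bytes: &[u8]) -> Option<DeleteVector> {
    let (&tag, rest) = bytes.split_first()?;
    if tag != 1 {
        return None;
    }
    let docid = u32::from_ne_bytes(rest.get(..4)?.try_into().ok()?);
    Some(DeleteVector { docid })
}

fn main() {
    let mut buf = Vec::new();
    encode(DeleteVector { docid: 42 }, &mut buf);
    assert_eq!(decode(&buf), Some(DeleteVector { docid: 42 }));
}
```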
@@ -378,7 +378,7 @@ impl DbOperation {
 
 #[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
 #[repr(transparent)]
-pub struct ArroyDeleteVector {
+pub struct HannoyDeleteVector {
     pub docid: DocumentId,
 }

@@ -386,13 +386,13 @@ pub struct ArroyDeleteVector {
 #[repr(C)]
 /// The embeddings are in the remaining space and represents
 /// non-aligned [f32] each with dimensions f32s.
-pub struct ArroySetVectors {
+pub struct HannoySetVectors {
     pub docid: DocumentId,
     pub embedder_id: u8,
     _padding: [u8; 3],
 }
 
-impl ArroySetVectors {
+impl HannoySetVectors {
     fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
         let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
         &frame[skip..]

@@ -416,14 +416,14 @@ impl ArroySetVectors {
 #[repr(C)]
 /// The embeddings are in the remaining space and represents
 /// non-aligned [f32] each with dimensions f32s.
-pub struct ArroySetVector {
+pub struct HannoySetVector {
     pub docid: DocumentId,
     pub embedder_id: u8,
     pub extractor_id: u8,
     _padding: [u8; 2],
 }
 
-impl ArroySetVector {
+impl HannoySetVector {
     fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
         let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
         &frame[skip..]

@@ -553,7 +553,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();
 
-        let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
+        let payload_header = EntryHeader::HannoyDeleteVector(HannoyDeleteVector { docid });
         let total_length = EntryHeader::total_delete_vector_size();
         if total_length > max_grant {
             panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");

@@ -589,8 +589,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         // to zero to allocate no extra space at all
         let dimensions = embeddings.first().map_or(0, |emb| emb.len());
 
-        let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
-        let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
+        let hannoy_set_vector = HannoySetVectors { docid, embedder_id, _padding: [0; 3] };
+        let payload_header = EntryHeader::HannoySetVectors(hannoy_set_vector);
         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
         if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;

@@ -650,9 +650,9 @@ impl<'b> ExtractorBbqueueSender<'b> {
         // to zero to allocate no extra space at all
         let dimensions = embedding.as_ref().map_or(0, |emb| emb.len());
 
-        let arroy_set_vector =
-            ArroySetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
-        let payload_header = EntryHeader::ArroySetVector(arroy_set_vector);
+        let hannoy_set_vector =
+            HannoySetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
+        let payload_header = EntryHeader::HannoySetVector(hannoy_set_vector);
         let total_length = EntryHeader::total_set_vector_size(dimensions);
         if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
@@ -240,12 +240,12 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
 /// modifies them by adding or removing vector fields based on embedder actions,
 /// and then updates the database.
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
-pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
+pub fn update_database_documents<'indexer, MSP, SD>(
     documents: &'indexer DocumentsIndentifiers<'indexer>,
     indexing_context: IndexingContext<MSP>,
     extractor_sender: &ExtractorBbqueueSender,
     settings_delta: &SD,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+    extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
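This hunk also drops a named lifetime that was used in only one parameter position, where the anonymous lifetime is equivalent. Minimal before/after with a hypothetical type:

```rust
struct Alloc;

// Before: `'extractor` names a lifetime that appears exactly once.
fn update_before<'extractor>(allocs: &'extractor mut Vec<Alloc>) {
    allocs.push(Alloc);
}

// After: identical meaning with the lifetime elided.
fn update_after(allocs: &mut Vec<Alloc>) {
    allocs.push(Alloc);
}

fn main() {
    let mut allocs = Vec::new();
    update_before(&mut allocs);
    update_after(&mut allocs);
}
```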
@@ -8,7 +8,7 @@ use document_changes::{DocumentChanges, IndexingContext};
pub use document_deletion::DocumentDeletion;
pub use document_operation::{DocumentOperation, PayloadStats};
use hashbrown::HashMap;
use heed::RwTxn;
use heed::{RoTxn, RwTxn};
pub use partial_dump::PartialDump;
pub use post_processing::recompute_word_fst_from_word_docids_database;
pub use update_by_function::UpdateByFunction;
@@ -24,7 +24,7 @@ use crate::progress::{EmbedderStats, Progress};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::vector::settings::{EmbedderAction, RemoveFragments, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, RuntimeEmbedders};
use crate::vector::{Embedder, RuntimeEmbedders, VectorStore};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};

pub(crate) mod de;
@@ -66,7 +66,7 @@ where
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);

let arroy_memory = grenad_parameters.max_memory;
let hannoy_memory = grenad_parameters.max_memory;

let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@@ -129,8 +129,9 @@ where

let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);

let vector_arroy = index.vector_arroy;
let arroy_writers: Result<HashMap<_, _>> = embedders
let vector_arroy = index.vector_store;
let index_version = index.get_version(wtxn)?.unwrap();
let hannoy_writers: Result<HashMap<_, _>> = embedders
.inner_as_ref()
.iter()
.map(|(embedder_name, runtime)| {
@@ -143,7 +144,12 @@ where
})?;

let dimensions = runtime.embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, runtime.is_quantized);
let writer = VectorStore::new(
index_version,
vector_arroy,
embedder_index,
runtime.is_quantized,
);

Ok((
embedder_index,
@@ -152,10 +158,10 @@ where
})
.collect();

let mut arroy_writers = arroy_writers?;
let mut hannoy_writers = hannoy_writers?;

let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
write_to_db(writer_receiver, finished_extraction, index, wtxn, &hannoy_writers)?;

indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);

@@ -169,8 +175,8 @@ where
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
hannoy_memory,
&mut hannoy_writers,
None,
&indexing_context.must_stop_processing,
)
@@ -226,7 +232,7 @@ where
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);

let arroy_memory = grenad_parameters.max_memory;
let hannoy_memory = grenad_parameters.max_memory;

let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@@ -283,15 +289,16 @@ where
let new_embedders = settings_delta.new_embedders();
let embedder_actions = settings_delta.embedder_actions();
let index_embedder_category_ids = settings_delta.new_embedder_category_id();
let mut arroy_writers = arroy_writers_from_embedder_actions(
let mut hannoy_writers = hannoy_writers_from_embedder_actions(
index,
wtxn,
embedder_actions,
new_embedders,
index_embedder_category_ids,
)?;

let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
write_to_db(writer_receiver, finished_extraction, index, wtxn, &hannoy_writers)?;

indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);

@@ -305,8 +312,8 @@ where
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
hannoy_memory,
&mut hannoy_writers,
Some(embedder_actions),
&indexing_context.must_stop_processing,
)
@@ -336,13 +343,15 @@ where
Ok(congestion)
}

fn arroy_writers_from_embedder_actions<'indexer>(
fn hannoy_writers_from_embedder_actions<'indexer>(
index: &Index,
rtxn: &RoTxn,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer RuntimeEmbedders,
index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, VectorStore, usize)>> {
let vector_arroy = index.vector_store;
let index_version = index.get_version(rtxn)?.unwrap();

embedders
.inner_as_ref()
@@ -360,8 +369,12 @@ fn arroy_writers_from_embedder_actions<'indexer>(
},
)));
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
let writer = VectorStore::new(
index_version,
vector_arroy,
embedder_category_id,
action.was_quantized,
);
let dimensions = runtime.embedder.dimensions();
Some(Ok((
embedder_category_id,
@@ -384,7 +397,12 @@ where
let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() else {
continue;
};
let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
let reader = VectorStore::new(
index.get_version(wtxn)?.unwrap(),
index.vector_store,
*embedder_id,
action.was_quantized,
);
let Some(dimensions) = reader.dimensions(wtxn)? else {
continue;
};
@@ -400,7 +418,12 @@ where
let Some(infos) = index.embedding_configs().embedder_info(wtxn, embedder_name)? else {
continue;
};
let arroy = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, was_quantized);
let arroy = VectorStore::new(
index.get_version(wtxn)?.unwrap(),
index.vector_store,
infos.embedder_id,
was_quantized,
);
let Some(dimensions) = arroy.dimensions(wtxn)? else {
continue;
};
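Throughout this file the writer map keeps one entry per embedder: the key is the embedder's category id and the value bundles its name, the embedder itself, the `VectorStore` handle (now built with the index version as its first argument), and the embedding dimension count. A stripped-down sketch of that shape, with stand-in types rather than the real milli ones:

use std::collections::HashMap;

// Stand-ins for the real types, for illustration only.
struct Embedder { dimensions: usize }
struct VectorStore;

// Mirrors the `hannoy_writers` map: category id -> (name, embedder, store, dims).
type Writers<'a> = HashMap<u8, (&'a str, &'a Embedder, VectorStore, usize)>;

fn register<'a>(writers: &mut Writers<'a>, id: u8, name: &'a str, embedder: &'a Embedder) {
    writers.insert(id, (name, embedder, VectorStore, embedder.dimensions));
}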
@@ -15,7 +15,7 @@ use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::settings::EmbedderAction;
use crate::vector::{ArroyWrapper, Embedder, Embeddings, RuntimeEmbedders};
use crate::vector::{Embedder, Embeddings, RuntimeEmbedders, VectorStore};
use crate::{Error, Index, InternalError, Result, UserError};

pub fn write_to_db(
@@ -23,9 +23,9 @@ pub fn write_to_db(
finished_extraction: &AtomicBool,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
hannoy_writers: &HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
) -> Result<ChannelCongestion> {
// Used by the ArroySetVector to copy the embedding into an
// Used by the HannoySetVector to copy the embedding into an
// aligned memory area, required by arroy to accept a new vector.
let mut aligned_embedding = Vec::new();
let span = tracing::trace_span!(target: "indexing::write_db", "all");
@@ -56,7 +56,7 @@ pub fn write_to_db(
ReceiverAction::LargeVectors(large_vectors) => {
let LargeVectors { docid, embedder_id, .. } = large_vectors;
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
for embedding in large_vectors.read_embeddings(*dimensions) {
embeddings.push(embedding.to_vec()).unwrap();
@@ -68,7 +68,7 @@ pub fn write_to_db(
large_vector @ LargeVector { docid, embedder_id, extractor_id, .. },
) => {
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
@@ -80,12 +80,12 @@ pub fn write_to_db(
&mut writer_receiver,
index,
wtxn,
arroy_writers,
hannoy_writers,
&mut aligned_embedding,
)?;
}

write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
write_from_bbqueue(&mut writer_receiver, index, wtxn, hannoy_writers, &mut aligned_embedding)?;

Ok(ChannelCongestion {
attempts: writer_receiver.sent_messages_attempts(),
@@ -115,8 +115,8 @@ pub fn build_vectors<MSP>(
wtxn: &mut RwTxn<'_>,
progress: &Progress,
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
hannoy_memory: Option<usize>,
hannoy_writers: &mut HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
embeder_actions: Option<&BTreeMap<String, EmbedderAction>>,
must_stop_processing: &MSP,
) -> Result<()>
@@ -129,18 +129,18 @@ where

let seed = rand::random();
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
for (_index, (embedder_name, _embedder, writer, dimensions)) in arroy_writers {
for (_index, (embedder_name, _embedder, writer, dimensions)) in hannoy_writers {
let dimensions = *dimensions;
let is_being_quantized = embeder_actions
.and_then(|actions| actions.get(*embedder_name).map(|action| action.is_being_quantized))
.unwrap_or(false);
writer.build_and_quantize(
wtxn,
progress,
progress.clone(),
&mut rng,
dimensions,
is_being_quantized,
arroy_memory,
hannoy_memory,
must_stop_processing,
)?;
}
@@ -181,7 +181,7 @@ pub fn write_from_bbqueue(
writer_receiver: &mut WriterBbqueueReceiver<'_>,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
hannoy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, VectorStore, usize)>,
aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
while let Some(frame_with_header) = writer_receiver.recv_frame() {
@@ -221,17 +221,17 @@ pub fn write_from_bbqueue(
},
}
}
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
EntryHeader::HannoyDeleteVector(HannoyDeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in hannoy_writers {
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
}
EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs;
EntryHeader::HannoySetVectors(asvs) => {
let HannoySetVectors { docid, embedder_id, .. } = asvs;
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
writer.del_items(wtxn, *dimensions, docid)?;
@@ -245,12 +245,12 @@ pub fn write_from_bbqueue(
writer.add_items(wtxn, docid, &embeddings)?;
}
}
EntryHeader::ArroySetVector(
asv @ ArroySetVector { docid, embedder_id, extractor_id, .. },
EntryHeader::HannoySetVector(
asv @ HannoySetVector { docid, embedder_id, extractor_id, .. },
) => {
let frame = frame_with_header.frame()
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
let embedding = asv.read_all_embeddings_into_vec(frame, aligned_embedding);

if embedding.is_empty() {
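About `aligned_embedding`: the bytes read from a BBQueue frame are not guaranteed to be 4-byte aligned, so they cannot be reinterpreted as `&[f32]` directly; copying them into a `Vec<f32>` restores alignment. A safe, dependency-free sketch of that copy (the real code goes through the entry's `read_all_embeddings_into_vec` helpers):

// Decode little-endian f32s from a possibly unaligned byte slice.
fn copy_unaligned_f32s(bytes: &[u8], out: &mut Vec<f32>) {
    out.clear();
    out.extend(
        bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])),
    );
}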
@@ -63,8 +63,8 @@ where
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP, D>(
mut caches: Vec<BalancedCaches<'extractor>>,
pub fn merge_and_send_docids<MSP, D>(
mut caches: Vec<BalancedCaches<'_>>,
database: Database<Bytes, Bytes>,
index: &Index,
docids_sender: WordDocidsSender<D>,
@@ -91,8 +91,8 @@ where
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>,
pub fn merge_and_send_facet_docids(
mut caches: Vec<BalancedCaches<'_>>,
database: FacetDatabases,
index: &Index,
rtxn: &RoTxn,
@@ -14,7 +14,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME;
use crate::documents::FieldIdMapper;
use crate::vector::db::{EmbeddingStatus, IndexEmbeddingConfig};
use crate::vector::parsed_vectors::{RawVectors, RawVectorsError, VectorOrArrayOfVectors};
use crate::vector::{ArroyWrapper, Embedding, RuntimeEmbedders};
use crate::vector::{Embedding, RuntimeEmbedders, VectorStore};
use crate::{DocumentId, Index, InternalError, Result, UserError};

#[derive(Serialize)]
@@ -120,8 +120,13 @@ impl<'t> VectorDocumentFromDb<'t> {
config: &IndexEmbeddingConfig,
status: &EmbeddingStatus,
) -> Result<VectorEntry<'t>> {
let reader =
ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized());
let index_version = self.index.get_version(self.rtxn)?.unwrap();
let reader = VectorStore::new(
index_version,
self.index.vector_store,
embedder_id,
config.config.quantized(),
);
let vectors = reader.item_vectors(self.rtxn, self.docid)?;

Ok(VectorEntry {
@@ -149,7 +154,7 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
name,
entry_from_raw_value(value, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
db_name: Some(crate::index::db_name::VECTOR_STORE),
})
})?,
))
@@ -167,7 +172,7 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
Some(embedding_from_doc) => {
Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
db_name: Some(crate::index::db_name::VECTOR_STORE),
})
})?)
}
@@ -3,16 +3,20 @@ mod v1_13;
mod v1_14;
mod v1_15;
mod v1_16;
mod v1_17;
mod v1_18;

use heed::RwTxn;
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13};
use v1_14::Latest_V1_13_To_Latest_V1_14;
use v1_15::Latest_V1_14_To_Latest_V1_15;
use v1_16::Latest_V1_16_To_V1_17_0;
use v1_16::Latest_V1_15_To_V1_16_0;
use v1_17::Latest_V1_16_To_V1_17_0;
use v1_18::Latest_V1_17_To_V1_18_0;

use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep};
use crate::update::upgrade::v1_16::Latest_V1_15_To_V1_16_0;
use crate::{Index, InternalError, Result};

trait UpgradeIndex {
@@ -36,6 +40,7 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&Latest_V1_14_To_Latest_V1_15 {},
&Latest_V1_15_To_V1_16_0 {},
&Latest_V1_16_To_V1_17_0 {},
&Latest_V1_17_To_V1_18_0 {},
// This is the last upgrade function, it will be called when the index is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@@ -65,6 +70,7 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 15, _) => function_index!(6),
(1, 16, _) => function_index!(7),
(1, 17, _) => function_index!(8),
(1, 18, _) => function_index!(9),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually
// considering dumpless upgrade.
(_major, _minor, _patch) => return None,
@@ -27,9 +27,9 @@ impl UpgradeIndex for Latest_V1_13_To_Latest_V1_14 {
let rtxn = index.read_txn()?;
arroy::upgrade::from_0_5_to_0_6::<Cosine>(
&rtxn,
index.vector_arroy.remap_data_type(),
index.vector_store.remap_types(),
wtxn,
index.vector_arroy.remap_data_type(),
index.vector_store.remap_types(),
)?;

Ok(false)

@@ -46,22 +46,3 @@ impl UpgradeIndex for Latest_V1_15_To_V1_16_0 {
(1, 16, 0)
}
}

#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_16_To_V1_17_0();

impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}

fn target_version(&self) -> (u32, u32, u32) {
(1, 17, 0)
}
}
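Taken together, `start` and `UPGRADE_FUNCTIONS` implement a simple replay scheme: map the stored version to a starting offset, then run every remaining step in order, each one bumping the index to its own `target_version`. A condensed sketch of the driving loop (hypothetical; the real driver also reports progress and persists the version):

trait UpgradeStep {
    fn upgrade(&self) -> bool; // true if documents must be reindexed afterwards
    fn target_version(&self) -> (u32, u32, u32);
}

fn run_upgrades(steps: &[&dyn UpgradeStep], first: usize) -> ((u32, u32, u32), bool) {
    let mut version = (0, 0, 0);
    let mut must_reindex = false;
    // Replay every remaining step in order; the last step's target wins.
    for step in &steps[first..] {
        must_reindex |= step.upgrade();
        version = step.target_version();
    }
    (version, must_reindex)
}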
24
crates/milli/src/update/upgrade/v1_17.rs
Normal file
@@ -0,0 +1,24 @@
use heed::RwTxn;

use super::UpgradeIndex;
use crate::progress::Progress;
use crate::{Index, Result};

#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_16_To_V1_17_0();

impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}

fn target_version(&self) -> (u32, u32, u32) {
(1, 17, 0)
}
}
36
crates/milli/src/update/upgrade/v1_18.rs
Normal file
@@ -0,0 +1,36 @@
use heed::RwTxn;

use super::UpgradeIndex;
use crate::progress::Progress;
use crate::vector::VectorStore;
use crate::{Index, Result};

#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_17_To_V1_18_0();

impl UpgradeIndex for Latest_V1_17_To_V1_18_0 {
fn upgrade(
&self,
wtxn: &mut RwTxn,
index: &Index,
_original: (u32, u32, u32),
progress: Progress,
) -> Result<bool> {
let embedding_configs = index.embedding_configs();
let index_version = index.get_version(wtxn)?.unwrap();
for config in embedding_configs.embedding_configs(wtxn)? {
// TODO use the embedder name to display progress
let quantized = config.config.quantized();
let embedder_id = embedding_configs.embedder_id(wtxn, &config.name)?.unwrap();
let vector_store =
VectorStore::new(index_version, index.vector_store, embedder_id, quantized);
vector_store.convert_from_arroy(wtxn, progress.clone())?;
}

Ok(false)
}

fn target_version(&self) -> (u32, u32, u32) {
(1, 18, 0)
}
}
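The `convert_from_arroy` call above is the heart of this dumpless upgrade: vectors are not re-embedded, hannoy simply rebuilds its graph from the items arroy already wrote. Condensed, the loop body it runs for every store of an embedder looks like this (same calls as in the vector module further down, simplified here):

use rand::SeedableRng as _;

// One store's arroy -> hannoy conversion, as performed by convert_from_arroy.
fn convert_store<D: hannoy::Distance>(
    wtxn: &mut heed::RwTxn,
    db: hannoy::Database<D>,
    index: u16,
    dimensions: usize,
) -> Result<(), hannoy::Error> {
    let mut rng = rand::rngs::StdRng::from_entropy();
    let writer = hannoy::Writer::new(db, index, dimensions);
    let mut builder = writer.builder(&mut rng);
    builder.prepare_arroy_conversion(wtxn)?; // reuse the items arroy stored
    builder.build::<16, 32>(wtxn)?; // HANNOY_M = 16, HANNOY_M0 = 32
    Ok(())
}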
@@ -1,6 +1,6 @@
use std::time::Instant;

use arroy::Distance;
use hannoy::Distance;

use super::error::CompositeEmbedderContainsHuggingFace;
use super::{
@@ -324,19 +324,18 @@ fn check_similarity(
}

for (left, right) in left.into_iter().zip(right) {
let left = arroy::internals::UnalignedVector::from_slice(&left);
let right = arroy::internals::UnalignedVector::from_slice(&right);
let left = arroy::internals::Leaf {
header: arroy::distances::Cosine::new_header(&left),
let left = hannoy::internals::UnalignedVector::from_slice(&left);
let right = hannoy::internals::UnalignedVector::from_slice(&right);
let left = hannoy::internals::Item {
header: hannoy::distances::Cosine::new_header(&left),
vector: left,
};
let right = arroy::internals::Leaf {
header: arroy::distances::Cosine::new_header(&right),
let right = hannoy::internals::Item {
header: hannoy::distances::Cosine::new_header(&right),
vector: right,
};

let distance = arroy::distances::Cosine::built_distance(&left, &right);

let distance = hannoy::distances::Cosine::distance(&left, &right);
if distance > super::MAX_COMPOSITE_DISTANCE {
return Err(NewEmbedderError::composite_embedding_value_mismatch(distance, hint));
}

@@ -3,11 +3,12 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use arroy::distances::{BinaryQuantizedCosine, Cosine};
use arroy::ItemId;
use deserr::{DeserializeError, Deserr};
use hannoy::distances::{Cosine, Hamming};
use hannoy::ItemId;
use heed::{RoTxn, RwTxn, Unspecified};
use ordered_float::OrderedFloat;
use rand::SeedableRng as _;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
@@ -41,31 +42,43 @@ pub type Embedding = Vec<f32>;
pub const REQUEST_PARALLELISM: usize = 40;
pub const MAX_COMPOSITE_DISTANCE: f32 = 0.01;

pub struct ArroyWrapper {
quantized: bool,
const HANNOY_EF_CONSTRUCTION: usize = 125;
const HANNOY_M: usize = 16;
const HANNOY_M0: usize = 32;

pub struct VectorStore {
version: (u32, u32, u32),
database: hannoy::Database<Unspecified>,
embedder_index: u8,
database: arroy::Database<Unspecified>,
quantized: bool,
}

impl ArroyWrapper {
impl VectorStore {
pub fn new(
database: arroy::Database<Unspecified>,
version: (u32, u32, u32),
database: hannoy::Database<Unspecified>,
embedder_index: u8,
quantized: bool,
) -> Self {
Self { database, embedder_index, quantized }
Self { version, database, embedder_index, quantized }
}

pub fn embedder_index(&self) -> u8 {
self.embedder_index
}

fn readers<'a, D: arroy::Distance>(
/// Whether we must use arroy to read the vector store.
pub fn version_uses_arroy(&self) -> bool {
let (major, minor, _patch) = self.version;
major == 1 && minor < 18
}
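The cut-over implied by `version_uses_arroy` is exact: indexes written by any 1.x release before 1.18 are read through arroy, everything from 1.18.0 on through hannoy. Spot-checking the predicate (hypothetical test, same logic as above):

fn version_uses_arroy(version: (u32, u32, u32)) -> bool {
    let (major, minor, _patch) = version;
    major == 1 && minor < 18
}

#[test]
fn cutover_is_at_1_18() {
    assert!(version_uses_arroy((1, 12, 0))); // oldest upgradable index
    assert!(version_uses_arroy((1, 17, 1))); // last arroy-era release
    assert!(!version_uses_arroy((1, 18, 0))); // converted by the v1_18 step
    assert!(!version_uses_arroy((2, 0, 0))); // any later major
}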
fn arroy_readers<'a, D: arroy::Distance>(
&'a self,
rtxn: &'a RoTxn<'a>,
db: arroy::Database<D>,
) -> impl Iterator<Item = Result<arroy::Reader<'a, D>, arroy::Error>> + 'a {
arroy_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
match arroy::Reader::open(rtxn, index, db) {
Ok(reader) => match reader.is_empty(rtxn) {
Ok(false) => Some(Ok(reader)),
@@ -78,6 +91,24 @@ impl ArroyWrapper {
})
}

fn readers<'a, D: hannoy::Distance>(
&'a self,
rtxn: &'a RoTxn<'a>,
db: hannoy::Database<D>,
) -> impl Iterator<Item = Result<hannoy::Reader<'a, D>, hannoy::Error>> + 'a {
vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
match hannoy::Reader::open(rtxn, index, db) {
Ok(reader) => match reader.is_empty(rtxn) {
Ok(false) => Some(Ok(reader)),
Ok(true) => None,
Err(e) => Some(Err(e)),
},
Err(hannoy::Error::MissingMetadata(_)) => None,
Err(e) => Some(Err(e)),
}
})
}

/// The item ids that are present in the store specified by its id.
///
/// The ids are accessed via a lambda to avoid lifetime shenanigans.
@@ -86,18 +117,27 @@ impl ArroyWrapper {
rtxn: &RoTxn,
store_id: u8,
with_items: F,
) -> Result<O, arroy::Error>
) -> crate::Result<O>
where
F: FnOnce(&RoaringBitmap) -> O,
{
if self.quantized {
if self.version_uses_arroy() {
if self.quantized {
self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items)
.map_err(Into::into)
} else {
self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items)
.map_err(Into::into)
}
} else if self.quantized {
self._items_in_store(rtxn, self.quantized_db(), store_id, with_items)
.map_err(Into::into)
} else {
self._items_in_store(rtxn, self.angular_db(), store_id, with_items)
self._items_in_store(rtxn, self.angular_db(), store_id, with_items).map_err(Into::into)
}
}

fn _items_in_store<D: arroy::Distance, F, O>(
fn _arroy_items_in_store<D: arroy::Distance, F, O>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
@@ -107,7 +147,7 @@ impl ArroyWrapper {
where
F: FnOnce(&RoaringBitmap) -> O,
{
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let index = vector_store_for_embedder(self.embedder_index, store_id);
let reader = arroy::Reader::open(rtxn, index, db);
match reader {
Ok(reader) => Ok(with_items(reader.item_ids())),
@@ -116,8 +156,41 @@ impl ArroyWrapper {
}
}

pub fn dimensions(&self, rtxn: &RoTxn) -> Result<Option<usize>, arroy::Error> {
if self.quantized {
fn _items_in_store<D: hannoy::Distance, F, O>(
&self,
rtxn: &RoTxn,
db: hannoy::Database<D>,
store_id: u8,
with_items: F,
) -> Result<O, hannoy::Error>
where
F: FnOnce(&RoaringBitmap) -> O,
{
let index = vector_store_for_embedder(self.embedder_index, store_id);
let reader = hannoy::Reader::open(rtxn, index, db);
match reader {
Ok(reader) => Ok(with_items(reader.item_ids())),
Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())),
Err(err) => Err(err),
}
}

pub fn dimensions(&self, rtxn: &RoTxn) -> crate::Result<Option<usize>> {
if self.version_uses_arroy() {
if self.quantized {
Ok(self
.arroy_readers(rtxn, self.arroy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions()))
} else {
Ok(self
.arroy_readers(rtxn, self.arroy_angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions()))
}
} else if self.quantized {
Ok(self
.readers(rtxn, self.quantized_db())
.next()
@@ -132,47 +205,92 @@ impl ArroyWrapper {
}
}

pub fn convert_from_arroy(&self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> {
if self.quantized {
let dimensions = self
.arroy_readers(wtxn, self.arroy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());

let Some(dimensions) = dimensions else { return Ok(()) };

for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
}

Ok(())
} else {
let dimensions = self
.arroy_readers(wtxn, self.arroy_angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());

let Some(dimensions) = dimensions else { return Ok(()) };

for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
}

Ok(())
}
}
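The three `HANNOY_*` constants introduced at the top of this module are standard HNSW construction knobs; a conventional reading of them follows (the crate's own documentation is authoritative):

// Standard HNSW interpretation of the construction parameters used here:
const HANNOY_M: usize = 16; // max neighbours per node on the upper layers
const HANNOY_M0: usize = 32; // max neighbours on layer 0, the densest layer
const HANNOY_EF_CONSTRUCTION: usize = 125; // candidate-queue width while inserting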
#[allow(clippy::too_many_arguments)]
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
&mut self,
wtxn: &mut RwTxn,
progress: &Progress,
progress: Progress,
rng: &mut R,
dimension: usize,
quantizing: bool,
arroy_memory: Option<usize>,
hannoy_memory: Option<usize>,
cancel: &(impl Fn() -> bool + Sync + Send),
) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
) -> Result<(), hannoy::Error> {
for index in vector_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.need_build(wtxn)? {
writer.builder(rng).build(wtxn)?
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.available_memory(hannoy_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
} else if writer.is_empty(wtxn)? {
continue;
}
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
// If we are quantizing the databases, we can't know from meilisearch
// if the db was empty but still contained the wrong metadata, thus we need
// to quantize everything and can't stop early. Since this operation can
// only happen once in the life of an embedder, it's not very performance
// sensitive.
if quantizing && !self.quantized {
let writer = writer.prepare_changing_distance::<BinaryQuantizedCosine>(wtxn)?;
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
let writer = writer.prepare_changing_distance::<Hamming>(wtxn)?;
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.available_memory(hannoy_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.build(wtxn)?;
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
} else if writer.need_build(wtxn)? {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.available_memory(hannoy_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.build(wtxn)?;
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
} else if writer.is_empty(wtxn)? {
continue;
}
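Note the new `.cancel(cancel)` hook on the builder: instead of arroy's step-based progress callback, hannoy polls a boolean closure during the build, which is how `must_stop_processing` aborts long index builds. Wiring such a hook is as simple as:

use std::sync::atomic::{AtomicBool, Ordering};

// A cancel hook in the shape the builder expects: polled during the build,
// returning true once another thread has requested an abort.
fn make_cancel(stop: &AtomicBool) -> impl Fn() -> bool + Sync + Send + '_ {
    move || stop.load(Ordering::Relaxed)
}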
@@ -188,18 +306,18 @@ impl ArroyWrapper {
pub fn add_items(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
embeddings: &Embeddings<f32>,
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = embeddings.dimension();
for (index, vector) in
arroy_store_range_for_embedder(self.embedder_index).zip(embeddings.iter())
vector_store_range_for_embedder(self.embedder_index).zip(embeddings.iter())
{
if self.quantized {
arroy::Writer::new(self.quantized_db(), index, dimension)
hannoy::Writer::new(self.quantized_db(), index, dimension)
.add_item(wtxn, item_id, vector)?
} else {
arroy::Writer::new(self.angular_db(), index, dimension)
hannoy::Writer::new(self.angular_db(), index, dimension)
.add_item(wtxn, item_id, vector)?
}
}
@@ -210,9 +328,9 @@ impl ArroyWrapper {
pub fn add_item(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._add_item(wtxn, self.quantized_db(), item_id, vector)
} else {
@@ -220,17 +338,17 @@ impl ArroyWrapper {
}
}

fn _add_item<D: arroy::Distance>(
fn _add_item<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = vector.len();

for index in arroy_store_range_for_embedder(self.embedder_index) {
let writer = arroy::Writer::new(db, index, dimension);
for index in vector_store_range_for_embedder(self.embedder_index) {
let writer = hannoy::Writer::new(db, index, dimension);
if !writer.contains_item(wtxn, item_id)? {
writer.add_item(wtxn, item_id, vector)?;
break;
@@ -245,10 +363,10 @@ impl ArroyWrapper {
pub fn add_item_in_store(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
store_id: u8,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector)
} else {
@@ -256,18 +374,18 @@ impl ArroyWrapper {
}
}

fn _add_item_in_store<D: arroy::Distance>(
fn _add_item_in_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
store_id: u8,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = vector.len();

let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimension);
let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimension);
writer.add_item(wtxn, item_id, vector)
}

@@ -276,14 +394,14 @@ impl ArroyWrapper {
&self,
wtxn: &mut RwTxn,
dimension: usize,
item_id: arroy::ItemId,
) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
item_id: hannoy::ItemId,
) -> Result<(), hannoy::Error> {
for index in vector_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
writer.del_item(wtxn, item_id)?;
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
writer.del_item(wtxn, item_id)?;
}
}
@@ -301,10 +419,10 @@ impl ArroyWrapper {
pub fn del_item_in_store(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
store_id: u8,
dimensions: usize,
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
if self.quantized {
self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions)
} else {
@@ -312,16 +430,16 @@ impl ArroyWrapper {
}
}

fn _del_item_in_store<D: arroy::Distance>(
fn _del_item_in_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
store_id: u8,
dimensions: usize,
) -> Result<bool, arroy::Error> {
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimensions);
) -> Result<bool, hannoy::Error> {
let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions);
writer.del_item(wtxn, item_id)
}

@@ -335,7 +453,7 @@ impl ArroyWrapper {
wtxn: &mut RwTxn,
store_id: u8,
dimensions: usize,
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._clear_store(wtxn, self.quantized_db(), store_id, dimensions)
} else {
@@ -343,15 +461,15 @@ impl ArroyWrapper {
}
}

fn _clear_store<D: arroy::Distance>(
fn _clear_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
db: hannoy::Database<D>,
store_id: u8,
dimensions: usize,
) -> Result<(), arroy::Error> {
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimensions);
) -> Result<(), hannoy::Error> {
let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions);
writer.clear(wtxn)
}

@@ -359,9 +477,9 @@ impl ArroyWrapper {
pub fn del_item(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
if self.quantized {
self._del_item(wtxn, self.quantized_db(), item_id, vector)
} else {
@@ -369,37 +487,34 @@ impl ArroyWrapper {
}
}

fn _del_item<D: arroy::Distance>(
fn _del_item<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
let dimension = vector.len();

for index in arroy_store_range_for_embedder(self.embedder_index) {
let writer = arroy::Writer::new(db, index, dimension);
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
continue;
};
if candidate == vector {
for index in vector_store_range_for_embedder(self.embedder_index) {
let writer = hannoy::Writer::new(db, index, dimension);
if writer.contains_item(wtxn, item_id)? {
return writer.del_item(wtxn, item_id);
}
}
Ok(false)
}

pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> {
for index in vector_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.is_empty(wtxn)? {
continue;
}
writer.clear(wtxn)?;
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
if writer.is_empty(wtxn)? {
continue;
}
@@ -413,17 +528,31 @@ impl ArroyWrapper {
&self,
rtxn: &RoTxn,
dimension: usize,
item: arroy::ItemId,
) -> Result<bool, arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
let contains = if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
item: hannoy::ItemId,
) -> crate::Result<bool> {
for index in vector_store_range_for_embedder(self.embedder_index) {
let contains = if self.version_uses_arroy() {
if self.quantized {
let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} else {
let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
}
} else if self.quantized {
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
@@ -442,15 +571,23 @@ impl ArroyWrapper {
item: ItemId,
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
if self.quantized {
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter)
) -> crate::Result<Vec<(ItemId, f32)>> {
if self.version_uses_arroy() {
if self.quantized {
self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter)
.map_err(Into::into)
} else {
self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter)
.map_err(Into::into)
}
} else if self.quantized {
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter).map_err(Into::into)
} else {
self._nns_by_item(rtxn, self.angular_db(), item, limit, filter)
self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into)
}
}

fn _nns_by_item<D: arroy::Distance>(
fn _arroy_nns_by_item<D: arroy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
@@ -460,7 +597,7 @@ impl ArroyWrapper {
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
let mut results = Vec::new();

for reader in self.readers(rtxn, db) {
for reader in self.arroy_readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
if let Some(filter) = filter {
@@ -478,21 +615,56 @@ impl ArroyWrapper {
Ok(results)
}

fn _nns_by_item<D: hannoy::Distance>(
&self,
rtxn: &RoTxn,
db: hannoy::Database<D>,
item: ItemId,
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();

for reader in self.readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
searcher.ef_search((limit * 10).max(100)); // TODO find better ef
if let Some(filter) = filter {
searcher.candidates(filter);
}

if let Some(mut ret) = searcher.by_item(rtxn, item)? {
results.append(&mut ret);
}
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
Ok(results)
}
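The `ef_search` heuristic above widens the candidate queue with the requested limit but never drops below 100, trading a little latency for recall on small limits. Its shape:

// ef_search as used above: ten times the limit, floored at 100.
fn ef_search_for(limit: usize) -> usize {
    (limit * 10).max(100)
}

// e.g. ef_search_for(5) == 100, ef_search_for(20) == 200, ef_search_for(100) == 1000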
pub fn nns_by_vector(
&self,
rtxn: &RoTxn,
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
if self.quantized {
) -> crate::Result<Vec<(ItemId, f32)>> {
if self.version_uses_arroy() {
if self.quantized {
self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter)
.map_err(Into::into)
} else {
self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter)
.map_err(Into::into)
}
} else if self.quantized {
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
.map_err(Into::into)
} else {
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter)
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter).map_err(Into::into)
}
}

fn _nns_by_vector<D: arroy::Distance>(
fn _arroy_nns_by_vector<D: arroy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
@@ -502,7 +674,7 @@ impl ArroyWrapper {
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
let mut results = Vec::new();

for reader in self.readers(rtxn, db) {
for reader in self.arroy_readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
if let Some(filter) = filter {
@@ -520,10 +692,50 @@ impl ArroyWrapper {
Ok(results)
}

pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, arroy::Error> {
fn _nns_by_vector<D: hannoy::Distance>(
&self,
rtxn: &RoTxn,
db: hannoy::Database<D>,
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();

for reader in self.readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
searcher.ef_search((limit * 10).max(100)); // TODO find better ef
if let Some(filter) = filter {
searcher.candidates(filter);
}

results.append(&mut searcher.by_vector(rtxn, vector)?);
}

results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));

Ok(results)
}

pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result<Vec<Vec<f32>>> {
let mut vectors = Vec::new();

if self.quantized {
if self.version_uses_arroy() {
if self.quantized {
for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
}
} else {
for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
}
}
} else if self.quantized {
for reader in self.readers(rtxn, self.quantized_db()) {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
@@ -536,22 +748,31 @@ impl ArroyWrapper {
}
}
}

Ok(vectors)
}

fn angular_db(&self) -> arroy::Database<Cosine> {
fn arroy_angular_db(&self) -> arroy::Database<arroy::distances::Cosine> {
self.database.remap_types()
}

fn arroy_quantized_db(&self) -> arroy::Database<arroy::distances::BinaryQuantizedCosine> {
self.database.remap_types()
}

fn angular_db(&self) -> hannoy::Database<Cosine> {
self.database.remap_data_type()
}

fn quantized_db(&self) -> arroy::Database<BinaryQuantizedCosine> {
fn quantized_db(&self) -> hannoy::Database<Hamming> {
self.database.remap_data_type()
}

pub fn aggregate_stats(
&self,
rtxn: &RoTxn,
stats: &mut ArroyStats,
) -> Result<(), arroy::Error> {
stats: &mut HannoyStats,
) -> Result<(), hannoy::Error> {
if self.quantized {
for reader in self.readers(rtxn, self.quantized_db()) {
let reader = reader?;
@@ -573,10 +794,11 @@ impl ArroyWrapper {
}

#[derive(Debug, Default, Clone)]
pub struct ArroyStats {
pub struct HannoyStats {
pub number_of_embeddings: u64,
pub documents: RoaringBitmap,
}

/// One or multiple embeddings stored consecutively in a flat vector.
#[derive(Debug, PartialEq)]
pub struct Embeddings<F> {
@@ -1221,11 +1443,11 @@ pub const fn is_cuda_enabled() -> bool {
cfg!(feature = "cuda")
}

fn arroy_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
(0..=u8::MAX).map(move |store_id| arroy_store_for_embedder(embedder_id, store_id))
fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
(0..=u8::MAX).map(move |store_id| vector_store_for_embedder(embedder_id, store_id))
}

fn arroy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
let embedder_id = (embedder_id as u16) << 8;
embedder_id | (store_id as u16)
}
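The store addressing is unchanged by the rename: each embedder owns a contiguous block of 256 u16 store indexes, with the embedder id in the high byte and the store id in the low byte. For example:

fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
    let embedder_id = (embedder_id as u16) << 8;
    embedder_id | (store_id as u16)
}

#[test]
fn packing_layout() {
    assert_eq!(vector_store_for_embedder(0, 0), 0x0000);
    assert_eq!(vector_store_for_embedder(3, 7), 0x0307); // 3 * 256 + 7 == 775
    assert_eq!(vector_store_for_embedder(255, 255), 0xFFFF);
}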
@@ -321,7 +321,14 @@ impl Embedder {
pub fn prompt_count_in_chunk_hint(&self) -> usize {
match self.data.request.input_type() {
InputType::Text => 1,
InputType::TextArray => 10,
InputType::TextArray => {
let chunk_size = std::env::var("MEILI_EMBEDDINGS_CHUNK_SIZE")
.ok()
.and_then(|chunk_size| chunk_size.parse().ok())
.unwrap_or(10);
assert!(chunk_size <= 100, "Embedding chunk size cannot exceed 100");
chunk_size
}
}
}
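This makes the embedding chunk size tunable without a rebuild: `MEILI_EMBEDDINGS_CHUNK_SIZE` overrides the default of 10 prompts per request, capped at 100 by the assertion. A quick check of the parse-with-fallback behaviour (hypothetical test, same logic as the env lookup above):

fn chunk_size_from(var: Option<&str>) -> usize {
    var.and_then(|s| s.parse().ok()).unwrap_or(10)
}

#[test]
fn chunk_size_fallback() {
    assert_eq!(chunk_size_from(None), 10); // unset: default
    assert_eq!(chunk_size_from(Some("50")), 50); // valid override
    assert_eq!(chunk_size_from(Some("lots")), 10); // unparsable: default
}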
@@ -59,7 +59,7 @@ fn fibo_recursive(n: u32) -> u32 {
if n == 1 {
return 2;
}
return fibo_recursive(n - 1) - fibo_recursive(n - 2);
fibo_recursive(n - 1) - fibo_recursive(n - 2)
}
use tracing_error::ExtractSpanTrace as _;