Mirror of https://github.com/meilisearch/meilisearch.git
Synced 2025-11-04 09:56:28 +00:00

Revert "Stream documents"
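This reverts the document-streaming changes: the documents route buffers results and answers with a plain JSON body again instead of a streamed one, `PaginationView` goes back to owning a concrete `Vec<T>`, `Index::static_read_txn` is removed, and heed, arroy, and roaring are pinned back to their earlier versions (heed 0.20.0-alpha.9, arroy 0.2.0, roaring 0.10.2).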
Cargo.lock (generated, 38 lines changed)
@@ -378,9 +378,9 @@ dependencies = [
 
 [[package]]
 name = "arroy"
-version = "0.3.1"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "73897699bf04bac935c0b120990d2a511e91e563e0f9769f9c8bb983d98dfbc9"
+checksum = "efddeb1e7c32a551cc07ef4c3e181e3cd5478fdaf4f0bd799983171c1f6efe57"
 dependencies = [
  "bytemuck",
  "byteorder",
@@ -1536,9 +1536,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
 
 [[package]]
 name = "doxygen-rs"
-version = "0.4.2"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9"
+checksum = "bff670ea0c9bbb8414e7efa6e23ebde2b8f520a7eef78273a3918cf1903e7505"
 dependencies = [
  "phf",
 ]
@@ -2262,11 +2262,12 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
 [[package]]
 name = "heed"
-version = "0.20.1"
+version = "0.20.0-alpha.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f7acb9683d7c7068aa46d47557bfa4e35a277964b350d9504a87b03610163fd"
+checksum = "9648a50991c86df7d00c56c268c27754fcf4c80be2ba57fc4a00dc928c6fe934"
 dependencies = [
  "bitflags 2.5.0",
+ "bytemuck",
  "byteorder",
  "heed-traits",
  "heed-types",
@@ -2280,15 +2281,15 @@ dependencies = [
 
 [[package]]
 name = "heed-traits"
-version = "0.20.0"
+version = "0.20.0-alpha.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
+checksum = "5ab0b7d9cde969ad36dde692e487dc89d97f7168bf6a7bd3b894ad4bf7278298"
 
 [[package]]
 name = "heed-types"
-version = "0.20.0"
+version = "0.20.0-alpha.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3cb0d6ba3700c9a57e83c013693e3eddb68a6d9b6781cacafc62a0d992e8ddb3"
+checksum = "f0cb3567a7363f28b597bf6e9897b9466397951dd0e52df2c8196dd8a71af44a"
 dependencies = [
  "bincode",
  "byteorder",
@@ -3188,13 +3189,14 @@ checksum = "f9d642685b028806386b2b6e75685faadd3eb65a85fff7df711ce18446a422da"
 
 [[package]]
 name = "lmdb-master-sys"
-version = "0.2.0"
+version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc9048db3a58c0732d7236abc4909058f9d2708cfb6d7d047eb895fddec6419a"
+checksum = "629c123f5321b48fa4f8f4d3b868165b748d9ba79c7103fb58e3a94f736bcedd"
 dependencies = [
  "cc",
  "doxygen-rs",
  "libc",
+ "pkg-config",
 ]
 
 [[package]]
@@ -3346,7 +3348,6 @@ dependencies = [
  "rayon",
  "regex",
  "reqwest",
- "roaring",
  "rustls 0.21.12",
  "rustls-pemfile",
  "segment",
@@ -4415,6 +4416,12 @@ dependencies = [
  "winreg",
 ]
 
+[[package]]
+name = "retain_mut"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
+
 [[package]]
 name = "ring"
 version = "0.17.8"
@@ -4432,12 +4439,13 @@ dependencies = [
 
 [[package]]
 name = "roaring"
-version = "0.10.3"
+version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf"
+checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873"
 dependencies = [
  "bytemuck",
  "byteorder",
+ "retain_mut",
  "serde",
 ]
 
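The Cargo.lock hunks above simply track the manifest changes further down: heed returns to 0.20.0-alpha.9 (pulling heed-traits, heed-types, lmdb-master-sys, and doxygen-rs along with it), arroy to 0.2.0, and roaring to 0.10.2, which still depended on the retain_mut crate; roaring also disappears from the meilisearch crate's own dependency list.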
@@ -785,12 +785,10 @@ impl IndexScheduler {
                 let dst = temp_snapshot_dir.path().join("auth");
                 fs::create_dir_all(&dst)?;
                 // TODO We can't use the open_auth_store_env function here but we should
-                let auth = unsafe {
-                    milli::heed::EnvOpenOptions::new()
-                        .map_size(1024 * 1024 * 1024) // 1 GiB
-                        .max_dbs(2)
-                        .open(&self.auth_path)
-                }?;
+                let auth = milli::heed::EnvOpenOptions::new()
+                    .map_size(1024 * 1024 * 1024) // 1 GiB
+                    .max_dbs(2)
+                    .open(&self.auth_path)?;
                 auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
 
                 // 5. Copy and tarball the flat snapshot
@@ -453,12 +453,10 @@ impl IndexScheduler {
             )
         };
 
-        let env = unsafe {
-            heed::EnvOpenOptions::new()
-                .max_dbs(11)
-                .map_size(budget.task_db_size)
-                .open(options.tasks_path)
-        }?;
+        let env = heed::EnvOpenOptions::new()
+            .max_dbs(11)
+            .map_size(budget.task_db_size)
+            .open(options.tasks_path)?;
 
         let features = features::FeatureData::new(&env, options.instance_features)?;
 
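Several hunks in this revert are the same mechanical change: heed 0.20.x makes `EnvOpenOptions::open` an `unsafe fn` (the caller must guarantee the same environment is not opened twice in one process), while the 0.20.0-alpha.9 release restored here still exposes a safe call, so the `unsafe { ... }` wrappers come off. A minimal sketch of the restored call shape, with `path` and `map_size` as illustrative parameters:

```rust
use std::path::Path;

use heed::{Env, EnvOpenOptions, Result};

// Safe alpha-era open, matching the right-hand side of the hunks above;
// on stable heed 0.20.x the final `.open(path)` would need an `unsafe` block.
fn open_tasks_env(path: &Path, map_size: usize) -> Result<Env> {
    EnvOpenOptions::new()
        .max_dbs(11)        // the scheduler registers up to 11 databases here
        .map_size(map_size) // e.g. budget.task_db_size in the hunk above
        .open(path)
}
```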
@@ -587,9 +585,9 @@ impl IndexScheduler {
     }
 
     fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
-        if let Ok(env) = unsafe {
+        if let Ok(env) =
             heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
-        } {
+        {
             env.prepare_for_closing().wait();
             true
         } else {
@@ -49,7 +49,7 @@ pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env>
     let mut options = EnvOpenOptions::new();
     options.map_size(AUTH_STORE_SIZE); // 1GB
     options.max_dbs(2);
-    unsafe { options.open(path) }
+    options.open(path)
 }
 
 impl HeedAuthStore {
@@ -423,6 +423,7 @@ impl ErrorCode for HeedError {
             HeedError::Mdb(_)
             | HeedError::Encoding(_)
             | HeedError::Decoding(_)
+            | HeedError::InvalidDatabaseTyping
             | HeedError::DatabaseClosing
             | HeedError::BadOpenOptions { .. } => Code::Internal,
         }
@@ -108,7 +108,6 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
 tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
 tracing-actix-web = "0.7.9"
 build-info = { version = "1.7.0", path = "../build-info" }
-roaring = "0.10.3"
 
 [dev-dependencies]
 actix-rt = "2.9.0"
@@ -1,14 +1,12 @@
-use std::io::{ErrorKind, Write};
+use std::io::ErrorKind;
 
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
 use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
 use bstr::ByteSlice as _;
-use bytes::Bytes;
 use deserr::actix_web::{AwebJson, AwebQueryParameter};
 use deserr::Deserr;
 use futures::StreamExt;
-use futures_util::Stream;
 use index_scheduler::{IndexScheduler, TaskId};
 use meilisearch_types::deserr::query_params::Param;
 use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
@@ -24,9 +22,7 @@ use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::{milli, Document, Index};
 use mime::Mime;
 use once_cell::sync::Lazy;
-use roaring::RoaringBitmap;
-use serde::ser::SerializeSeq;
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 use serde_json::Value;
 use tempfile::tempfile;
 use tokio::fs::File;
@@ -234,34 +230,6 @@ pub async fn get_documents(
     documents_by_query(&index_scheduler, index_uid, query)
 }
 
-pub struct Writer2Streamer {
-    sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
-}
-
-impl Write for Writer2Streamer {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
-        Ok(buf.len())
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        Ok(())
-    }
-}
-
-pub fn stream(
-    data: impl Serialize + Send + 'static,
-) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
-    let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
-
-    tokio::task::spawn_blocking(move || {
-        serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
-    });
-
-    futures_util::stream::unfold(receiver, |mut receiver| async {
-        receiver.recv().await.map(|value| (value, receiver))
-    })
-}
-
 fn documents_by_query(
     index_scheduler: &IndexScheduler,
     index_uid: web::Path<String>,
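For the record, the helpers removed above implemented a small bridge from serde's synchronous `io::Write` to an async byte stream: serialization runs on a blocking task, every buffer crosses a bounded channel (`blocking_send` gives backpressure when the HTTP client reads slowly), and `unfold` turns the receiver into a `Stream` that actix can forward. A self-contained sketch of the same pattern, with illustrative names:

```rust
use std::io::Write;

use bytes::Bytes;
use futures_util::Stream;
use serde::Serialize;

// Forwards every buffer serde writes into a bounded tokio channel.
struct ChannelWriter(tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Blocks the serializer task until the consumer catches up.
        self.0
            .blocking_send(Ok(Bytes::copy_from_slice(buf)))
            .map_err(std::io::Error::other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

// Serializes `data` to JSON off the async executor, exposing the bytes as a stream.
fn to_byte_stream(
    data: impl Serialize + Send + 'static,
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
    let (sender, receiver) = tokio::sync::mpsc::channel(1);
    tokio::task::spawn_blocking(move || {
        serde_json::to_writer(std::io::BufWriter::new(ChannelWriter(sender)), &data)
    });
    futures_util::stream::unfold(receiver, |mut receiver| async {
        receiver.recv().await.map(|value| (value, receiver))
    })
}
```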
@@ -271,13 +239,12 @@ fn documents_by_query(
     let BrowseQuery { offset, limit, fields, filter } = query;
 
     let index = index_scheduler.index(&index_uid)?;
-    let documents = retrieve_documents(index, offset, limit, filter, fields)?;
+    let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
 
-    let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
+    let ret = PaginationView::new(offset, limit, total as usize, documents);
 
     debug!(returns = ?ret, "Get documents");
-
-    Ok(HttpResponse::Ok().streaming(stream(ret)))
+    Ok(HttpResponse::Ok().json(ret))
 }
 
 #[derive(Deserialize, Debug, Deserr)]
@@ -623,47 +590,14 @@ fn some_documents<'a, 't: 'a>(
     }))
 }
 
-pub struct DocumentsStreamer {
-    attributes_to_retrieve: Option<Vec<String>>,
-    documents: RoaringBitmap,
-    rtxn: RoTxn<'static>,
-    index: Index,
-    pub total_documents: u64,
-}
-
-impl Serialize for DocumentsStreamer {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
-
-        let documents = some_documents(&self.index, &self.rtxn, self.documents.iter()).unwrap();
-        for document in documents {
-            let document = document.unwrap();
-            let document = match self.attributes_to_retrieve {
-                Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
-                    &document,
-                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
-                ),
-                None => document,
-            };
-
-            seq.serialize_element(&document)?;
-        }
-        seq.end()
-    }
-}
-
-fn retrieve_documents(
-    index: Index,
+fn retrieve_documents<S: AsRef<str>>(
+    index: &Index,
     offset: usize,
     limit: usize,
     filter: Option<Value>,
-    attributes_to_retrieve: Option<Vec<String>>,
-) -> Result<DocumentsStreamer, ResponseError> {
-    let rtxn = index.static_read_txn()?;
-
+    attributes_to_retrieve: Option<Vec<S>>,
+) -> Result<(u64, Vec<Document>), ResponseError> {
+    let rtxn = index.read_txn()?;
     let filter = &filter;
     let filter = if let Some(filter) = filter {
         parse_filter(filter)
@@ -673,7 +607,7 @@ fn retrieve_documents(
     };
 
     let candidates = if let Some(filter) = filter {
-        filter.evaluate(&rtxn, &index).map_err(|err| match err {
+        filter.evaluate(&rtxn, index).map_err(|err| match err {
             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
                 ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
             }
@@ -683,13 +617,27 @@ fn retrieve_documents(
         index.documents_ids(&rtxn)?
     };
 
-    Ok(DocumentsStreamer {
-        total_documents: candidates.len(),
-        attributes_to_retrieve,
-        documents: candidates.into_iter().skip(offset).take(limit).collect(),
-        rtxn,
-        index,
-    })
+    let (it, number_of_documents) = {
+        let number_of_documents = candidates.len();
+        (
+            some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
+            number_of_documents,
+        )
+    };
+
+    let documents: Result<Vec<_>, ResponseError> = it
+        .map(|document| {
+            Ok(match &attributes_to_retrieve {
+                Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
+                    &document?,
+                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
+                ),
+                None => document?,
+            })
+        })
+        .collect();
+
+    Ok((number_of_documents, documents?))
 }
 
 fn retrieve_document<S: AsRef<str>>(
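Note the contract of the restored `retrieve_documents`: `number_of_documents` is taken from `candidates.len()` before the `skip(offset).take(limit)` window is applied, so the route reports the full count of matching documents while materializing only the requested page, and `documents_by_query` feeds both halves of the returned tuple into `PaginationView::new`.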
@@ -1,5 +1,4 @@
 use std::collections::BTreeMap;
-use std::fmt;
 
 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
@@ -125,31 +124,20 @@ pub struct Pagination {
     pub limit: usize,
 }
 
-#[derive(Clone, Serialize)]
-pub struct PaginationView<T: Serialize> {
-    pub results: T,
+#[derive(Debug, Clone, Serialize)]
+pub struct PaginationView<T> {
+    pub results: Vec<T>,
     pub offset: usize,
     pub limit: usize,
     pub total: usize,
 }
 
-impl<T: Serialize> fmt::Debug for PaginationView<T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("PaginationView")
-            .field("offset", &self.offset)
-            .field("limit", &self.limit)
-            .field("total", &self.total)
-            .field("results", &"[...]")
-            .finish()
-    }
-}
-
 impl Pagination {
     /// Given the full data to paginate, returns the selected section.
     pub fn auto_paginate_sized<T>(
         self,
         content: impl IntoIterator<Item = T> + ExactSizeIterator,
-    ) -> PaginationView<Vec<T>>
+    ) -> PaginationView<T>
     where
         T: Serialize,
     {
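With `results` back to a concrete `Vec<T>`, `Debug` can simply be derived again, so the manual `fmt::Debug` impl (which printed "[...]" in place of the results) is dropped together with the `use std::fmt;` import above and the `T: Serialize` bound on the struct definition.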
@@ -163,7 +151,7 @@ impl Pagination {
         self,
         total: usize,
         content: impl IntoIterator<Item = T>,
-    ) -> PaginationView<Vec<T>>
+    ) -> PaginationView<T>
     where
         T: Serialize,
     {
@@ -173,7 +161,7 @@ impl Pagination {
 
     /// Given the data already paginated + the total number of elements, it stores
     /// everything in a [PaginationResult].
-    pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<Vec<T>>
+    pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<T>
     where
         T: Serialize,
     {
@@ -181,8 +169,8 @@ impl Pagination {
     }
 }
 
-impl<T: Serialize> PaginationView<T> {
-    pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self {
+impl<T> PaginationView<T> {
+    pub fn new(offset: usize, limit: usize, total: usize, results: Vec<T>) -> Self {
         Self { offset, limit, results, total }
     }
 }
@@ -80,7 +80,9 @@ fn main() -> anyhow::Result<()> {
 /// Clears the task queue located at `db_path`.
 fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
     let path = db_path.join("tasks");
-    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&path) }
+    let env = EnvOpenOptions::new()
+        .max_dbs(100)
+        .open(&path)
         .with_context(|| format!("While trying to open {:?}", path.display()))?;
 
     eprintln!("Deleting tasks from the database...");
@@ -191,7 +193,9 @@ fn export_a_dump(
         FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?;
 
     let index_scheduler_path = db_path.join("tasks");
-    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+    let env = EnvOpenOptions::new()
+        .max_dbs(100)
+        .open(&index_scheduler_path)
         .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
 
     eprintln!("Dumping the keys...");
@@ -30,7 +30,7 @@ grenad = { version = "0.4.6", default-features = false, features = [
     "rayon",
     "tempfile",
 ] }
-heed = { version = "0.20.1", default-features = false, features = [
+heed = { version = "0.20.0-alpha.9", default-features = false, features = [
     "serde-json",
     "serde-bincode",
     "read-txn-no-tls",
@@ -82,7 +82,7 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls",
 ] }
 tiktoken-rs = "0.5.8"
 liquid = "0.26.4"
-arroy = "0.3.1"
+arroy = "0.2.0"
 rand = "0.8.5"
 tracing = "0.1.40"
 ureq = { version = "2.9.7", features = ["json"] }

milli/fuzz/.gitignore (vendored, 3 lines changed)
@@ -1,3 +0,0 @@
-target
-corpus
-artifacts
@@ -48,6 +48,8 @@ pub enum InternalError {
     GrenadInvalidFormatVersion,
     #[error("Invalid merge while processing {process}")]
     IndexingMergingKeys { process: &'static str },
+    #[error("{}", HeedError::InvalidDatabaseTyping)]
+    InvalidDatabaseTyping,
     #[error(transparent)]
     RayonThreadPool(#[from] ThreadPoolBuildError),
     #[error(transparent)]
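Downgrading to heed 0.20.0-alpha.9 brings back its `HeedError::InvalidDatabaseTyping` variant, so milli's `InternalError` has to mirror it again here and map it in the `From<HeedError>` conversion in the next hunk.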
@@ -427,6 +429,7 @@ impl From<HeedError> for Error {
             // TODO use the encoding
             HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })),
             HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })),
+            HeedError::InvalidDatabaseTyping => InternalError(InvalidDatabaseTyping),
             HeedError::DatabaseClosing => InternalError(DatabaseClosing),
             HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions),
         }
@@ -184,7 +184,7 @@ impl Index {
 
         options.max_dbs(25);
 
-        let env = unsafe { options.open(path) }?;
+        let env = options.open(path)?;
         let mut wtxn = env.write_txn()?;
         let main = env.database_options().name(MAIN).create(&mut wtxn)?;
         let word_docids = env.create_database(&mut wtxn, Some(WORD_DOCIDS))?;
@@ -294,11 +294,6 @@ impl Index {
         self.env.read_txn()
     }
 
-    /// Create a static read transaction to be able to read the index without keeping a reference to it.
-    pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static>> {
-        self.env.clone().static_read_txn()
-    }
-
     /// Returns the canonicalized path where the heed `Env` of this `Index` lives.
     pub fn path(&self) -> &Path {
         self.env.path()
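`static_read_txn` cloned the `Env` and tied the read transaction's lifetime to the clone, producing a `RoTxn<'static>` that could outlive the `&Index` borrow; the now-deleted `DocumentsStreamer` relied on that to carry a read transaction inside the HTTP response. With the route buffering documents again, a plain `read_txn()` scoped to `retrieve_documents` is enough, so the helper goes away.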
@@ -379,7 +379,7 @@ pub(crate) mod test_helpers {
             let mut options = heed::EnvOpenOptions::new();
             let options = options.map_size(4096 * 4 * 1000 * 100);
             let tempdir = tempfile::TempDir::new().unwrap();
-            let env = unsafe { options.open(tempdir.path()) }.unwrap();
+            let env = options.open(tempdir.path()).unwrap();
             let mut wtxn = env.write_txn().unwrap();
             let content = env.create_database(&mut wtxn, None).unwrap();
             wtxn.commit().unwrap();
@@ -556,7 +556,7 @@ where
                 let writer_index = (embedder_index as u16) << 8;
                 for k in 0..=u8::MAX {
                     let writer =
-                        arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension);
+                        arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension)?;
                     if writer.is_empty(wtxn)? {
                         break;
                     }
@@ -661,7 +661,7 @@ pub(crate) fn write_typed_chunk_into_index(
             )?;
             let writer_index = (embedder_index as u16) << 8;
             // FIXME: allow customizing distance
-            let writers: Vec<_> = (0..=u8::MAX)
+            let writers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
                 .map(|k| {
                     arroy::Writer::new(
                         index.vector_arroy,
@@ -670,6 +670,7 @@ pub(crate) fn write_typed_chunk_into_index(
                     )
                 })
                 .collect();
+            let writers = writers?;
 
             // remove vectors for docids we want them removed
             let merger = remove_vectors_builder.build();
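On the other side of the version change, arroy 0.2.0's `Writer::new` is fallible again, which is why the two hunks above thread a `?` back in and collect the 256 writers into a `Result` first. A minimal standalone illustration of that collect pattern (hypothetical values, not repository code):

```rust
// An iterator of `Result<T, E>` collects into `Result<Vec<T>, E>`:
// collection short-circuits at the first `Err`, and one `?` propagates it.
fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, std::num::ParseIntError> {
    let parsed: Result<Vec<i32>, _> = inputs.iter().map(|s| s.parse()).collect();
    let values = parsed?;
    Ok(values)
}

fn main() {
    assert_eq!(parse_all(&["1", "2", "3"]).unwrap(), vec![1, 2, 3]);
    assert!(parse_all(&["1", "oops"]).is_err());
}
```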