Fix some tests but not all of them

This commit is contained in:
Clément Renault
2024-11-18 17:39:55 +01:00
parent 670aff5553
commit aba8a0e9e0
20 changed files with 1211 additions and 881 deletions

View File

@ -1680,19 +1680,23 @@ pub(crate) mod tests {
use std::ops::Deref;
use big_s::S;
use bumpalo::Bump;
use heed::{EnvOpenOptions, RwTxn};
use maplit::{btreemap, hashset};
use memmap2::Mmap;
use tempfile::TempDir;
use crate::documents::DocumentsBatchReader;
use crate::error::{Error, InternalError};
use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS};
use crate::update::new::indexer;
use crate::update::{
self, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
Settings,
self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, Settings,
};
use crate::vector::settings::{EmbedderSource, EmbeddingSettings};
use crate::{db_snap, obkv_to_json, Filter, Index, Search, SearchResult};
use crate::vector::EmbeddingConfigs;
use crate::{
db_snap, obkv_to_json, Filter, Index, Search, SearchResult, ThreadPoolNoAbortBuilder,
};
pub(crate) struct TempIndex {
pub inner: Index,
@ -1725,35 +1729,60 @@ pub(crate) mod tests {
pub fn new() -> Self {
Self::new_with_map_size(4096 * 2000)
}
pub fn add_documents_using_wtxn<'t, R>(
pub fn add_documents_using_wtxn<'t>(
&'t self,
wtxn: &mut RwTxn<'t>,
documents: DocumentsBatchReader<R>,
) -> Result<(), crate::error::Error>
where
R: std::io::Read + std::io::Seek,
{
let builder = IndexDocuments::new(
wtxn,
self,
&self.indexer_config,
self.index_documents_config.clone(),
|_| (),
|| false,
)
.unwrap();
let (builder, user_error) = builder.add_documents(documents).unwrap();
user_error?;
builder.execute()?;
documents: Mmap,
) -> Result<(), crate::error::Error> {
let local_pool;
let indexer_config = &self.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let rtxn = self.inner.read_txn()?;
let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
let embedders = EmbeddingConfigs::default();
let mut indexer =
indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
indexer.add_documents(&documents).unwrap();
let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer.into_changes(
&indexer_alloc,
&self.inner,
&rtxn,
None,
&mut new_fields_ids_map,
)?;
pool.install(|| {
indexer::index(
wtxn,
&self.inner,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| false,
&|_| (),
)
})
.unwrap()?;
Ok(())
}
pub fn add_documents<R>(
&self,
documents: DocumentsBatchReader<R>,
) -> Result<(), crate::error::Error>
where
R: std::io::Read + std::io::Seek,
{
pub fn add_documents(&self, documents: Mmap) -> Result<(), crate::error::Error> {
let mut wtxn = self.write_txn().unwrap();
self.add_documents_using_wtxn(&mut wtxn, documents)?;
wtxn.commit().unwrap();
@ -1769,6 +1798,7 @@ pub(crate) mod tests {
wtxn.commit().unwrap();
Ok(())
}
pub fn update_settings_using_wtxn<'t>(
&'t self,
wtxn: &mut RwTxn<'t>,
@ -1784,19 +1814,54 @@ pub(crate) mod tests {
&'t self,
wtxn: &mut RwTxn<'t>,
external_document_ids: Vec<String>,
) {
let builder = IndexDocuments::new(
wtxn,
self,
&self.indexer_config,
self.index_documents_config.clone(),
|_| (),
|| false,
)
.unwrap();
let (builder, user_error) = builder.remove_documents(external_document_ids).unwrap();
user_error.unwrap();
builder.execute().unwrap();
) -> Result<(), crate::error::Error> {
let local_pool;
let indexer_config = &self.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let rtxn = self.inner.read_txn()?;
let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
let embedders = EmbeddingConfigs::default();
let mut indexer =
indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
let external_document_ids: Vec<_> =
external_document_ids.iter().map(AsRef::as_ref).collect();
indexer.delete_documents(external_document_ids.as_slice());
let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer.into_changes(
&indexer_alloc,
&self.inner,
&rtxn,
None,
&mut new_fields_ids_map,
)?;
pool.install(|| {
indexer::index(
wtxn,
&self.inner,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| false,
&|_| (),
)
})
.unwrap()?;
Ok(())
}
pub fn delete_documents(&self, external_document_ids: Vec<String>) {
@ -1819,29 +1884,55 @@ pub(crate) mod tests {
let index = TempIndex::new();
let mut wtxn = index.inner.write_txn().unwrap();
let should_abort = AtomicBool::new(false);
let builder = IndexDocuments::new(
&mut wtxn,
&index.inner,
&index.indexer_config,
index.index_documents_config.clone(),
|_| (),
|| should_abort.load(Relaxed),
)
.unwrap();
let (builder, user_error) = builder
.add_documents(documents!([
{ "id": 1, "name": "kevin" },
{ "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 },
]))
let local_pool;
let indexer_config = &index.indexer_config;
let pool = match &indexer_config.thread_pool {
Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let rtxn = index.inner.read_txn().unwrap();
let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap();
let mut new_fields_ids_map = db_fields_ids_map.clone();
let embedders = EmbeddingConfigs::default();
let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
let payload = documents!([
{ "id": 1, "name": "kevin" },
{ "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 },
]);
indexer.add_documents(&payload);
let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer
.into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map)
.unwrap();
user_error.unwrap();
should_abort.store(true, Relaxed);
let err = builder.execute().unwrap_err();
let err = pool
.install(|| {
indexer::index(
&mut wtxn,
&index.inner,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| should_abort.load(Relaxed),
&|_| (),
)
})
.unwrap()
.unwrap_err();
assert!(matches!(err, Error::InternalError(InternalError::AbortedIndexation)));
}