mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-21 13:51:05 +00:00
Compare commits
3 Commits
lazy-word-
...
v1.14.0-rc
Author | SHA1 | Date | |
---|---|---|---|
94b43001db | |||
796a325972 | |||
1db550ec7f |
@ -625,8 +625,8 @@ impl IndexScheduler {
|
|||||||
task_id: Option<TaskId>,
|
task_id: Option<TaskId>,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
) -> Result<Task> {
|
) -> Result<Task> {
|
||||||
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
|
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
|
||||||
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
|
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
|
||||||
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40
|
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40
|
||||||
{
|
{
|
||||||
return Err(Error::NoSpaceLeftInTaskQueue);
|
return Err(Error::NoSpaceLeftInTaskQueue);
|
||||||
|
@ -292,8 +292,6 @@ impl Queue {
|
|||||||
return Ok(task);
|
return Ok(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get rid of the mutability.
|
|
||||||
let task = task;
|
|
||||||
self.tasks.register(wtxn, &task)?;
|
self.tasks.register(wtxn, &task)?;
|
||||||
|
|
||||||
Ok(task)
|
Ok(task)
|
||||||
|
@ -364,7 +364,7 @@ fn test_task_queue_is_full() {
|
|||||||
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
||||||
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
||||||
|
|
||||||
// Even the task deletion that doesn't delete anything shouldn't be accepted
|
// Even the task deletion and cancelation that don't delete anything should be refused
|
||||||
let result = index_scheduler
|
let result = index_scheduler
|
||||||
.register(
|
.register(
|
||||||
KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
|
KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
|
||||||
@ -373,10 +373,39 @@ fn test_task_queue_is_full() {
|
|||||||
)
|
)
|
||||||
.unwrap_err();
|
.unwrap_err();
|
||||||
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
||||||
|
let result = index_scheduler
|
||||||
|
.register(
|
||||||
|
KindWithContent::TaskCancelation { query: S("test"), tasks: RoaringBitmap::new() },
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.unwrap_err();
|
||||||
|
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
||||||
|
|
||||||
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
|
||||||
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
|
||||||
|
|
||||||
// But a task deletion that delete something should works
|
// But a task cancelation that cancel something should work
|
||||||
|
index_scheduler
|
||||||
|
.register(
|
||||||
|
KindWithContent::TaskCancelation { query: S("test"), tasks: (0..100).collect() },
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
handle.advance_one_successful_batch();
|
||||||
|
|
||||||
|
// But we should still be forbidden from enqueuing new tasks
|
||||||
|
let result = index_scheduler
|
||||||
|
.register(
|
||||||
|
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.unwrap_err();
|
||||||
|
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
|
||||||
|
|
||||||
|
// And a task deletion that delete something should works
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(
|
.register(
|
||||||
KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },
|
KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },
|
||||||
|
@ -144,7 +144,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
let mut merger_iter = builder.build().into_stream_merger_iter()?;
|
let mut merger_iter = builder.build().into_stream_merger_iter()?;
|
||||||
let mut current_field_id = None;
|
let mut current_field_id = None;
|
||||||
let mut fst;
|
let mut fst;
|
||||||
let mut fst_merger_builder: Option<FstMergerBuilder<_>> = None;
|
let mut fst_merger_builder: Option<FstMergerBuilder> = None;
|
||||||
while let Some((key, deladd)) = merger_iter.next()? {
|
while let Some((key, deladd)) = merger_iter.next()? {
|
||||||
let (field_id, normalized_facet_string) =
|
let (field_id, normalized_facet_string) =
|
||||||
BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?;
|
BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?;
|
||||||
@ -153,14 +153,13 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
if let (Some(current_field_id), Some(fst_merger_builder)) =
|
if let (Some(current_field_id), Some(fst_merger_builder)) =
|
||||||
(current_field_id, fst_merger_builder)
|
(current_field_id, fst_merger_builder)
|
||||||
{
|
{
|
||||||
if let Some(mmap) = fst_merger_builder.build(&mut callback)? {
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
index.facet_id_string_fst.remap_data_type::<Bytes>().put(
|
index.facet_id_string_fst.remap_data_type::<Bytes>().put(
|
||||||
wtxn,
|
wtxn,
|
||||||
&current_field_id,
|
&current_field_id,
|
||||||
&mmap,
|
&mmap,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
|
fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
|
||||||
fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
|
fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
|
||||||
@ -210,10 +209,9 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
|
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
|
||||||
if let Some(mmap) = fst_merger_builder.build(&mut callback)? {
|
let mmap = fst_merger_builder.build(&mut callback)?;
|
||||||
index.facet_id_string_fst.remap_data_type::<Bytes>().put(wtxn, &field_id, &mmap)?;
|
index.facet_id_string_fst.remap_data_type::<Bytes>().put(wtxn, &field_id, &mmap)?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,27 +1,25 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
|
|
||||||
use fst::{IntoStreamer, Set, SetBuilder, Streamer};
|
use fst::{Set, SetBuilder, Streamer};
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::{InternalError, Result};
|
use crate::{InternalError, Result};
|
||||||
|
|
||||||
pub struct FstMergerBuilder<'a, D: AsRef<[u8]>> {
|
pub struct FstMergerBuilder<'a> {
|
||||||
fst: Option<&'a Set<D>>,
|
|
||||||
stream: Option<fst::set::Stream<'a>>,
|
stream: Option<fst::set::Stream<'a>>,
|
||||||
fst_builder: Option<SetBuilder<BufWriter<File>>>,
|
fst_builder: SetBuilder<BufWriter<File>>,
|
||||||
last: Option<Vec<u8>>,
|
last: Option<Vec<u8>>,
|
||||||
inserted_words: usize,
|
inserted_words: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
|
impl<'a> FstMergerBuilder<'a> {
|
||||||
pub fn new(fst: Option<&'a Set<D>>) -> Result<Self> {
|
pub fn new<D: AsRef<[u8]>>(fst: Option<&'a Set<D>>) -> Result<Self> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
fst,
|
|
||||||
stream: fst.map(|fst| fst.stream()),
|
stream: fst.map(|fst| fst.stream()),
|
||||||
fst_builder: None,
|
fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
|
||||||
last: None,
|
last: None,
|
||||||
inserted_words: 0,
|
inserted_words: 0,
|
||||||
})
|
})
|
||||||
@ -112,17 +110,11 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
|
|||||||
is_modified: bool,
|
is_modified: bool,
|
||||||
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if is_modified && self.fst_builder.is_none() {
|
|
||||||
self.build_new_fst(bytes)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(fst_builder) = self.fst_builder.as_mut() {
|
|
||||||
// Addition: We insert the word
|
// Addition: We insert the word
|
||||||
// Deletion: We delete the word by not inserting it
|
// Deletion: We delete the word by not inserting it
|
||||||
if deladd == DelAdd::Addition {
|
if deladd == DelAdd::Addition {
|
||||||
self.inserted_words += 1;
|
self.inserted_words += 1;
|
||||||
fst_builder.insert(bytes)?;
|
self.fst_builder.insert(bytes)?;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
insertion_callback(bytes, deladd, is_modified)?;
|
insertion_callback(bytes, deladd, is_modified)?;
|
||||||
@ -130,19 +122,6 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lazily build the new fst
|
|
||||||
fn build_new_fst(&mut self, bytes: &[u8]) -> Result<()> {
|
|
||||||
let mut fst_builder = SetBuilder::new(BufWriter::new(tempfile()?))?;
|
|
||||||
|
|
||||||
if let Some(fst) = self.fst {
|
|
||||||
fst_builder.extend_stream(fst.range().lt(bytes).into_stream())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.fst_builder = Some(fst_builder);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn drain_stream(
|
fn drain_stream(
|
||||||
&mut self,
|
&mut self,
|
||||||
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
@ -163,20 +142,16 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
|
|||||||
pub fn build(
|
pub fn build(
|
||||||
mut self,
|
mut self,
|
||||||
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
|
||||||
) -> Result<Option<Mmap>> {
|
) -> Result<Mmap> {
|
||||||
self.drain_stream(insertion_callback)?;
|
self.drain_stream(insertion_callback)?;
|
||||||
|
|
||||||
match self.fst_builder {
|
let fst_file = self
|
||||||
Some(fst_builder) => {
|
.fst_builder
|
||||||
let fst_file = fst_builder
|
|
||||||
.into_inner()?
|
.into_inner()?
|
||||||
.into_inner()
|
.into_inner()
|
||||||
.map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
|
.map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
|
||||||
let fst_mmap = unsafe { Mmap::map(&fst_file)? };
|
let fst_mmap = unsafe { Mmap::map(&fst_file)? };
|
||||||
|
|
||||||
Ok(Some(fst_mmap))
|
Ok(fst_mmap)
|
||||||
}
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,9 +118,7 @@ fn compute_word_fst(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
|
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
|
||||||
if let Some(word_fst_mmap) = word_fst_mmap {
|
|
||||||
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
|
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
|
||||||
}
|
|
||||||
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
|
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
|
||||||
index.main.remap_types::<Str, Bytes>().put(
|
index.main.remap_types::<Str, Bytes>().put(
|
||||||
wtxn,
|
wtxn,
|
||||||
|
@ -10,14 +10,14 @@ use crate::index::PrefixSettings;
|
|||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::{InternalError, Prefix, Result};
|
use crate::{InternalError, Prefix, Result};
|
||||||
|
|
||||||
pub struct WordFstBuilder<'a, D: AsRef<[u8]>> {
|
pub struct WordFstBuilder<'a> {
|
||||||
word_fst_builder: FstMergerBuilder<'a, D>,
|
word_fst_builder: FstMergerBuilder<'a>,
|
||||||
prefix_fst_builder: Option<PrefixFstBuilder>,
|
prefix_fst_builder: Option<PrefixFstBuilder>,
|
||||||
registered_words: usize,
|
registered_words: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, D: AsRef<[u8]>> WordFstBuilder<'a, D> {
|
impl<'a> WordFstBuilder<'a> {
|
||||||
pub fn new(words_fst: &'a Set<D>) -> Result<Self> {
|
pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
|
word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
|
||||||
prefix_fst_builder: None,
|
prefix_fst_builder: None,
|
||||||
@ -50,7 +50,7 @@ impl<'a, D: AsRef<[u8]>> WordFstBuilder<'a, D> {
|
|||||||
mut self,
|
mut self,
|
||||||
index: &crate::Index,
|
index: &crate::Index,
|
||||||
rtxn: &heed::RoTxn,
|
rtxn: &heed::RoTxn,
|
||||||
) -> Result<(Option<Mmap>, Option<PrefixData>)> {
|
) -> Result<(Mmap, Option<PrefixData>)> {
|
||||||
let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
|
let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
|
||||||
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
|
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
|
||||||
prefix_fst_builder.insert_word(bytes, deladd, is_modified)
|
prefix_fst_builder.insert_word(bytes, deladd, is_modified)
|
||||||
|
Reference in New Issue
Block a user