Compare commits

..

3 Commits

Author SHA1 Message Date
94b43001db Merge pull request #5492 from meilisearch/accept-cancelation-tasks-when-disk-full
make meilisearch accept cancelation tasks even when the disk is full
2025-04-03 15:46:46 +00:00
796a325972 Fix typos
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-04-03 15:53:42 +02:00
1db550ec7f make meilisearch accept cancelation tasks even when the disk is full 2025-04-03 15:47:56 +02:00
7 changed files with 67 additions and 69 deletions

View File

@ -625,8 +625,8 @@ impl IndexScheduler {
task_id: Option<TaskId>, task_id: Option<TaskId>,
dry_run: bool, dry_run: bool,
) -> Result<Task> { ) -> Result<Task> {
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task // if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40 && (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 40
{ {
return Err(Error::NoSpaceLeftInTaskQueue); return Err(Error::NoSpaceLeftInTaskQueue);

View File

@ -292,8 +292,6 @@ impl Queue {
return Ok(task); return Ok(task);
} }
// Get rid of the mutability.
let task = task;
self.tasks.register(wtxn, &task)?; self.tasks.register(wtxn, &task)?;
Ok(task) Ok(task)

View File

@ -364,7 +364,7 @@ fn test_task_queue_is_full() {
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// Even the task deletion that doesn't delete anything shouldn't be accepted // Even the task deletion and cancelation that don't delete anything should be refused
let result = index_scheduler let result = index_scheduler
.register( .register(
KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() }, KindWithContent::TaskDeletion { query: S("test"), tasks: RoaringBitmap::new() },
@ -373,10 +373,39 @@ fn test_task_queue_is_full() {
) )
.unwrap_err(); .unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations."); snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
let result = index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: RoaringBitmap::new() },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code // we won't be able to test this error in an integration test thus as a best effort test I still ensure the error return the expected error code
snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice"); snapshot!(format!("{:?}", result.error_code()), @"NoSpaceLeftOnDevice");
// But a task deletion that delete something should works // But a task cancelation that cancel something should work
index_scheduler
.register(
KindWithContent::TaskCancelation { query: S("test"), tasks: (0..100).collect() },
None,
false,
)
.unwrap();
handle.advance_one_successful_batch();
// But we should still be forbidden from enqueuing new tasks
let result = index_scheduler
.register(
KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None },
None,
false,
)
.unwrap_err();
snapshot!(result, @"Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.");
// And a task deletion that delete something should works
index_scheduler index_scheduler
.register( .register(
KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() }, KindWithContent::TaskDeletion { query: S("test"), tasks: (0..100).collect() },

View File

@ -144,7 +144,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
let mut merger_iter = builder.build().into_stream_merger_iter()?; let mut merger_iter = builder.build().into_stream_merger_iter()?;
let mut current_field_id = None; let mut current_field_id = None;
let mut fst; let mut fst;
let mut fst_merger_builder: Option<FstMergerBuilder<_>> = None; let mut fst_merger_builder: Option<FstMergerBuilder> = None;
while let Some((key, deladd)) = merger_iter.next()? { while let Some((key, deladd)) = merger_iter.next()? {
let (field_id, normalized_facet_string) = let (field_id, normalized_facet_string) =
BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?; BEU16StrCodec::bytes_decode(key).map_err(heed::Error::Encoding)?;
@ -153,13 +153,12 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
if let (Some(current_field_id), Some(fst_merger_builder)) = if let (Some(current_field_id), Some(fst_merger_builder)) =
(current_field_id, fst_merger_builder) (current_field_id, fst_merger_builder)
{ {
if let Some(mmap) = fst_merger_builder.build(&mut callback)? { let mmap = fst_merger_builder.build(&mut callback)?;
index.facet_id_string_fst.remap_data_type::<Bytes>().put( index.facet_id_string_fst.remap_data_type::<Bytes>().put(
wtxn, wtxn,
&current_field_id, &current_field_id,
&mmap, &mmap,
)?; )?;
}
} }
fst = index.facet_id_string_fst.get(rtxn, &field_id)?; fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
@ -210,9 +209,8 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
} }
if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) { if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
if let Some(mmap) = fst_merger_builder.build(&mut callback)? { let mmap = fst_merger_builder.build(&mut callback)?;
index.facet_id_string_fst.remap_data_type::<Bytes>().put(wtxn, &field_id, &mmap)?; index.facet_id_string_fst.remap_data_type::<Bytes>().put(wtxn, &field_id, &mmap)?;
}
} }
Ok(()) Ok(())

View File

@ -1,27 +1,25 @@
use std::fs::File; use std::fs::File;
use std::io::BufWriter; use std::io::BufWriter;
use fst::{IntoStreamer, Set, SetBuilder, Streamer}; use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap; use memmap2::Mmap;
use tempfile::tempfile; use tempfile::tempfile;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::{InternalError, Result}; use crate::{InternalError, Result};
pub struct FstMergerBuilder<'a, D: AsRef<[u8]>> { pub struct FstMergerBuilder<'a> {
fst: Option<&'a Set<D>>,
stream: Option<fst::set::Stream<'a>>, stream: Option<fst::set::Stream<'a>>,
fst_builder: Option<SetBuilder<BufWriter<File>>>, fst_builder: SetBuilder<BufWriter<File>>,
last: Option<Vec<u8>>, last: Option<Vec<u8>>,
inserted_words: usize, inserted_words: usize,
} }
impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> { impl<'a> FstMergerBuilder<'a> {
pub fn new(fst: Option<&'a Set<D>>) -> Result<Self> { pub fn new<D: AsRef<[u8]>>(fst: Option<&'a Set<D>>) -> Result<Self> {
Ok(Self { Ok(Self {
fst,
stream: fst.map(|fst| fst.stream()), stream: fst.map(|fst| fst.stream()),
fst_builder: None, fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
last: None, last: None,
inserted_words: 0, inserted_words: 0,
}) })
@ -112,17 +110,11 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
is_modified: bool, is_modified: bool,
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
) -> Result<()> { ) -> Result<()> {
if is_modified && self.fst_builder.is_none() { // Addition: We insert the word
self.build_new_fst(bytes)?; // Deletion: We delete the word by not inserting it
} if deladd == DelAdd::Addition {
self.inserted_words += 1;
if let Some(fst_builder) = self.fst_builder.as_mut() { self.fst_builder.insert(bytes)?;
// Addition: We insert the word
// Deletion: We delete the word by not inserting it
if deladd == DelAdd::Addition {
self.inserted_words += 1;
fst_builder.insert(bytes)?;
}
} }
insertion_callback(bytes, deladd, is_modified)?; insertion_callback(bytes, deladd, is_modified)?;
@ -130,19 +122,6 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
Ok(()) Ok(())
} }
// Lazily build the new fst
fn build_new_fst(&mut self, bytes: &[u8]) -> Result<()> {
let mut fst_builder = SetBuilder::new(BufWriter::new(tempfile()?))?;
if let Some(fst) = self.fst {
fst_builder.extend_stream(fst.range().lt(bytes).into_stream())?;
}
self.fst_builder = Some(fst_builder);
Ok(())
}
fn drain_stream( fn drain_stream(
&mut self, &mut self,
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
@ -163,20 +142,16 @@ impl<'a, D: AsRef<[u8]>> FstMergerBuilder<'a, D> {
pub fn build( pub fn build(
mut self, mut self,
insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>, insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
) -> Result<Option<Mmap>> { ) -> Result<Mmap> {
self.drain_stream(insertion_callback)?; self.drain_stream(insertion_callback)?;
match self.fst_builder { let fst_file = self
Some(fst_builder) => { .fst_builder
let fst_file = fst_builder .into_inner()?
.into_inner()? .into_inner()
.into_inner() .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
.map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?; let fst_mmap = unsafe { Mmap::map(&fst_file)? };
let fst_mmap = unsafe { Mmap::map(&fst_file)? };
Ok(Some(fst_mmap)) Ok(fst_mmap)
}
None => Ok(None),
}
} }
} }

View File

@ -118,9 +118,7 @@ fn compute_word_fst(
} }
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
if let Some(word_fst_mmap) = word_fst_mmap { index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
}
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
index.main.remap_types::<Str, Bytes>().put( index.main.remap_types::<Str, Bytes>().put(
wtxn, wtxn,

View File

@ -10,14 +10,14 @@ use crate::index::PrefixSettings;
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::{InternalError, Prefix, Result}; use crate::{InternalError, Prefix, Result};
pub struct WordFstBuilder<'a, D: AsRef<[u8]>> { pub struct WordFstBuilder<'a> {
word_fst_builder: FstMergerBuilder<'a, D>, word_fst_builder: FstMergerBuilder<'a>,
prefix_fst_builder: Option<PrefixFstBuilder>, prefix_fst_builder: Option<PrefixFstBuilder>,
registered_words: usize, registered_words: usize,
} }
impl<'a, D: AsRef<[u8]>> WordFstBuilder<'a, D> { impl<'a> WordFstBuilder<'a> {
pub fn new(words_fst: &'a Set<D>) -> Result<Self> { pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
Ok(Self { Ok(Self {
word_fst_builder: FstMergerBuilder::new(Some(words_fst))?, word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
prefix_fst_builder: None, prefix_fst_builder: None,
@ -50,7 +50,7 @@ impl<'a, D: AsRef<[u8]>> WordFstBuilder<'a, D> {
mut self, mut self,
index: &crate::Index, index: &crate::Index,
rtxn: &heed::RoTxn, rtxn: &heed::RoTxn,
) -> Result<(Option<Mmap>, Option<PrefixData>)> { ) -> Result<(Mmap, Option<PrefixData>)> {
let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| { let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder { if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
prefix_fst_builder.insert_word(bytes, deladd, is_modified) prefix_fst_builder.insert_word(bytes, deladd, is_modified)