mirror of https://github.com/meilisearch/meilisearch.git
fixes a lot of small issues; the cancellation test is still failing
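For context before the diff: the commit stops threading a bare `BatchId` through batch creation and instead builds a `ProcessingBatch` (imported from `crate::utils`) up front, stamping every task that joins the batch via `current_batch.processing(...)`. The sketch below is a minimal, self-contained illustration of that shape only; the struct fields, the `processing` body, and `main` are simplified assumptions, not the real index-scheduler types (the real `ProcessingBatch` carries more state, e.g. the `started_at` used in the cancelation hunk).

// Simplified stand-ins, for illustration only.
type BatchId = u32;
type TaskId = u32;

#[derive(Debug)]
struct Task {
    uid: TaskId,
    batch_uid: Option<BatchId>,
}

#[derive(Debug)]
struct ProcessingBatch {
    uid: BatchId,
    task_uids: Vec<TaskId>,
}

impl ProcessingBatch {
    fn new(uid: BatchId) -> Self {
        Self { uid, task_uids: Vec::new() }
    }

    // Mirrors the `current_batch.processing(...)` calls in the diff: remember each task
    // and stamp it with the batch uid.
    fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
        for task in tasks {
            task.batch_uid = Some(self.uid);
            self.task_uids.push(task.uid);
        }
    }
}

fn main() {
    let mut current_batch = ProcessingBatch::new(42);
    let mut task = Task { uid: 7, batch_uid: None };
    let mut tasks = vec![Task { uid: 8, batch_uid: None }, Task { uid: 9, batch_uid: None }];

    // One task, as in `current_batch.processing(Some(&mut task))`.
    current_batch.processing(Some(&mut task));
    // Several tasks at once, as in `current_batch.processing(&mut tasks)`.
    current_batch.processing(&mut tasks);

    assert_eq!(task.batch_uid, Some(42));
    assert_eq!(current_batch.task_uids, vec![7, 8, 9]);
}

Taking `impl IntoIterator<Item = &mut Task>` is what would let a single method accept both `Some(&mut task)` and `&mut tasks`, the two call shapes visible in the diff.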
@@ -24,7 +24,6 @@ use std::fs::{self, File};
 use std::io::BufWriter;

 use dump::IndexMetadata;
-use meilisearch_types::batches::BatchId;
 use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -45,7 +44,7 @@ use time::OffsetDateTime;
 use uuid::Uuid;

 use crate::autobatcher::{self, BatchKind};
-use crate::utils::{self, swap_index_uid_in_task};
+use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
 use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};

 /// Represents a combination of tasks that can all be processed at the same time.
@@ -280,22 +279,24 @@ impl IndexScheduler {
         rtxn: &RoTxn,
         index_uid: String,
         batch: BatchKind,
-        batch_id: BatchId,
+        current_batch: &mut ProcessingBatch,
         must_create_index: bool,
     ) -> Result<Option<Batch>> {
         match batch {
             BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
                 op: IndexOperation::DocumentClear {
-                    tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
+                    tasks: self.get_existing_tasks_with_processing_batch(
+                        rtxn,
+                        current_batch,
+                        ids,
+                    )?,
                     index_uid,
                 },
                 must_create_index,
             })),
             BatchKind::DocumentEdition { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 match &task.kind {
                     KindWithContent::DocumentEdition { index_uid, .. } => {
                         Ok(Some(Batch::IndexOperation {
@@ -310,7 +311,11 @@ impl IndexScheduler {
                 }
             }
             BatchKind::DocumentOperation { method, operation_ids, .. } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, operation_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    operation_ids,
+                )?;
                 let primary_key = tasks
                     .iter()
                     .find_map(|task| match task.kind {
@@ -357,7 +362,11 @@ impl IndexScheduler {
                 }))
             }
             BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, deletion_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    deletion_ids,
+                )?;

                 Ok(Some(Batch::IndexOperation {
                     op: IndexOperation::DocumentDeletion { index_uid, tasks },
@@ -365,7 +374,11 @@ impl IndexScheduler {
                 }))
             }
             BatchKind::Settings { settings_ids, .. } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, settings_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    settings_ids,
+                )?;

                 let mut settings = Vec::new();
                 for task in &tasks {
@@ -388,7 +401,7 @@ impl IndexScheduler {
                         rtxn,
                         index_uid,
                         BatchKind::Settings { settings_ids, allow_index_creation },
-                        batch_id,
+                        current_batch,
                         must_create_index,
                     )?
                     .unwrap()
@@ -404,7 +417,7 @@ impl IndexScheduler {
                         rtxn,
                         index_uid,
                         BatchKind::DocumentClear { ids: other },
-                        batch_id,
+                        current_batch,
                         must_create_index,
                     )?
                     .unwrap()
@@ -437,7 +450,7 @@ impl IndexScheduler {
                     rtxn,
                     index_uid.clone(),
                     BatchKind::Settings { settings_ids, allow_index_creation },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?;

@@ -450,7 +463,7 @@ impl IndexScheduler {
                         primary_key,
                         operation_ids,
                     },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?;

@@ -488,10 +501,8 @@ impl IndexScheduler {
                 }
             }
             BatchKind::IndexCreation { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 let (index_uid, primary_key) = match &task.kind {
                     KindWithContent::IndexCreation { index_uid, primary_key } => {
                         (index_uid.clone(), primary_key.clone())
@@ -501,10 +512,8 @@ impl IndexScheduler {
                 Ok(Some(Batch::IndexCreation { index_uid, primary_key, task }))
             }
             BatchKind::IndexUpdate { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 let primary_key = match &task.kind {
                     KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(),
                     _ => unreachable!(),
@@ -514,13 +523,11 @@ impl IndexScheduler {
             BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
                 index_uid,
                 index_has_been_created: must_create_index,
-                tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
+                tasks: self.get_existing_tasks_with_processing_batch(rtxn, current_batch, ids)?,
             })),
             BatchKind::IndexSwap { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 Ok(Some(Batch::IndexSwap { task }))
             }
         }
@@ -533,11 +540,16 @@ impl IndexScheduler {
     /// 4. We get the *next* dump to process.
     /// 5. We get the *next* tasks to process for a specific index.
     #[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
-    pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<(Batch, BatchId)>> {
+    pub(crate) fn create_next_batch(
+        &self,
+        rtxn: &RoTxn,
+    ) -> Result<Option<(Batch, ProcessingBatch)>> {
         #[cfg(test)]
         self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;

+        let batch_id = self.next_batch_id(rtxn)?;
+        let mut current_batch = ProcessingBatch::new(batch_id);
+
         let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
         let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;

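The doc comment and the new signature above frame the rest of the function: `create_next_batch` now allocates the `ProcessingBatch` first, then walks the prioritized task kinds in order (cancelation, deletions, snapshots, dumps, then regular per-index tasks) and returns the winning `Batch` together with the `ProcessingBatch` instead of a bare id. A compressed sketch of that control flow, with made-up stand-in types and only two priority levels (the real method works over LMDB bitmaps and more kinds):

// Stand-in types, for illustration only.
#[derive(Debug)]
struct ProcessingBatch {
    uid: u32,
    task_uids: Vec<u32>,
}

#[derive(Debug)]
enum Batch {
    TaskCancelation(u32),
    TaskDeletions(Vec<u32>),
    Other(Vec<u32>),
}

struct Queue {
    to_cancel: Vec<u32>,
    to_delete: Vec<u32>,
    enqueued: Vec<u32>,
}

// Mirrors the shape of the new `create_next_batch`: build the ProcessingBatch up front,
// then check the prioritized kinds in order and return early with whichever wins.
fn create_next_batch(queue: &Queue, next_batch_id: u32) -> Option<(Batch, ProcessingBatch)> {
    let mut current_batch = ProcessingBatch { uid: next_batch_id, task_uids: Vec::new() };

    // 1. cancelation tasks first.
    if let Some(&task_id) = queue.to_cancel.last() {
        current_batch.task_uids.push(task_id);
        return Some((Batch::TaskCancelation(task_id), current_batch));
    }
    // 2. then task deletions.
    if !queue.to_delete.is_empty() {
        current_batch.task_uids.extend(&queue.to_delete);
        return Some((Batch::TaskDeletions(queue.to_delete.clone()), current_batch));
    }
    // 5. finally, the unprioritised tasks (snapshots and dumps elided here).
    if queue.enqueued.is_empty() {
        return None;
    }
    current_batch.task_uids.extend(&queue.enqueued);
    Some((Batch::Other(queue.enqueued.clone()), current_batch))
}

fn main() {
    let queue = Queue { to_cancel: vec![], to_delete: vec![3, 4], enqueued: vec![5] };
    let (batch, current_batch) = create_next_batch(&queue, 1).unwrap();
    assert!(matches!(batch, Batch::TaskDeletions(_)));
    assert_eq!(current_batch.task_uids, vec![3, 4]);
}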
@@ -547,63 +559,51 @@ impl IndexScheduler {
             // We must *not* reset the processing tasks before calling this method.
             // Displaying the `batch_id` would make a strange error message since this task cancelation is going to
             // replace the canceled batch. It's better to avoid mentioning it in the error message.
-            let ProcessingTasks { started_at, batch_id: _, processing } =
+            let ProcessingTasks { batch: previous_batch, processing } =
                 &*self.processing_tasks.read().unwrap();
+            let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+            current_batch.processing(Some(&mut task));
             return Ok(Some((
                 Batch::TaskCancelation {
-                    task: self
-                        .get_task(rtxn, task_id)?
-                        .ok_or(Error::CorruptedTaskQueue)?
-                        .with_batch_id(batch_id),
-                    previous_started_at: *started_at,
+                    task,
+                    // We should never be in a case where we don't have a previous_batch, but let's not crash if it happens
+                    previous_started_at: previous_batch
+                        .as_ref()
+                        .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at),
                     previous_processing_tasks: processing.clone(),
                 },
-                batch_id,
+                current_batch,
             )));
         }

         // 2. we get the next task to delete
         let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
         if !to_delete.is_empty() {
-            let tasks = self
-                .get_existing_tasks(rtxn, to_delete)?
-                .into_iter()
-                .map(|task| task.with_batch_id(batch_id))
-                .collect();
-            return Ok(Some((Batch::TaskDeletions(tasks), batch_id)));
+            let mut tasks = self.get_existing_tasks(rtxn, to_delete)?;
+            current_batch.processing(&mut tasks);
+            return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
         }

         // 3. we batch the snapshot.
         let to_snapshot = self.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
         if !to_snapshot.is_empty() {
-            return Ok(Some((
-                Batch::SnapshotCreation(
-                    self.get_existing_tasks(rtxn, to_snapshot)?
-                        .into_iter()
-                        .map(|task| task.with_batch_id(batch_id))
-                        .collect(),
-                ),
-                batch_id,
-            )));
+            let mut tasks = self.get_existing_tasks(rtxn, to_snapshot)?;
+            current_batch.processing(&mut tasks);
+            return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
         }

         // 4. we batch the dumps.
         let to_dump = self.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
         if let Some(to_dump) = to_dump.min() {
-            return Ok(Some((
-                Batch::Dump(
-                    self.get_task(rtxn, to_dump)?
-                        .ok_or(Error::CorruptedTaskQueue)?
-                        .with_batch_id(batch_id),
-                ),
-                batch_id,
-            )));
+            let mut task = self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?;
+            current_batch.processing(Some(&mut task));
+            return Ok(Some((Batch::Dump(task), current_batch)));
         }

         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
         let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
-        let task =
-            self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?.with_batch_id(batch_id);
+        let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+        current_batch.processing(Some(&mut task));

         // If the task is not associated with any index, verify that it is an index swap and
         // create the batch directly. Otherwise, get the index name associated with the task
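One detail in the cancelation arm above: the previous batch read out of `ProcessingTasks` is now an `Option`, and per the new comment the code falls back to the current time instead of crashing when it is missing, via `Option::map_or_else`. A small self-contained sketch of that fallback, using `std::time::SystemTime` in place of the `time` crate's `OffsetDateTime::now_utc` so it runs without extra dependencies; `PreviousBatch` is a made-up stand-in:

use std::time::SystemTime;

// Stand-in for the ProcessingBatch stored in ProcessingTasks; only the field we need.
struct PreviousBatch {
    started_at: SystemTime,
}

// Mirrors `previous_batch.as_ref().map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at)`:
// if there is no previous batch (which should never happen), use "now" instead of crashing.
fn previous_started_at(previous_batch: &Option<PreviousBatch>) -> SystemTime {
    previous_batch.as_ref().map_or_else(SystemTime::now, |batch| batch.started_at)
}

fn main() {
    let none: Option<PreviousBatch> = None;
    let some = Some(PreviousBatch { started_at: SystemTime::UNIX_EPOCH });

    // Falls back to the current time when no batch was being processed.
    assert!(previous_started_at(&none) >= SystemTime::UNIX_EPOCH);
    // Uses the recorded start time otherwise.
    assert_eq!(previous_started_at(&some), SystemTime::UNIX_EPOCH);
}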
@@ -613,7 +613,7 @@ impl IndexScheduler {
             index_name
         } else {
             assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
-            return Ok(Some((Batch::IndexSwap { task }, batch_id)));
+            return Ok(Some((Batch::IndexSwap { task }, current_batch)));
         };

         let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
@@ -649,10 +649,10 @@ impl IndexScheduler {
                     rtxn,
                     index_name.to_string(),
                     batchkind,
-                    batch_id,
+                    &mut current_batch,
                     create_index,
                 )?
-                .map(|batch| (batch, batch_id)));
+                .map(|batch| (batch, current_batch)));
         }

         // If we found no tasks then we were notified for something that got autobatched