mirror of https://github.com/meilisearch/meilisearch.git
implement a first version of the clear indexes
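
For context, the task payload added by this commit is KindWithContent::IndexClear { index_uids }, which targets several indexes with a single task. Below is a minimal sketch of constructing that payload; the crate path is approximate, and handing the value to the scheduler's registration entry point is outside this diff.

    use meilisearch_types::tasks::KindWithContent; // crate path approximate

    fn main() {
        // The payload introduced by this commit: one task that clears
        // several indexes at once.
        let clear = KindWithContent::IndexClear {
            index_uids: vec!["movies".to_string(), "books".to_string()],
        };

        // From here the value would go through the scheduler's usual task
        // registration path (not shown in this diff).
        let _ = clear;
    }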
@@ -32,6 +32,7 @@ enum AutobatchKind {
     },
     IndexCreation,
     IndexDeletion,
+    IndexClear,
     IndexUpdate,
     IndexSwap,
 }

@@ -74,6 +75,7 @@ impl From<KindWithContent> for AutobatchKind {
                 }
             }
             KindWithContent::IndexDeletion { .. } => AutobatchKind::IndexDeletion,
+            KindWithContent::IndexClear { .. } => AutobatchKind::IndexClear,
             KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
             KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
             KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,

@@ -123,6 +125,9 @@ pub enum BatchKind {
     IndexDeletion {
         ids: Vec<TaskId>,
     },
+    IndexClear {
+        id: TaskId,
+    },
     IndexCreation {
         id: TaskId,
     },

@@ -173,6 +178,7 @@ impl BatchKind {
         match AutobatchKind::from(kind) {
             K::IndexCreation => (Break(BatchKind::IndexCreation { id: task_id }), true),
             K::IndexDeletion => (Break(BatchKind::IndexDeletion { ids: vec![task_id] }), false),
+            K::IndexClear => (Break(BatchKind::IndexClear { id: task_id }), false),
             K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false),
             K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false),
             K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),

@@ -222,7 +228,7 @@ impl BatchKind {

         match (self, kind) {
             // We don't batch any of these operations
-            (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this),
+            (this, K::IndexCreation | K::IndexUpdate | K::IndexClear | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this),
             // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
             (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
                 Break(this)

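The Break and Continue values in these autobatcher arms are presumably the std::ops::ControlFlow variants: Break closes the batch being built, Continue keeps folding tasks into it, and the arm above makes an incoming IndexClear close whatever batch came before it. A standalone illustration of that accumulation pattern follows; the types and function are illustrative, not the scheduler's own.

    use std::ops::ControlFlow::{self, Break, Continue};

    enum Kind {
        DocumentImport,
        IndexClear,
    }

    // Fold task kinds into a running batch; an IndexClear breaks the batch
    // instead of being merged into it, mirroring the arm above.
    fn accumulate(
        mut batch: Vec<&'static str>,
        kind: Kind,
    ) -> ControlFlow<Vec<&'static str>, Vec<&'static str>> {
        match kind {
            Kind::IndexClear => Break(batch),
            Kind::DocumentImport => {
                batch.push("document import");
                Continue(batch)
            }
        }
    }

    fn main() {
        let mut batch = Vec::new();
        for kind in [Kind::DocumentImport, Kind::DocumentImport, Kind::IndexClear] {
            match accumulate(batch, kind) {
                Continue(b) => batch = b,
                Break(b) => {
                    println!("closing a batch of {} tasks before the clear", b.len());
                    batch = Vec::new();
                }
            }
        }
    }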
@@ -480,6 +486,7 @@ impl BatchKind {
             (
                 BatchKind::IndexCreation { .. }
                 | BatchKind::IndexDeletion { .. }
+                | BatchKind::IndexClear { .. }
                 | BatchKind::IndexUpdate { .. }
                 | BatchKind::IndexSwap { .. }
                 | BatchKind::DocumentDeletionByFilter { .. },

@@ -85,6 +85,10 @@ pub(crate) enum Batch {
         tasks: Vec<Task>,
         index_has_been_created: bool,
     },
+    IndexClear {
+        index_uids: Vec<String>,
+        task: Task,
+    },
     IndexSwap {
         task: Task,
     },

@@ -154,6 +158,7 @@ impl Batch {
             | Batch::TaskDeletion(task)
             | Batch::Dump(task)
             | Batch::IndexCreation { task, .. }
+            | Batch::IndexClear { task, .. }
             | Batch::IndexDocumentDeletionByFilter { task, .. }
             | Batch::IndexUpdate { task, .. } => vec![task.uid],
             Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {

@@ -189,6 +194,7 @@ impl Batch {
             | TaskDeletion(_)
             | SnapshotCreation(_)
             | Dump(_)
+            | IndexClear { .. }
             | IndexSwap { .. } => None,
             IndexOperation { op, .. } => Some(op.index_uid()),
             IndexCreation { index_uid, .. }

@@ -453,6 +459,14 @@ impl IndexScheduler {
                 index_has_been_created: must_create_index,
                 tasks: self.get_existing_tasks(rtxn, ids)?,
             })),
+            BatchKind::IndexClear { id } => {
+                let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                let index_uids = match &task.kind {
+                    KindWithContent::IndexClear { index_uids } => index_uids.clone(),
+                    _ => unreachable!(),
+                };
+                Ok(Some(Batch::IndexClear { index_uids, task }))
+            }
             BatchKind::IndexSwap { id } => {
                 let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
                 Ok(Some(Batch::IndexSwap { task }))

@@ -1017,6 +1031,13 @@ impl IndexScheduler {

                 Ok(tasks)
             }
+            Batch::IndexClear { index_uids, mut task } => {
+                let wtxn = self.env.write_txn()?;
+                self.index_mapper.delete_indexes(wtxn, index_uids, false)?;
+                task.status = Status::Succeeded;
+
+                Ok(vec![task])
+            }
             Batch::IndexSwap { mut task } => {
                 let mut wtxn = self.env.write_txn()?;
                 let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind {

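Two details of the IndexClear arm above are worth noting: delete_indexes is called with error_on_missing_index set to false, so clearing an index that no longer exists does not fail the task, and the whole write transaction is moved into the call, which (as the next hunks show) commits it itself. Here is a minimal standalone illustration of that move-the-transaction pattern, with stand-in types rather than the real heed ones.

    // Stand-in for an LMDB-style write transaction: committing consumes it.
    struct WriteTxn;

    impl WriteTxn {
        fn commit(self) {
            println!("txn committed");
        }
    }

    // Like IndexMapper::delete_indexes in this commit, the helper takes the
    // transaction by value and is therefore the one responsible for committing it.
    fn clear_indexes(txn: WriteTxn, index_uids: Vec<String>) {
        for uid in &index_uids {
            println!("clearing {uid} inside the txn");
        }
        txn.commit();
    }

    fn main() {
        let txn = WriteTxn;
        clear_indexes(txn, vec!["movies".into(), "books".into()]);
        // `txn` has been moved: the caller cannot reuse or double-commit it.
    }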
@@ -173,19 +173,37 @@ impl IndexMapper {
         }
     }

+    pub fn delete_index(&self, wtxn: RwTxn, name: &str) -> Result<()> {
+        self.delete_indexes(wtxn, Some(name), true)
+    }
+
     /// Removes the index from the mapping table and the in-memory index map
     /// but keeps the associated tasks.
-    pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> {
-        let uuid = self
-            .index_mapping
-            .get(&wtxn, name)?
-            .ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
+    pub fn delete_indexes(
+        &self,
+        mut wtxn: RwTxn,
+        names: impl IntoIterator<Item = impl AsRef<str>>,
+        error_on_missing_index: bool,
+    ) -> Result<()> {
+        let indexes = names
+            .into_iter()
+            .map(|name| {
+                let name = name.as_ref().to_string();
+                let uuid = self
+                    .index_mapping
+                    .get(&wtxn, &name)?
+                    .ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
+                Ok((name, uuid))
+            })
+            .filter(|res| error_on_missing_index || res.is_ok())
+            .collect::<Result<Vec<_>>>()?;

-        // Not an error if the index had no stats in cache.
-        self.index_stats.delete(&mut wtxn, &uuid)?;
-
-        // Once we retrieved the UUID of the index we remove it from the mapping table.
-        assert!(self.index_mapping.delete(&mut wtxn, name)?);
+        for (name, uuid) in indexes.iter() {
+            // Not an error if the index had no stats in cache.
+            self.index_stats.delete(&mut wtxn, uuid)?;
+            // Once we retrieved the UUID of the index we remove it from the mapping table.
+            assert!(self.index_mapping.delete(&mut wtxn, name)?);
+        }

         wtxn.commit()?;

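The new delete_indexes accepts names as impl IntoIterator<Item = impl AsRef<str>>, which is why the delete_index wrapper can simply forward Some(name): an Option iterates over zero or one items, and a Vec<String>, an array of &str, and so on are accepted just the same. A small standalone illustration of that bound is sketched below; the function is illustrative, not the IndexMapper API.

    // Accepts any iterable of anything string-like, like the new delete_indexes.
    fn delete_all(names: impl IntoIterator<Item = impl AsRef<str>>) {
        for name in names {
            println!("would delete {}", name.as_ref());
        }
    }

    fn main() {
        delete_all(Some("movies"));                                  // what delete_index forwards
        delete_all(vec!["movies".to_string(), "books".to_string()]); // what an IndexClear batch passes
        delete_all(["a", "b", "c"]);                                 // arrays work too
    }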
@@ -203,51 +221,63 @@ impl IndexMapper {
         // This can not be caused by indexation because deleting an index happens in the scheduler itself, so cannot be concurrent with indexation.
         //
         // In these situations, reporting the error through a panic is in order.
-        let closing_event = loop {
-            let mut lock = self.index_map.write().unwrap();
-            match lock.start_deletion(&uuid) {
-                Ok(env_closing) => break env_closing,
-                Err(Some(reopen)) => {
-                    // drop the lock here so that we don't synchronously wait for the index to close.
-                    drop(lock);
-                    tries += 1;
-                    if tries >= 100 {
-                        panic!("Too many attempts to close index {name} prior to deletion.")
-                    }
-                    let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
-                        reopen
-                    } else {
-                        continue;
-                    };
-                    reopen.close(&mut self.index_map.write().unwrap());
-                    continue;
-                }
-                Err(None) => return Ok(()),
-            }
-        };
+        let indexes = indexes
+            .into_iter()
+            .map(|(name, uuid)| {
+                let closing_event = loop {
+                    let mut lock = self.index_map.write().unwrap();
+                    match lock.start_deletion(&uuid) {
+                        Ok(env_closing) => break env_closing,
+                        Err(Some(reopen)) => {
+                            // drop the lock here so that we don't synchronously wait for the index to close.
+                            drop(lock);
+                            tries += 1;
+                            if tries >= 100 {
+                                panic!("Too many attempts to close index {name} prior to deletion.")
+                            }
+                            let reopen =
+                                if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
+                                    reopen
+                                } else {
+                                    continue;
+                                };
+                            reopen.close(&mut self.index_map.write().unwrap());
+                            continue;
+                        }
+                        // TODO: what is this case, what does that mean?
+                        Err(None) => return None,
+                    }
+                };
+                Some((name, uuid, closing_event))
+            })
+            .filter_map(|thingy| thingy)
+            .map(|(name, uuid, closing)| {
+                (name.to_string(), uuid, self.base_path.join(uuid.to_string()), closing)
+            })
+            .collect::<Vec<_>>();

         let index_map = self.index_map.clone();
-        let index_path = self.base_path.join(uuid.to_string());
-        let index_name = name.to_string();
         thread::Builder::new()
             .name(String::from("index_deleter"))
             .spawn(move || {
-                // We first wait to be sure that the previously opened index is effectively closed.
-                // This can take a lot of time, this is why we do that in a separate thread.
-                if let Some(closing_event) = closing_event {
-                    closing_event.wait();
-                }
-
-                // Then we remove the content from disk.
-                if let Err(e) = fs::remove_dir_all(&index_path) {
-                    error!(
-                        "An error happened when deleting the index {} ({}): {}",
-                        index_name, uuid, e
-                    );
-                }
-
-                // Finally we remove the entry from the index map.
-                index_map.write().unwrap().end_deletion(&uuid);
+                for (name, uuid, index_path, closing_event) in indexes {
+                    // We first wait to be sure that the previously opened index is effectively closed.
+                    // This can take a lot of time, this is why we do that in a separate thread.
+                    if let Some(closing_event) = closing_event {
+                        closing_event.wait();
+                    }
+
+                    // Then we remove the content from disk.
+                    if let Err(e) = fs::remove_dir_all(&index_path) {
+                        error!(
+                            "An error happened when deleting the index {} ({}): {}",
+                            name, uuid, e
+                        );
+                    }
+
+                    // Finally we remove the entry from the index map.
+                    index_map.write().unwrap().end_deletion(&uuid);
+                }
             })
             .unwrap();

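In the iterator chain above, .filter_map(|thingy| thingy) simply drops the None entries produced when start_deletion returns Err(None); applied to an iterator of Option values it behaves like .flatten(). A tiny standalone check of that equivalence:

    fn main() {
        let maybe_indexes = vec![Some("movies"), None, Some("books")];

        // Identity filter_map, as written in the diff above...
        let kept: Vec<_> = maybe_indexes.clone().into_iter().filter_map(|x| x).collect();
        // ...is the same as flattening an iterator of Options.
        let flat: Vec<_> = maybe_indexes.into_iter().flatten().collect();

        assert_eq!(kept, flat);
        assert_eq!(kept, vec!["movies", "books"]);
    }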
@@ -1284,6 +1284,7 @@ impl<'a> Dump<'a> {
                 KindDump::IndexDeletion => KindWithContent::IndexDeletion {
                     index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                 },
+                KindDump::IndexClear { index_uids } => KindWithContent::IndexClear { index_uids },
                 KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
                     index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                     primary_key,

@@ -258,6 +258,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
         K::TaskCancelation { .. }
         | K::TaskDeletion { .. }
         | K::DumpCreation { .. }
+        | K::IndexClear { .. }
         | K::SnapshotCreation => (),
     };
     if let Some(Details::IndexSwap { swaps }) = &mut task.details {