mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-18 04:11:07 +00:00
Compare commits
4 Commits
release-v1
...
prototype-
Author | SHA1 | Date | |
---|---|---|---|
b7ed3308bb | |||
579a96adc7 | |||
e6ff45e3b9 | |||
e4f8ee00c8 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -2148,8 +2148,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "grenad"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a007932af5475ebb5c63bef8812bb1c36f317983bb4ca663e9d6dd58d6a0f8c"
|
||||
source = "git+https://github.com/meilisearch/grenad.git?branch=keep-source-index-in-merger#5a7c10fcd689f5967a8979f6b66da1e0939439ff"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
|
@ -961,7 +961,22 @@ impl IndexScheduler {
|
||||
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
|
||||
|
||||
let mut index_wtxn = index.write_txn()?;
|
||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||
|
||||
let mut tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||
|
||||
if index.is_corrupted(&index_wtxn)? {
|
||||
tracing::error!("Aborting task due to corrupted index");
|
||||
index_wtxn.abort();
|
||||
for task in tasks.iter_mut() {
|
||||
task.status = Status::Failed;
|
||||
task.error = Some(Error::CorruptedIndex.into());
|
||||
}
|
||||
|
||||
return Ok(tasks);
|
||||
}
|
||||
|
||||
index.check_document_facet_consistency(&index_wtxn)?.check();
|
||||
|
||||
index_wtxn.commit()?;
|
||||
|
||||
// if the update processed successfully, we're going to store the new
|
||||
@ -1339,6 +1354,7 @@ impl IndexScheduler {
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let deleted_documents = delete_document_by_filter(
|
||||
index_wtxn,
|
||||
filter,
|
||||
|
@ -138,6 +138,8 @@ pub enum Error {
|
||||
CreateBatch(Box<Self>),
|
||||
#[error("Corrupted task queue.")]
|
||||
CorruptedTaskQueue,
|
||||
#[error("Corrupted index.")]
|
||||
CorruptedIndex,
|
||||
#[error(transparent)]
|
||||
TaskDatabaseUpdate(Box<Self>),
|
||||
#[error(transparent)]
|
||||
@ -192,6 +194,7 @@ impl Error {
|
||||
| Error::Anyhow(_) => true,
|
||||
Error::CreateBatch(_)
|
||||
| Error::CorruptedTaskQueue
|
||||
| Error::CorruptedIndex
|
||||
| Error::TaskDatabaseUpdate(_)
|
||||
| Error::HeedTransaction(_) => false,
|
||||
#[cfg(test)]
|
||||
@ -242,6 +245,7 @@ impl ErrorCode for Error {
|
||||
Error::CorruptedDump => Code::Internal,
|
||||
Error::TaskDatabaseUpdate(_) => Code::Internal,
|
||||
Error::CreateBatch(_) => Code::Internal,
|
||||
Error::CorruptedIndex => Code::Internal,
|
||||
|
||||
// This one should never be seen by the end user
|
||||
Error::AbortedTask => Code::Internal,
|
||||
|
@ -1196,6 +1196,88 @@ impl IndexScheduler {
|
||||
// Reset the currently updating index to relinquish the index handle
|
||||
self.index_mapper.set_currently_updating_index(None);
|
||||
|
||||
if let Err(_error) = &res {
|
||||
let dump_batch = batch::Batch::Dump(Task {
|
||||
uid: u32::MAX,
|
||||
enqueued_at: OffsetDateTime::now_utc(),
|
||||
started_at: Some(OffsetDateTime::now_utc()),
|
||||
finished_at: None,
|
||||
error: None,
|
||||
canceled_by: None,
|
||||
details: None,
|
||||
status: Status::Processing,
|
||||
kind: KindWithContent::DumpCreation { keys: vec![], instance_uid: None },
|
||||
});
|
||||
|
||||
let res = {
|
||||
let cloned_index_scheduler = self.private_clone();
|
||||
let handle = std::thread::Builder::new()
|
||||
.name(String::from("batch-operation"))
|
||||
.spawn(move || cloned_index_scheduler.process_batch(dump_batch))
|
||||
.unwrap();
|
||||
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(_) => tracing::info!("Created a dump after panicked task"),
|
||||
Err(error) => {
|
||||
tracing::error!(%error, "Could not create a dump after panicked task")
|
||||
}
|
||||
}
|
||||
|
||||
let snap_batch = batch::Batch::SnapshotCreation(vec![Task {
|
||||
uid: u32::MAX,
|
||||
enqueued_at: OffsetDateTime::now_utc(),
|
||||
started_at: Some(OffsetDateTime::now_utc()),
|
||||
finished_at: None,
|
||||
error: None,
|
||||
canceled_by: None,
|
||||
details: None,
|
||||
status: Status::Processing,
|
||||
kind: KindWithContent::SnapshotCreation,
|
||||
}]);
|
||||
|
||||
let res = {
|
||||
let cloned_index_scheduler = self.private_clone();
|
||||
let handle = std::thread::Builder::new()
|
||||
.name(String::from("batch-operation"))
|
||||
.spawn(move || cloned_index_scheduler.process_batch(snap_batch))
|
||||
.unwrap();
|
||||
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(_) => tracing::info!("Created a snapshot after panicked task"),
|
||||
Err(error) => {
|
||||
tracing::error!(%error, "Could not create a snapshot after panicked task")
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
if let Some(index_uid) = index_uid.as_deref() {
|
||||
if let Ok(index) = self.index(index_uid) {
|
||||
let mut index_wtxn = index.write_txn()?;
|
||||
index.mark_as_corrupted(&mut index_wtxn)?;
|
||||
index_wtxn.commit()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
|
||||
let app = std::env::var("MEILI_LOUIS_PUSHOVER_APP").unwrap();
|
||||
|
||||
if let Err(error) = ureq::post("https://api.pushover.net/1/messages.json").send_json(
|
||||
serde_json::json!({
|
||||
"token": app,
|
||||
"user": user,
|
||||
"title": "Issue 138 db inconsistency",
|
||||
"message": "Dump and snapshot created, the index has been marked as corrupted",
|
||||
}),
|
||||
) {
|
||||
tracing::error!(%error, "could not send pushover")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||
|
||||
|
@ -74,6 +74,9 @@ fn on_panic(info: &std::panic::PanicInfo) {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let (opt, config_read_from) = Opt::try_build()?;
|
||||
|
||||
std::env::var("MEILI_LOUIS_PUSHOVER_USER").expect("MEILI_LOUIS_PUSHOVER_USER not set");
|
||||
std::env::var("MEILI_LOUIS_PUSHOVER_APP").expect("MEILI_LOUIS_PUSHOVER_APP not set");
|
||||
|
||||
std::panic::set_hook(Box::new(on_panic));
|
||||
|
||||
anyhow::ensure!(
|
||||
|
@ -26,7 +26,7 @@ flatten-serde-json = { path = "../flatten-serde-json" }
|
||||
fst = "0.4.7"
|
||||
fxhash = "0.2.1"
|
||||
geoutils = "0.5.1"
|
||||
grenad = { version = "0.4.5", default-features = false, features = [
|
||||
grenad = { git = "https://github.com/meilisearch/grenad.git", branch = "keep-source-index-in-merger", version = "0.4.5", default-features = false, features = [
|
||||
"rayon",
|
||||
"tempfile",
|
||||
] }
|
||||
|
@ -67,6 +67,8 @@ pub mod main_key {
|
||||
pub const PAGINATION_MAX_TOTAL_HITS: &str = "pagination-max-total-hits";
|
||||
pub const PROXIMITY_PRECISION: &str = "proximity-precision";
|
||||
pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
|
||||
|
||||
pub const CORRUPTED: &str = "corrupted";
|
||||
}
|
||||
|
||||
pub mod db_name {
|
||||
@ -1507,6 +1509,103 @@ impl Index {
|
||||
_ => "default".to_owned(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn check_document_facet_consistency(
|
||||
&self,
|
||||
rtxn: &RoTxn<'_>,
|
||||
) -> Result<DocumentFacetConsistency> {
|
||||
let documents = self.documents_ids(rtxn)?;
|
||||
|
||||
let field_ids_map = self.fields_ids_map(rtxn)?;
|
||||
|
||||
let mut facets = Vec::new();
|
||||
let mut facet_exists = Vec::new();
|
||||
let faceted_fields = self.user_defined_faceted_fields(rtxn)?;
|
||||
for fid in field_ids_map.ids() {
|
||||
let facet_name = field_ids_map.name(fid).unwrap();
|
||||
if !faceted_fields.contains(facet_name) {
|
||||
continue;
|
||||
};
|
||||
let mut facet = RoaringBitmap::new();
|
||||
|
||||
// value doesn't matter here we'll truncate to the level
|
||||
let key = crate::heed_codec::facet::FacetGroupKey {
|
||||
field_id: fid,
|
||||
level: 0,
|
||||
left_bound: &[] as _,
|
||||
};
|
||||
|
||||
for res in self
|
||||
.facet_id_f64_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
|
||||
.prefix_iter(rtxn, &key)?
|
||||
{
|
||||
let (_k, v) = res?;
|
||||
facet |= v.bitmap;
|
||||
}
|
||||
|
||||
for res in self
|
||||
.facet_id_string_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
|
||||
.prefix_iter(rtxn, &key)?
|
||||
{
|
||||
let (_k, v) = res?;
|
||||
facet |= v.bitmap;
|
||||
}
|
||||
|
||||
facets.push((field_ids_map.name(fid).unwrap().to_owned(), facet));
|
||||
facet_exists.push(self.exists_faceted_documents_ids(rtxn, fid)?);
|
||||
}
|
||||
|
||||
Ok(DocumentFacetConsistency { documents, facets, facet_exists })
|
||||
}
|
||||
|
||||
pub fn mark_as_corrupted(&self, wtxn: &mut RwTxn<'_>) -> Result<()> {
|
||||
Ok(self.main.remap_types::<Str, Str>().put(wtxn, main_key::CORRUPTED, "corrupted")?)
|
||||
}
|
||||
|
||||
pub fn is_corrupted(&self, txn: &RoTxn<'_>) -> Result<bool> {
|
||||
Ok(self.main.remap_types::<Str, Str>().get(txn, main_key::CORRUPTED)?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
/// Snapshot of the document and facet bitmaps used to detect divergence
/// between the documents database and the facet databases; built by
/// `Index::check_document_facet_consistency`.
pub struct DocumentFacetConsistency {
    // All document ids currently present in the index.
    documents: RoaringBitmap,
    // One `(field name, docids found in the facet level-0 entries)` pair
    // per user-defined faceted field.
    facets: Vec<(String, RoaringBitmap)>,
    // `exists_faceted_documents_ids` bitmap for each entry of `facets`,
    // in the same order.
    facet_exists: Vec<RoaringBitmap>,
}
|
||||
|
||||
impl DocumentFacetConsistency {
|
||||
pub fn check(&self) {
|
||||
let mut inconsistencies = 0;
|
||||
for ((field_name, facet), _facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
|
||||
{
|
||||
if field_name == "_geo" {
|
||||
continue;
|
||||
}
|
||||
|
||||
// only check the internal ids missing in documents as it is the grave condition
|
||||
// let documents = self.documents.clone() & facet_exists;
|
||||
let documents = self.documents.clone();
|
||||
// let missing_in_facets = &documents - facet;
|
||||
let missing_in_documents = facet - documents;
|
||||
|
||||
/*for id in missing_in_facets {
|
||||
tracing::error!(id, field_name, "Missing in facets");
|
||||
inconsistencies += 1;
|
||||
}*/
|
||||
for id in missing_in_documents {
|
||||
tracing::error!(id, field_name, "Missing in documents");
|
||||
inconsistencies += 1;
|
||||
}
|
||||
}
|
||||
if inconsistencies > 0 {
|
||||
panic!(
|
||||
"Panicked due to the previous {} inconsistencies between documents and facets",
|
||||
inconsistencies
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
Reference in New Issue
Block a user