Compare commits

..

4 Commits

Author SHA1 Message Date
b7ed3308bb Update grenad 2024-04-09 09:31:23 +02:00
579a96adc7 Actually abort in case of corrupted index 2024-04-04 11:02:54 +02:00
e6ff45e3b9 Changes for tracking issue 138
- create a snapshot as well as a dump
- only detect inconsistencies in the facet -> document direction
- mark index as corrupted after creating snapshot and dump
- always abort tasks on indexes marked as corrupted
2024-04-04 10:22:49 +02:00
e4f8ee00c8 check consistency, create a dump and send push event for failed checks 2024-03-25 16:32:50 +01:00
9 changed files with 268 additions and 87 deletions

105
Cargo.lock generated
View File

@ -36,9 +36,9 @@ dependencies = [
[[package]]
name = "actix-http"
version = "3.6.0"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d223b13fd481fc0d1f83bb12659ae774d9e3601814c68a0bc539731698cca743"
checksum = "129d4c88e98860e1758c5de288d1632b07970a16d59bdf7b8d66053d582bb71f"
dependencies = [
"actix-codec",
"actix-rt",
@ -138,9 +138,9 @@ dependencies = [
[[package]]
name = "actix-tls"
version = "3.3.0"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4cce60a2f2b477bc72e5cde0af1812a6e82d8fd85b5570a5dcf2a5bf2c5be5f"
checksum = "72616e7fbec0aa99c6f3164677fa48ff5a60036d0799c98cab894a44f3e0efc3"
dependencies = [
"actix-rt",
"actix-service",
@ -148,6 +148,8 @@ dependencies = [
"futures-core",
"impl-more",
"pin-project-lite",
"rustls 0.21.6",
"rustls-webpki",
"tokio",
"tokio-rustls 0.23.4",
"tokio-util",
@ -167,9 +169,9 @@ dependencies = [
[[package]]
name = "actix-web"
version = "4.5.1"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a6556ddebb638c2358714d853257ed226ece6023ef9364f23f0c70737ea984"
checksum = "e43428f3bf11dee6d166b00ec2df4e3aa8cc1606aaa0b7433c146852e2f4e03b"
dependencies = [
"actix-codec",
"actix-http",
@ -257,9 +259,9 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.8.11"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff"
dependencies = [
"cfg-if",
"getrandom",
@ -494,7 +496,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "benchmarks"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"bytes",
@ -628,7 +630,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"time",
@ -833,9 +835,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.83"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
dependencies = [
"jobserver",
"libc",
@ -1529,7 +1531,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"big_s",
@ -1767,7 +1769,7 @@ dependencies = [
[[package]]
name = "file-store"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"faux",
"tempfile",
@ -1790,7 +1792,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"insta",
"nom",
@ -1810,7 +1812,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"criterion",
"serde_json",
@ -1928,7 +1930,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"arbitrary",
"clap",
@ -2145,9 +2147,8 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "grenad"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c297f45167e6d543eb728e12ff284283e4ba2182a25c6cdcec883fda3316c7e7"
version = "0.4.5"
source = "git+https://github.com/meilisearch/grenad.git?branch=keep-source-index-in-merger#5a7c10fcd689f5967a8979f6b66da1e0939439ff"
dependencies = [
"bytemuck",
"byteorder",
@ -2157,9 +2158,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.26"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9"
dependencies = [
"bytes",
"fnv",
@ -2391,7 +2392,7 @@ dependencies = [
"futures-util",
"http 0.2.11",
"hyper",
"rustls 0.21.10",
"rustls 0.21.6",
"tokio",
"tokio-rustls 0.24.1",
]
@ -2420,7 +2421,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
[[package]]
name = "index-scheduler"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"big_s",
@ -2607,7 +2608,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"criterion",
"serde_json",
@ -3115,7 +3116,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"insta",
"md5",
@ -3124,7 +3125,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"actix-cors",
"actix-http",
@ -3217,7 +3218,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"base64 0.21.7",
"enum-iterator",
@ -3236,7 +3237,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"actix-web",
"anyhow",
@ -3266,7 +3267,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"clap",
@ -3305,7 +3306,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"arroy",
"big_s",
@ -3411,9 +3412,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.11"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
dependencies = [
"libc",
"log",
@ -3746,7 +3747,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"big_s",
"serde_json",
@ -4237,7 +4238,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.10",
"rustls 0.21.6",
"rustls-pemfile",
"serde",
"serde_json",
@ -4285,9 +4286,9 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.7"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e"
dependencies = [
"cc",
"getrandom",
@ -4383,12 +4384,12 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.10"
version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [
"log",
"ring 0.17.7",
"ring 0.16.20",
"rustls-webpki",
"sct",
]
@ -4408,7 +4409,7 @@ version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring 0.17.7",
"ring 0.17.3",
"untrusted 0.9.0",
]
@ -4451,12 +4452,12 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
@ -5095,7 +5096,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.10",
"rustls 0.21.6",
"tokio",
]
@ -5386,7 +5387,7 @@ dependencies = [
"flate2",
"log",
"once_cell",
"rustls 0.21.10",
"rustls 0.21.6",
"rustls-webpki",
"serde",
"serde_json",
@ -5630,12 +5631,12 @@ dependencies = [
[[package]]
name = "webpki"
version = "0.22.4"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
@ -5941,7 +5942,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.7.6"
version = "1.7.1"
dependencies = [
"anyhow",
"build-info",

View File

@ -21,7 +21,7 @@ members = [
]
[workspace.package]
version = "1.7.6"
version = "1.7.1"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@ -961,7 +961,22 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
let mut tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
if index.is_corrupted(&index_wtxn)? {
tracing::error!("Aborting task due to corrupted index");
index_wtxn.abort();
for task in tasks.iter_mut() {
task.status = Status::Failed;
task.error = Some(Error::CorruptedIndex.into());
}
return Ok(tasks);
}
index.check_document_facet_consistency(&index_wtxn)?.check();
index_wtxn.commit()?;
// if the update processed successfully, we're going to store the new
@ -1339,6 +1354,7 @@ impl IndexScheduler {
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(
index_wtxn,
filter,

View File

@ -138,6 +138,8 @@ pub enum Error {
CreateBatch(Box<Self>),
#[error("Corrupted task queue.")]
CorruptedTaskQueue,
#[error("Corrupted index.")]
CorruptedIndex,
#[error(transparent)]
TaskDatabaseUpdate(Box<Self>),
#[error(transparent)]
@ -192,6 +194,7 @@ impl Error {
| Error::Anyhow(_) => true,
Error::CreateBatch(_)
| Error::CorruptedTaskQueue
| Error::CorruptedIndex
| Error::TaskDatabaseUpdate(_)
| Error::HeedTransaction(_) => false,
#[cfg(test)]
@ -242,6 +245,7 @@ impl ErrorCode for Error {
Error::CorruptedDump => Code::Internal,
Error::TaskDatabaseUpdate(_) => Code::Internal,
Error::CreateBatch(_) => Code::Internal,
Error::CorruptedIndex => Code::Internal,
// This one should never be seen by the end user
Error::AbortedTask => Code::Internal,

View File

@ -1196,6 +1196,88 @@ impl IndexScheduler {
// Reset the currently updating index to relinquish the index handle
self.index_mapper.set_currently_updating_index(None);
if let Err(_error) = &res {
let dump_batch = batch::Batch::Dump(Task {
uid: u32::MAX,
enqueued_at: OffsetDateTime::now_utc(),
started_at: Some(OffsetDateTime::now_utc()),
finished_at: None,
error: None,
canceled_by: None,
details: None,
status: Status::Processing,
kind: KindWithContent::DumpCreation { keys: vec![], instance_uid: None },
});
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(dump_batch))
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
};
match res {
Ok(_) => tracing::info!("Created a dump after panicked task"),
Err(error) => {
tracing::error!(%error, "Could not create a dump after panicked task")
}
}
let snap_batch = batch::Batch::SnapshotCreation(vec![Task {
uid: u32::MAX,
enqueued_at: OffsetDateTime::now_utc(),
started_at: Some(OffsetDateTime::now_utc()),
finished_at: None,
error: None,
canceled_by: None,
details: None,
status: Status::Processing,
kind: KindWithContent::SnapshotCreation,
}]);
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(snap_batch))
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
};
match res {
Ok(_) => tracing::info!("Created a snapshot after panicked task"),
Err(error) => {
tracing::error!(%error, "Could not create a snapshot after panicked task")
}
}
{
if let Some(index_uid) = index_uid.as_deref() {
if let Ok(index) = self.index(index_uid) {
let mut index_wtxn = index.write_txn()?;
index.mark_as_corrupted(&mut index_wtxn)?;
index_wtxn.commit()?;
}
}
}
let user = std::env::var("MEILI_LOUIS_PUSHOVER_USER").unwrap();
let app = std::env::var("MEILI_LOUIS_PUSHOVER_APP").unwrap();
if let Err(error) = ureq::post("https://api.pushover.net/1/messages.json").send_json(
serde_json::json!({
"token": app,
"user": user,
"title": "Issue 138 db inconsistency",
"message": "Dump and snapshot created, the index has been marked as corrupted",
}),
) {
tracing::error!(%error, "could not send pushover")
}
}
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
@ -1396,7 +1478,6 @@ impl IndexScheduler {
// let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
let request = match &self.webhook_authorization_header {

View File

@ -74,6 +74,9 @@ fn on_panic(info: &std::panic::PanicInfo) {
async fn main() -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;
std::env::var("MEILI_LOUIS_PUSHOVER_USER").expect("MEILI_LOUIS_PUSHOVER_USER not set");
std::env::var("MEILI_LOUIS_PUSHOVER_APP").expect("MEILI_LOUIS_PUSHOVER_APP not set");
std::panic::set_hook(Box::new(on_panic));
anyhow::ensure!(

View File

@ -26,7 +26,7 @@ flatten-serde-json = { path = "../flatten-serde-json" }
fst = "0.4.7"
fxhash = "0.2.1"
geoutils = "0.5.1"
grenad = { version = "0.4.6", default-features = false, features = [
grenad = { git = "https://github.com/meilisearch/grenad.git", branch = "keep-source-index-in-merger", version = "0.4.5", default-features = false, features = [
"rayon",
"tempfile",
] }

View File

@ -67,6 +67,8 @@ pub mod main_key {
pub const PAGINATION_MAX_TOTAL_HITS: &str = "pagination-max-total-hits";
pub const PROXIMITY_PRECISION: &str = "proximity-precision";
pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
pub const CORRUPTED: &str = "corrupted";
}
pub mod db_name {
@ -1507,6 +1509,103 @@ impl Index {
_ => "default".to_owned(),
})
}
pub fn check_document_facet_consistency(
&self,
rtxn: &RoTxn<'_>,
) -> Result<DocumentFacetConsistency> {
let documents = self.documents_ids(rtxn)?;
let field_ids_map = self.fields_ids_map(rtxn)?;
let mut facets = Vec::new();
let mut facet_exists = Vec::new();
let faceted_fields = self.user_defined_faceted_fields(rtxn)?;
for fid in field_ids_map.ids() {
let facet_name = field_ids_map.name(fid).unwrap();
if !faceted_fields.contains(facet_name) {
continue;
};
let mut facet = RoaringBitmap::new();
// value doesn't matter here we'll truncate to the level
let key = crate::heed_codec::facet::FacetGroupKey {
field_id: fid,
level: 0,
left_bound: &[] as _,
};
for res in self
.facet_id_f64_docids
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
.prefix_iter(rtxn, &key)?
{
let (_k, v) = res?;
facet |= v.bitmap;
}
for res in self
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<crate::heed_codec::BytesRefCodec>>()
.prefix_iter(rtxn, &key)?
{
let (_k, v) = res?;
facet |= v.bitmap;
}
facets.push((field_ids_map.name(fid).unwrap().to_owned(), facet));
facet_exists.push(self.exists_faceted_documents_ids(rtxn, fid)?);
}
Ok(DocumentFacetConsistency { documents, facets, facet_exists })
}
pub fn mark_as_corrupted(&self, wtxn: &mut RwTxn<'_>) -> Result<()> {
Ok(self.main.remap_types::<Str, Str>().put(wtxn, main_key::CORRUPTED, "corrupted")?)
}
pub fn is_corrupted(&self, txn: &RoTxn<'_>) -> Result<bool> {
Ok(self.main.remap_types::<Str, Str>().get(txn, main_key::CORRUPTED)?.is_some())
}
}
pub struct DocumentFacetConsistency {
documents: RoaringBitmap,
facets: Vec<(String, RoaringBitmap)>,
facet_exists: Vec<RoaringBitmap>,
}
impl DocumentFacetConsistency {
pub fn check(&self) {
let mut inconsistencies = 0;
for ((field_name, facet), _facet_exists) in self.facets.iter().zip(self.facet_exists.iter())
{
if field_name == "_geo" {
continue;
}
// only check the internal ids missing in documents as it is the grave condition
// let documents = self.documents.clone() & facet_exists;
let documents = self.documents.clone();
// let missing_in_facets = &documents - facet;
let missing_in_documents = facet - documents;
/*for id in missing_in_facets {
tracing::error!(id, field_name, "Missing in facets");
inconsistencies += 1;
}*/
for id in missing_in_documents {
tracing::error!(id, field_name, "Missing in documents");
inconsistencies += 1;
}
}
if inconsistencies > 0 {
panic!(
"Panicked due to the previous {} inconsistencies between documents and facets",
inconsistencies
)
}
}
}
#[cfg(test)]

View File

@ -1032,14 +1032,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
{
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
// Note: this MUST be before `update_sortable` so that we can get the old value to compare with the updated value afterwards
let existing_fields: HashSet<_> = self
.index
.field_distribution(self.wtxn)?
.into_iter()
.filter_map(|(field, count)| (count != 0).then_some(field))
.collect();
let old_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
@ -1056,7 +1055,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.update_sort_facet_values_by()?;
self.update_pagination_max_total_hits()?;
let faceted_updated = self.update_faceted(existing_fields, old_faceted_fields)?;
// If there is new faceted fields we indicate that we must reindex as we must
// index new fields as facets. It means that the distinct attribute,
// an Asc/Desc criterion or a filtered attribute as be added or removed.
let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
let faceted_updated =
(&existing_fields - &old_faceted_fields) != (&existing_fields - &new_faceted_fields);
let stop_words_updated = self.update_stop_words()?;
let non_separator_tokens_updated = self.update_non_separator_tokens()?;
let separator_tokens_updated = self.update_separator_tokens()?;
@ -1089,34 +1094,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(())
}
fn update_faceted(
&self,
existing_fields: HashSet<String>,
old_faceted_fields: HashSet<String>,
) -> Result<bool> {
if existing_fields.iter().any(|field| field.contains('.')) {
return Ok(true);
}
if old_faceted_fields.iter().any(|field| field.contains('.')) {
return Ok(true);
}
// If there is new faceted fields we indicate that we must reindex as we must
// index new fields as facets. It means that the distinct attribute,
// an Asc/Desc criterion or a filtered attribute as be added or removed.
let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
if new_faceted_fields.iter().any(|field| field.contains('.')) {
return Ok(true);
}
let faceted_updated =
(&existing_fields - &old_faceted_fields) != (&existing_fields - &new_faceted_fields);
Ok(faceted_updated)
}
}
fn validate_prompt(