Implement the webhook

This commit is contained in:
Tamo
2023-11-27 15:11:22 +01:00
parent 00f0711207
commit 9b1de777de
5 changed files with 44 additions and 23 deletions

29
Cargo.lock generated
View File

@ -2467,6 +2467,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror", "thiserror",
"time", "time",
"ureq",
"uuid 1.5.0", "uuid 1.5.0",
] ]
@ -4116,7 +4117,7 @@ checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [ dependencies = [
"log", "log",
"ring", "ring",
"rustls-webpki 0.101.3", "rustls-webpki",
"sct", "sct",
] ]
@ -4129,16 +4130,6 @@ dependencies = [
"base64 0.21.2", "base64 0.21.2",
] ]
[[package]]
name = "rustls-webpki"
version = "0.100.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.101.3" version = "0.101.3"
@ -4860,17 +4851,18 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]] [[package]]
name = "ureq" name = "ureq"
version = "2.7.1" version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b11c96ac7ee530603dcdf68ed1557050f374ce55a5a07193ebf8cbc9f8927e9" checksum = "f8cdd25c339e200129fe4de81451814e5228c9b771d57378817d6117cc2b3f97"
dependencies = [ dependencies = [
"base64 0.21.2", "base64 0.21.2",
"flate2",
"log", "log",
"once_cell", "once_cell",
"rustls 0.21.6", "rustls 0.21.6",
"rustls-webpki 0.100.2", "rustls-webpki",
"url", "url",
"webpki-roots 0.23.1", "webpki-roots 0.25.3",
] ]
[[package]] [[package]]
@ -5094,12 +5086,9 @@ dependencies = [
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "0.23.1" version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
dependencies = [
"rustls-webpki 0.100.2",
]
[[package]] [[package]]
name = "whatlang" name = "whatlang"

View File

@ -30,6 +30,7 @@ synchronoise = "1.0.1"
tempfile = "3.5.0" tempfile = "3.5.0"
thiserror = "1.0.40" thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
ureq = "2.9.1"
uuid = { version = "1.3.1", features = ["serde", "v4"] } uuid = { version = "1.3.1", features = ["serde", "v4"] }
[dev-dependencies] [dev-dependencies]

View File

@ -36,6 +36,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _, snapshots_path: _,
auth_path: _, auth_path: _,
version_file_path: _, version_file_path: _,
webhook_url: _,
test_breakpoint_sdr: _, test_breakpoint_sdr: _,
planned_failures: _, planned_failures: _,
run_loop_iteration: _, run_loop_iteration: _,

View File

@ -169,8 +169,8 @@ impl ProcessingTasks {
} }
/// Set the processing tasks to an empty list /// Set the processing tasks to an empty list
fn stop_processing(&mut self) { fn stop_processing(&mut self) -> RoaringBitmap {
self.processing = RoaringBitmap::new(); std::mem::take(&mut self.processing)
} }
/// Returns `true` if there, at least, is one task that is currently processing that we must stop. /// Returns `true` if there, at least, is one task that is currently processing that we must stop.
@ -240,6 +240,7 @@ pub struct IndexSchedulerOptions {
pub snapshots_path: PathBuf, pub snapshots_path: PathBuf,
/// The path to the folder containing the dumps. /// The path to the folder containing the dumps.
pub dumps_path: PathBuf, pub dumps_path: PathBuf,
pub webhook_url: Option<String>,
/// The maximum size, in bytes, of the task index. /// The maximum size, in bytes, of the task index.
pub task_db_size: usize, pub task_db_size: usize,
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index. /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@ -316,6 +317,9 @@ pub struct IndexScheduler {
/// the finished tasks automatically. /// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize, pub(crate) max_number_of_tasks: usize,
/// The webhook url we should send tasks to after processing every batches.
pub(crate) webhook_url: Option<String>,
/// A frame to output the indexation profiling files to disk. /// A frame to output the indexation profiling files to disk.
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>, pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@ -378,6 +382,7 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(), dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(), auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(), version_file_path: self.version_file_path.clone(),
webhook_url: self.webhook_url.clone(),
currently_updating_index: self.currently_updating_index.clone(), currently_updating_index: self.currently_updating_index.clone(),
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@ -475,6 +480,7 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path, snapshots_path: options.snapshots_path,
auth_path: options.auth_path, auth_path: options.auth_path,
version_file_path: options.version_file_path, version_file_path: options.version_file_path,
webhook_url: options.webhook_url,
currently_updating_index: Arc::new(RwLock::new(None)), currently_updating_index: Arc::new(RwLock::new(None)),
#[cfg(test)] #[cfg(test)]
@ -1240,19 +1246,41 @@ impl IndexScheduler {
} }
} }
self.processing_tasks.write().unwrap().stop_processing(); let processed = self.processing_tasks.write().unwrap().stop_processing();
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
wtxn.commit().map_err(Error::HeedTransaction)?; wtxn.commit().map_err(Error::HeedTransaction)?;
// We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed);
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing); self.breakpoint(Breakpoint::AfterProcessing);
Ok(TickOutcome::TickAgain(processed_tasks)) Ok(TickOutcome::TickAgain(processed_tasks))
} }
/// Once the tasks changes have been commited we must send all the tasks that were updated to our webhook if there is one.
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
if let Some(ref url) = self.webhook_url {
let rtxn = self.env.read_txn()?;
// on average a task takes ~50 bytes
let mut buffer = Vec::with_capacity(updated.len() as usize * 50);
for id in updated {
let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let _ = serde_json::to_writer(&mut buffer, &task);
}
let _ = ureq::post(url).send_bytes(&buffer);
}
Ok(())
}
/// Register a task to cleanup the task queue if needed /// Register a task to cleanup the task queue if needed
fn cleanup_task_queue(&self) -> Result<()> { fn cleanup_task_queue(&self) -> Result<()> {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
@ -1632,6 +1660,7 @@ mod tests {
indexes_path: tempdir.path().join("indexes"), indexes_path: tempdir.path().join("indexes"),
snapshots_path: tempdir.path().join("snapshots"), snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"), dumps_path: tempdir.path().join("dumps"),
webhook_url: None,
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
enable_mdb_writemap: false, enable_mdb_writemap: false,

View File

@ -228,6 +228,7 @@ fn open_or_create_database_unchecked(
indexes_path: opt.db_path.join("indexes"), indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(), snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(), dumps_path: opt.dump_dir.clone(),
webhook_url: opt.task_webhook_url.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize, task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize, index_base_map_size: opt.max_index_size.get_bytes() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,