Allow users to abort pending updates, one by one or all at once

This commit is contained in:
Clément Renault
2020-11-29 12:23:52 +01:00
committed by Kerollmops
parent e4c2abb1d9
commit 58a1f9081c
4 changed files with 175 additions and 22 deletions

View File

@@ -78,6 +78,12 @@ $(window).on('load', function () {
const content = $(`#${id} .updateStatus.content`); const content = $(`#${id} .updateStatus.content`);
content.html('processed ' + JSON.stringify(status.meta)); content.html('processed ' + JSON.stringify(status.meta));
} }
if (status.type == "Aborted") {
const id = 'update-' + status.update_id;
const content = $(`#${id} .updateStatus.content`);
content.html('aborted ' + JSON.stringify(status.meta));
}
} }
}); });

View File

@@ -189,6 +189,18 @@ enum UpdateStatus<M, P, N> {
Pending { update_id: u64, meta: M }, Pending { update_id: u64, meta: M },
Progressing { update_id: u64, meta: P }, Progressing { update_id: u64, meta: P },
Processed { update_id: u64, meta: N }, Processed { update_id: u64, meta: N },
Aborted { update_id: u64, meta: M },
}
impl<M, P, N> UpdateStatus<M, P, N> {
fn update_id(&self) -> u64 {
match self {
UpdateStatus::Pending { update_id, .. } => *update_id,
UpdateStatus::Progressing { update_id, .. } => *update_id,
UpdateStatus::Processed { update_id, .. } => *update_id,
UpdateStatus::Aborted { update_id, .. } => *update_id,
}
}
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -473,12 +485,16 @@ async fn main() -> anyhow::Result<()> {
.and(warp::path!("updates")) .and(warp::path!("updates"))
.map(move |header: String| { .map(move |header: String| {
let update_store = update_store_cloned.clone(); let update_store = update_store_cloned.clone();
let mut updates = update_store.iter_metas(|processed, pending| { let mut updates = update_store.iter_metas(|processed, aborted, pending| {
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new(); let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
for result in processed { for result in processed {
let (uid, meta) = result?; let (uid, meta) = result?;
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
} }
for result in aborted {
let (uid, meta) = result?;
updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
}
for result in pending { for result in pending {
let (uid, meta) = result?; let (uid, meta) = result?;
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta }); updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
@@ -486,9 +502,9 @@ async fn main() -> anyhow::Result<()> {
Ok(updates) Ok(updates)
}).unwrap(); }).unwrap();
if header.contains("text/html") { updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());
updates.reverse();
if header.contains("text/html") {
// We retrieve the database size. // We retrieve the database size.
let db_size = File::open(lmdb_path_cloned.clone()) let db_size = File::open(lmdb_path_cloned.clone())
.unwrap() .unwrap()
@@ -798,6 +814,31 @@ async fn main() -> anyhow::Result<()> {
warp::reply() warp::reply()
}); });
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let abort_update_id_route = warp::filters::method::delete()
.and(warp::path!("update" / u64))
.map(move |update_id: u64| {
if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() {
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
eprintln!("update {} aborted", update_id);
}
warp::reply()
});
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let abort_pending_updates_route = warp::filters::method::delete()
.and(warp::path!("updates"))
.map(move || {
let updates = update_store_cloned.abort_pendings().unwrap();
for (update_id, meta) in updates {
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
eprintln!("update {} aborted", update_id);
}
warp::reply()
});
let update_ws_route = warp::ws() let update_ws_route = warp::ws()
.and(warp::path!("updates" / "ws")) .and(warp::path!("updates" / "ws"))
.map(move |ws: warp::ws::Ws| { .map(move |ws: warp::ws::Ws| {
@@ -844,6 +885,8 @@ async fn main() -> anyhow::Result<()> {
.or(indexing_csv_route) .or(indexing_csv_route)
.or(indexing_json_route) .or(indexing_json_route)
.or(indexing_json_stream_route) .or(indexing_json_stream_route)
.or(abort_update_id_route)
.or(abort_pending_updates_route)
.or(clearing_route) .or(clearing_route)
.or(change_settings_route) .or(change_settings_route)
.or(change_facet_levels_route) .or(change_facet_levels_route)

View File

@@ -72,6 +72,15 @@
</li> </li>
</ol> </ol>
</li> </li>
{% when UpdateStatus::Aborted with { update_id, meta } %}
<li id="update-{{ update_id }}" class="document">
<ol>
<li class="field">
<div class="attribute">update id</div><div class="updateId content">{{ update_id }}</div>
<div class="attribute">update status</div><div class="updateStatus content">aborted</div>
</li>
</ol>
</li>
{% else %} {% else %}
{% endmatch %} {% endmatch %}
{% endfor %} {% endfor %}

View File

@@ -1,5 +1,6 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
@@ -14,7 +15,9 @@ pub struct UpdateStore<M, N> {
pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>, pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
pending: Database<OwnedType<BEU64>, ByteSlice>, pending: Database<OwnedType<BEU64>, ByteSlice>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>, processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
notification_sender: Sender<()>, notification_sender: Sender<()>,
processing_update_id: Arc<AtomicU64>,
} }
impl<M: 'static, N: 'static> UpdateStore<M, N> { impl<M: 'static, N: 'static> UpdateStore<M, N> {
@@ -29,11 +32,12 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
N: Serialize, N: Serialize,
{ {
options.max_dbs(3); options.max_dbs(4);
let env = options.open(path)?; let env = options.open(path)?;
let pending_meta = env.create_database(Some("pending-meta"))?; let pending_meta = env.create_database(Some("pending-meta"))?;
let pending = env.create_database(Some("pending"))?; let pending = env.create_database(Some("pending"))?;
let processed_meta = env.create_database(Some("processed-meta"))?; let processed_meta = env.create_database(Some("processed-meta"))?;
let aborted_meta = env.create_database(Some("aborted-meta"))?;
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
@@ -44,7 +48,9 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
pending, pending,
pending_meta, pending_meta,
processed_meta, processed_meta,
aborted_meta,
notification_sender, notification_sender,
processing_update_id: Arc::new(AtomicU64::new(u64::MAX)),
}); });
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
@@ -67,20 +73,27 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// Returns the new biggest id to use to store the new update. /// Returns the new biggest id to use to store the new update.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> { fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
let last_pending = self.pending_meta let last_pending = self.pending_meta
.as_polymorph() .remap_data_type::<DecodeIgnore>()
.last::<_, OwnedType<BEU64>, DecodeIgnore>(txn)? .last(txn)?
.map(|(k, _)| k.get()); .map(|(k, _)| k.get());
if let Some(last_id) = last_pending {
return Ok(last_id + 1);
}
let last_processed = self.processed_meta let last_processed = self.processed_meta
.as_polymorph() .remap_data_type::<DecodeIgnore>()
.last::<_, OwnedType<BEU64>, DecodeIgnore>(txn)? .last(txn)?
.map(|(k, _)| k.get()); .map(|(k, _)| k.get());
match last_processed { let last_aborted = self.aborted_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_update_id = [last_pending, last_processed, last_aborted]
.iter()
.copied()
.flatten()
.max();
match last_update_id {
Some(last_id) => Ok(last_id + 1), Some(last_id) => Ok(last_id + 1),
None => Ok(0), None => Ok(0),
} }
@@ -134,7 +147,10 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
.expect("associated update content"); .expect("associated update content");
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let new_meta = (f)(first_id.get(), first_meta, first_content)?; self.processing_update_id.store(first_id.get(), Ordering::Relaxed);
let result = (f)(first_id.get(), first_meta, first_content);
self.processing_update_id.store(u64::MAX, Ordering::Relaxed);
let new_meta = result?;
drop(rtxn); drop(rtxn);
// Once the pending update have been successfully processed // Once the pending update have been successfully processed
@@ -152,8 +168,18 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
} }
} }
/// Execute the user defined function with both meta-store iterators, the first /// The id of the update tha is currently being processed,
/// iterator is the *processed* meta one and the secind is the *pending* meta one. /// `None` if no update is being processed.
pub fn processing_update_id(&self) -> Option<u64> {
match self.processing_update_id.load(Ordering::Relaxed) {
u64::MAX => None,
update_id => Some(update_id),
}
}
/// Execute the user defined function with the meta-store iterators, the first
/// iterator is the *processed* meta one, the second the *aborted* meta one
/// and, the last is the *pending* meta one.
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T> pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
where where
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
@@ -161,19 +187,21 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
F: for<'a> FnMut( F: for<'a> FnMut(
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<N>>, heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<N>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>, heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
) -> heed::Result<T>, ) -> heed::Result<T>,
{ {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
// We get both the pending and processed meta iterators. // We get the pending, processed and aborted meta iterators.
let processed_iter = self.processed_meta.iter(&rtxn)?; let processed_iter = self.processed_meta.iter(&rtxn)?;
let aborted_iter = self.aborted_meta.iter(&rtxn)?;
let pending_iter = self.pending_meta.iter(&rtxn)?; let pending_iter = self.pending_meta.iter(&rtxn)?;
// We execute the user defined function with both iterators. // We execute the user defined function with both iterators.
(f)(processed_iter, pending_iter) (f)(processed_iter, aborted_iter, pending_iter)
} }
/// Returns the update associated meta or `None` if the update deosn't exist. /// Returns the update associated meta or `None` if the update doesn't exist.
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatusMeta<M, N>>> pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatusMeta<M, N>>>
where where
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
@@ -186,10 +214,76 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
return Ok(Some(UpdateStatusMeta::Pending(meta))); return Ok(Some(UpdateStatusMeta::Pending(meta)));
} }
match self.processed_meta.get(&rtxn, &key)? { if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))), return Ok(Some(UpdateStatusMeta::Processed(meta)));
None => Ok(None),
} }
if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatusMeta::Aborted(meta)));
}
Ok(None)
}
/// Aborts an update, an aborted update content is deleted and
/// the meta of it is moved into the aborted updates database.
///
/// Trying to abort an update that is currently being processed, an update
/// that as already been processed or which doesn't actually exist, will
/// return `None`.
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<M>>
where M: Serialize + for<'a> Deserialize<'a>,
{
let mut wtxn = self.env.write_txn()?;
let key = BEU64::new(update_id);
// We cannot abort an update that is currently being processed.
if self.processing_update_id() == Some(update_id) {
return Ok(None);
}
let meta = match self.pending_meta.get(&wtxn, &key)? {
Some(meta) => meta,
None => return Ok(None),
};
self.aborted_meta.put(&mut wtxn, &key, &meta)?;
self.pending_meta.delete(&mut wtxn, &key)?;
self.pending.delete(&mut wtxn, &key)?;
wtxn.commit()?;
Ok(Some(meta))
}
/// Aborts all the pending updates, and not the one being currently processed.
/// Returns the update metas and ids that were successfully aborted.
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, M)>>
where M: Serialize + for<'a> Deserialize<'a>,
{
let mut wtxn = self.env.write_txn()?;
let processing_update_id = self.processing_update_id();
let mut aborted_updates = Vec::new();
for result in self.pending_meta.iter(&wtxn)? {
let (key, meta) = result?;
let id = key.get();
if processing_update_id == Some(id) {
continue;
}
aborted_updates.push((id, meta));
}
for (id, meta) in &aborted_updates {
let key = BEU64::new(*id);
self.aborted_meta.put(&mut wtxn, &key, &meta)?;
self.pending_meta.delete(&mut wtxn, &key)?;
self.pending.delete(&mut wtxn, &key)?;
}
wtxn.commit()?;
Ok(aborted_updates)
} }
} }
@@ -197,6 +291,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
pub enum UpdateStatusMeta<M, N> { pub enum UpdateStatusMeta<M, N> {
Pending(M), Pending(M),
Processed(N), Processed(N),
Aborted(M),
} }
#[cfg(test)] #[cfg(test)]