add json payload check on document addition

This commit is contained in:
mpostma
2021-03-16 14:09:01 +01:00
parent 0c17b166df
commit 204c743bcc
3 changed files with 42 additions and 4 deletions

View File

@@ -1,13 +1,15 @@
use std::collections::{hash_map::Entry, HashMap};
use std::io::SeekFrom;
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use super::index_actor::IndexActorHandle;
use log::info;
use oxidized_json_checker::JsonChecker;
use super::index_actor::IndexActorHandle;
use thiserror::Error;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncWriteExt, AsyncSeekExt};
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
@@ -125,7 +127,11 @@ where
let update_store = self.store.get_or_create(uuid).await?;
let update_file_id = uuid::Uuid::new_v4();
let path = self.path.join(format!("update_{}", update_file_id));
let mut file = File::create(&path)
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
@@ -146,7 +152,31 @@ where
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file = file.into_std().await;
tokio::task::spawn_blocking(move || {
use std::io::{BufReader, sink, copy, Seek};
// If the payload is empty, ignore the check.
if file.metadata().map_err(|e| UpdateError::Error(Box::new(e)))?.len() > 0 {
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0))
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
}
// The payload is valid, we can register it to the update store.
update_store
.register_update(meta, path, uuid)
.map(UpdateStatus::Pending)