implement snapshot

This commit is contained in:
many
2020-07-02 16:20:45 +02:00
parent 1903302a74
commit 9014290875
7 changed files with 217 additions and 10 deletions

View File

@@ -27,6 +27,7 @@ bytes = "0.5.4"
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-channel = "0.4.2"
env_logger = "0.7.1"
flate2 = "1.0.16"
futures = "0.3.4"
http = "0.1.19"
indexmap = { version = "1.3.2", features = ["serde-1"] }
@@ -47,6 +48,8 @@ sha2 = "0.8.1"
siphasher = "0.3.2"
slice-group-by = "0.2.6"
structopt = "0.3.12"
tar = "0.4.29"
tempfile = "3.1.0"
tokio = { version = "0.2.18", features = ["macros"] }
ureq = { version = "0.12.0", features = ["tls"], default-features = false }
walkdir = "2.3.1"

View File

@@ -236,6 +236,18 @@ impl From<actix_http::Error> for Error {
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
Error::Internal(err.to_string())
}
}
impl From<meilisearch_core::Error> for Error {
fn from(err: meilisearch_core::Error) -> Error {
Error::Internal(err.to_string())
}
}
impl From<FacetCountError> for ResponseError {
fn from(err: FacetCountError) -> ResponseError {
ResponseError { inner: Box::new(err) }

View File

@@ -7,6 +7,7 @@ pub mod models;
pub mod option;
pub mod routes;
pub mod analytics;
pub mod snapshot;
use actix_http::Error;
use actix_service::ServiceFactory;

View File

@@ -6,6 +6,7 @@ use main_error::MainError;
use meilisearch_http::helpers::NormalizePath;
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
use structopt::StructOpt;
use meilisearch_http::snapshot;
mod analytics;
@@ -51,6 +52,10 @@ async fn main() -> Result<(), MainError> {
_ => unreachable!(),
}
if let Some(path) = &opt.load_from_snapshot {
snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
}
let data = Data::new(opt.clone())?;
if !opt.no_analytics {
@@ -64,6 +69,10 @@ async fn main() -> Result<(), MainError> {
index_update_callback(name, &data_cloned, status);
}));
if let Some(path) = &opt.snapshot_path {
snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec)?;
}
print_launch_resume(&opt, &data);
let http_server = HttpServer::new(move || {

View File

@@ -1,7 +1,7 @@
use std::{error, fs};
use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::sync::Arc;
use std::{error, fs};
use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
use rustls::{
@@ -93,6 +93,28 @@ pub struct Opt {
/// SSL support tickets.
#[structopt(long, env = "MEILI_SSL_TICKETS")]
pub ssl_tickets: bool,
/// Defines the path of the snapshot file to import.
/// This option will, by default, stop the process if a database already exist or if no snapshot exists at
/// the given path. If this option is not specified no snapshot is imported.
#[structopt(long, env = "MEILI_LOAD_FROM_SNAPSHOT")]
pub load_from_snapshot: Option<PathBuf>,
/// The engine will ignore a missing snapshot and not return an error in such case.
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_MISSING_SNAPSHOT")]
pub ignore_missing_snapshot: bool,
/// The engine will skip snapshot importation and not return an error in such case.
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_SNAPSHOT_IF_DB_EXISTS")]
pub ignore_snapshot_if_db_exists: bool,
/// Defines the directory path where meilisearch will create snapshot each snapshot_time_gap.
#[structopt(long, env = "MEILI_SNAPSHOT_PATH")]
pub snapshot_path: Option<PathBuf>,
/// Defines time interval, in seconds, between each snapshot creation.
#[structopt(long, requires = "snapshot-path", default_value = "86400", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
pub snapshot_interval_sec: u64,
}
impl Opt {

View File

@@ -0,0 +1,124 @@
use crate::Data;
use crate::error::Error;
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use log::error;
use std::fs::{create_dir_all, File};
use std::io;
use std::path::Path;
use std::thread;
use std::time::{Duration};
use tar::{Builder, Archive};
use tempfile::TempDir;
fn pack(src: &Path, dest: &Path) -> io::Result<()> {
let f = File::create(dest)?;
let gz_encoder = GzEncoder::new(f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
Ok(())
}
fn unpack(src: &Path, dest: &Path) -> Result<(), Error> {
let f = File::open(src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(dest)?;
ar.unpack(dest)?;
Ok(())
}
pub fn load_snapshot(
db_path: &str,
snapshot_path: &Path,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool
) -> Result<(), Error> {
let db_path = Path::new(db_path);
if !db_path.exists() && snapshot_path.exists() {
unpack(snapshot_path, db_path)
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
Err(Error::Internal(format!("database already exists at {:?}", db_path)))
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path)))
} else {
Ok(())
}
}
pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
let tmp_dir = TempDir::new()?;
data.db.copy_and_compact_to_path(tmp_dir.path())?;
pack(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
}
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
if snapshot_dir.file_name().is_none() {
return Err(Error::Internal("invalid snapshot file path".to_string()));
}
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
create_dir_all(snapshot_dir)?;
let snapshot_path = snapshot_dir.join(format!("{}.tar.gz", db_name.to_str().unwrap_or("data.ms")));
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(time_gap_s));
if let Err(e) = create_snapshot(&data, &snapshot_path) {
error!("Unsuccessful snapshot creation: {}", e);
}
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::prelude::*;
use std::fs;
#[test]
fn test_pack_unpack() {
let tempdir = TempDir::new().unwrap();
let test_dir = tempdir.path();
let src_dir = test_dir.join("src");
let dest_dir = test_dir.join("complex/destination/path/");
let archive_path = test_dir.join("archive.tar.gz");
let file_1_relative = Path::new("file1.txt");
let subfolder_relative = Path::new("subfolder/");
let file_2_relative = Path::new("subfolder/file2.txt");
create_dir_all(src_dir.join(subfolder_relative)).unwrap();
File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
assert!(pack(&src_dir, &archive_path).is_ok());
assert!(archive_path.exists());
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());
assert!(dest_dir.exists());
assert!(dest_dir.join(file_1_relative).exists());
assert!(dest_dir.join(subfolder_relative).exists());
assert!(dest_dir.join(file_2_relative).exists());
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
assert_eq!(contents, "Hello_file_1");
let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap();
assert_eq!(contents, "Hello_file_2");
}
}