Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-07-18 12:20:48 +00:00

Compare commits
18 commits:

- 810dfdf656
- f016652fca
- 6c99ebe3fa
- 94d357985f
- fbc698567a
- aa9db14c09
- 61e83a1c21
- 1316be5b09
- 4e8b0383dd
- 4fa10753c1
- 2473e289e8
- e0e5e87ed3
- b13e61f40a
- c023cb3065
- 0a3d069fbc
- fa062ce2cf
- cdc6e47bf5
- d5f44838be
Cargo.toml (10 changed lines):

```diff
@@ -1,13 +1,14 @@
 [package]
 edition = "2018"
 name = "meilidb"
-version = "0.2.0"
+version = "0.2.1"
 authors = ["Kerollmops <renault.cle@gmail.com>"]
 
 [dependencies]
 bincode = "1.0"
 byteorder = "1.2"
 crossbeam = "0.6"
+elapsed = "0.1"
 fst = "0.3"
 hashbrown = { version = "0.1", features = ["serde"] }
 lazy_static = "1.1"
@@ -17,6 +18,7 @@ log = "0.4"
 sdset = "0.3"
 serde = "1.0"
 serde_derive = "1.0"
+serde_json = { version = "1.0", features = ["preserve_order"] }
 unidecode = "0.3"
 
 [dependencies.toml]
@@ -41,7 +43,6 @@ nightly = ["hashbrown/nightly", "group-by/nightly"]
 
 [dev-dependencies]
 csv = "1.0"
-elapsed = "0.1"
 env_logger = "0.6"
 jemallocator = "0.1"
 quickcheck = "0.8"
@@ -50,6 +51,11 @@ rand_xorshift = "0.1"
 structopt = "0.2"
 tempfile = "3.0"
 termcolor = "1.0"
+warp = "0.1"
+
+[dev-dependencies.chashmap]
+git = "https://gitlab.redox-os.org/redox-os/tfs.git"
+rev = "b3e7cae1"
 
 [profile.release]
 debug = true
```
README.md:

````diff
@@ -49,7 +49,7 @@ MeiliDB runs with an index like most search engines.
 So to test the library you can create one by indexing a simple csv file.
 
 ```bash
-cargo run --release --example create-database -- test.mdb misc/kaggle.csv --schema schema-example.toml --stop-words misc/fr.stopwords.txt
+cargo run --release --example create-database -- test.mdb misc/kaggle.csv --schema schema-example.toml
 ```
 
 Once the command is executed, the index should be in the `test.mdb` folder. You are now able to run the `query-database` example and play with MeiliDB.
````
examples/http-server.rs (new file, 435 lines):

```rust
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

use log::{error, info};
use std::error::Error;
use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader};
use std::net::SocketAddr;
use std::path::{PathBuf, Path};
use std::sync::Arc;
use std::time::SystemTime;

use hashbrown::{HashMap, HashSet};
use chashmap::CHashMap;
use chashmap::ReadGuard;
use elapsed::measure_time;
use meilidb::database::Database;
use meilidb::database::UpdateBuilder;
use meilidb::database::schema::Schema;
use meilidb::database::schema::SchemaBuilder;
use meilidb::tokenizer::DefaultBuilder;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use structopt::StructOpt;
use warp::{Rejection, Filter};

#[derive(Debug, StructOpt)]
pub struct Opt {
    /// The destination where the database must be created.
    #[structopt(parse(from_os_str))]
    pub database_path: PathBuf,

    /// The address and port to bind the server to.
    #[structopt(short = "l", default_value = "127.0.0.1:8080")]
    pub listen_addr: SocketAddr,

    /// The path to the list of stop words (one by line).
    #[structopt(long = "stop-words", parse(from_os_str))]
    pub stop_words: PathBuf,
}

//
// ERRORS FOR THE MULTIDATABASE
//

#[derive(Debug)]
pub enum DatabaseError {
    AlreadyExist,
    NotExist,
    NotFound(String),
    Unknown(Box<Error>),
}

impl fmt::Display for DatabaseError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            DatabaseError::AlreadyExist => write!(f, "File already exist"),
            DatabaseError::NotExist => write!(f, "File not exist"),
            DatabaseError::NotFound(ref name) => write!(f, "Database {} not found", name),
            DatabaseError::Unknown(e) => write!(f, "{}", e),
        }
    }
}

impl Error for DatabaseError {}

impl From<Box<Error>> for DatabaseError {
    fn from(e: Box<Error>) -> DatabaseError {
        DatabaseError::Unknown(e)
    }
}

//
// MULTIDATABASE DEFINITION
//

pub struct MultiDatabase {
    databases: CHashMap<String, Database>,
    db_path: PathBuf,
    stop_words: HashSet<String>,
}

impl MultiDatabase {

    pub fn new(path: PathBuf, stop_words: HashSet<String>) -> MultiDatabase {
        MultiDatabase {
            databases: CHashMap::new(),
            db_path: path,
            stop_words: stop_words
        }
    }

    pub fn create(&self, name: String, schema: Schema) -> Result<(), DatabaseError> {
        let rdb_name = format!("{}.mdb", name);
        let database_path = self.db_path.join(rdb_name);

        if database_path.exists() {
            return Err(DatabaseError::AlreadyExist.into());
        }

        let index = Database::create(database_path, &schema)?;

        self.databases.insert_new(name, index);

        Ok(())
    }

    pub fn load(&self, name: String) -> Result<(), DatabaseError> {
        let rdb_name = format!("{}.mdb", name);
        let index_path = self.db_path.join(rdb_name);

        if !index_path.exists() {
            return Err(DatabaseError::NotExist.into());
        }

        let index = Database::open(index_path)?;

        self.databases.insert_new(name, index);

        Ok(())
    }

    pub fn load_existing(&self) {
        let paths = match fs::read_dir(self.db_path.clone()){
            Ok(p) => p,
            Err(e) => {
                error!("{}", e);
                return
            }
        };

        for path in paths {
            let path = match path {
                Ok(p) => p.path(),
                Err(_) => continue
            };

            let path_str = match path.to_str() {
                Some(p) => p,
                None => continue
            };

            let extension = match get_extension_from_path(path_str) {
                Some(e) => e,
                None => continue
            };

            if extension != "mdb" {
                continue
            }

            let name = match get_file_name_from_path(path_str) {
                Some(f) => f,
                None => continue
            };

            let db = match Database::open(path.clone()) {
                Ok(db) => db,
                Err(_) => continue
            };

            self.databases.insert_new(name.to_string(), db);
            info!("Load database {}", name);
        }
    }

    pub fn create_or_load(&self, name: String, schema: Schema) -> Result<(), DatabaseError> {
        match self.create(name.clone(), schema) {
            Err(DatabaseError::AlreadyExist) => self.load(name),
            x => x,
        }
    }

    pub fn get(&self, name: String) -> Result<ReadGuard<String, Database>, Box<Error>> {
        Ok(self.databases.get(&name).ok_or(DatabaseError::NotFound(name))?)
    }
}

fn get_extension_from_path(path: &str) -> Option<&str> {
    Path::new(path).extension().and_then(OsStr::to_str)
}

fn get_file_name_from_path(path: &str) -> Option<&str> {
    Path::new(path).file_stem().and_then(OsStr::to_str)
}

fn retrieve_stop_words(path: &Path) -> io::Result<HashSet<String>> {
    let f = File::open(path)?;
    let reader = BufReader::new(f);
    let mut words = HashSet::new();

    for line in reader.lines() {
        let line = line?;
        let word = line.trim().to_string();
        words.insert(word);
    }

    Ok(words)
}

//
// PARAMS & BODY FOR HTTPS HANDLERS
//

#[derive(Deserialize)]
struct CreateBody {
    name: String,
    schema: SchemaBuilder,
}

#[derive(Deserialize)]
struct IngestBody {
    insert: Option<Vec<HashMap<String, String>>>,
    delete: Option<Vec<HashMap<String, String>>>
}

#[derive(Serialize)]
struct IngestResponse {
    inserted: usize,
    deleted: usize
}

#[derive(Deserialize)]
struct SearchQuery {
    q: String,
    limit: Option<usize>,
}

//
// HTTP ROUTES
//

// Create a new index.
// The index name should be unused and the schema valid.
//
// POST /create
// Body:
//     - name: String
//     - schema: JSON
//     - stopwords: Vec<String>
fn create(body: CreateBody, db: Arc<MultiDatabase>) -> Result<String, Rejection> {
    let schema = body.schema.build();

    match db.create(body.name.clone(), schema) {
        Ok(_) => Ok(format!("{} created ", body.name)),
        Err(e) => {
            error!("{:?}", e);
            return Err(warp::reject::not_found())
        }
    }
}

// Ingest new document.
// It's possible to have positive or/and negative updates.
//
// PUT /:name/ingest
// Body:
//     - insert: Option<Vec<JSON>>
//     - delete: Option<Vec<String>>
fn ingest(index_name: String, body: IngestBody, db: Arc<MultiDatabase>) -> Result<String, Rejection> {

    let schema = {
        let index = match db.get(index_name.clone()){
            Ok(i) => i,
            Err(_) => return Err(warp::reject::not_found()),
        };
        let view = index.view();

        view.schema().clone()
    };

    let tokenizer_builder = DefaultBuilder::new();
    let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(n) => n.as_secs(),
        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
    };

    let sst_name = format!("update-{}-{}.sst", index_name, now);
    let sst_path = db.db_path.join(sst_name);

    let mut response = IngestResponse{inserted: 0, deleted: 0};
    let mut update = UpdateBuilder::new(sst_path, schema);

    if let Some(documents) = body.delete {
        for doc in documents {
            if let Err(e) = update.remove_document(doc) {
                error!("Impossible to remove document; {:?}", e);
            } else {
                response.deleted += 1;
            }
        }
    }

    let stop_words = &db.stop_words;
    if let Some(documents) = body.insert {
        for doc in documents {
            if let Err(e) = update.update_document(doc, &tokenizer_builder, &stop_words) {
                error!("Impossible to update document; {:?}", e);
            } else {
                response.inserted += 1;
            }
        }
    }

    let update = match update.build() {
        Ok(u) => u,
        Err(e) => {
            error!("Impossible to create an update file; {:?}", e);
            return Err(warp::reject::not_found())
        }
    };

    {
        let index = match db.get(index_name.clone()){
            Ok(i) => i,
            Err(_) => return Err(warp::reject::not_found()),
        };

        if let Err(e) = index.ingest_update_file(update) {
            error!("Impossible to ingest sst file; {:?}", e);
            return Err(warp::reject::not_found())
        };
    }

    if let Ok(response) = serde_json::to_string(&response) {
        return Ok(response);
    };

    return Err(warp::reject::not_found())
}

// Search in a specific index
// The default limit is 20
//
// GET /:name/search
// Params:
//     - query: String
//     - limit: Option<usize>
fn search(index_name: String, query: SearchQuery, db: Arc<MultiDatabase>) -> Result<String, Rejection> {

    let view = {
        let index = match db.get(index_name.clone()){
            Ok(i) => i,
            Err(_) => return Err(warp::reject::not_found()),
        };
        index.view()
    };

    let limit = query.limit.unwrap_or(20);

    let query_builder = match view.query_builder() {
        Ok(q) => q,
        Err(_err) => return Err(warp::reject::not_found()),
    };

    let (time, responses) = measure_time(|| {
        let docs = query_builder.query(&query.q, 0..limit);
        let mut results: Vec<HashMap<String, String>> = Vec::with_capacity(limit);
        for doc in docs {
            match view.document_by_id(doc.id) {
                Ok(val) => results.push(val),
                Err(e) => println!("{:?}", e),
            }
        }
        results
    });

    let response = match serde_json::to_string(&responses) {
        Ok(val) => val,
        Err(err) => format!("{:?}", err),
    };

    info!("index: {} - search: {:?} - limit: {} - time: {}", index_name, query.q, limit, time);
    Ok(response)
}

fn start_server(listen_addr: SocketAddr, db: Arc<MultiDatabase>) {
    let index_path = warp::path("index").and(warp::path::param::<String>());
    let db = warp::any().map(move || db.clone());

    let create_path = warp::path("create").and(warp::path::end());
    let ingest_path = index_path.and(warp::path("ingest")).and(warp::path::end());
    let search_path = index_path.and(warp::path("search")).and(warp::path::end());

    let create = warp::post2()
        .and(create_path)
        .and(warp::body::json())
        .and(db.clone())
        .and_then(create);

    let ingest = warp::put2()
        .and(ingest_path)
        .and(warp::body::json())
        .and(db.clone())
        .and_then(ingest);

    let search = warp::get2()
        .and(search_path)
        .and(warp::query())
        .and(db.clone())
        .and_then(search);

    let api = create
        .or(ingest)
        .or(search);

    let logs = warp::log("server");
    let headers = warp::reply::with::header("Content-Type", "application/json");

    let routes = api.with(logs).with(headers);

    info!("Server is started on {}", listen_addr);
    warp::serve(routes).run(listen_addr);
}

fn main() {
    env_logger::init();
    let opt = Opt::from_args();

    let stop_words = match retrieve_stop_words(&opt.stop_words) {
        Ok(s) => s,
        Err(_) => HashSet::new(),
    };

    let db = Arc::new(MultiDatabase::new(opt.database_path.clone(), stop_words));

    db.load_existing();

    start_server(opt.listen_addr, db);
}
```
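For reference, here is how the new example might be exercised once built. The routes follow the warp filters wired in `start_server` above (POST `/create`, PUT `/index/:name/ingest`, GET `/index/:name/search`); the database directory, index name, schema fields, and document values below are made-up placeholders rather than anything documented in these commits, so treat this as a hedged sketch:

```bash
# Start the server (paths and address are illustrative).
cargo run --release --example http-server -- ./databases --stop-words misc/fr.stopwords.txt -l 127.0.0.1:8080

# Create an index named "movies" with a small, invented schema.
curl -X POST http://127.0.0.1:8080/create \
     -H 'Content-Type: application/json' \
     -d '{"name": "movies", "schema": {"identifier": "id", "attributes": {"id": {"stored": true}, "title": {"stored": true, "indexed": true}}}}'

# Insert a document, then search it back (the default limit is 20).
curl -X PUT http://127.0.0.1:8080/index/movies/ingest \
     -H 'Content-Type: application/json' \
     -d '{"insert": [{"id": "1", "title": "Carol"}]}'

curl 'http://127.0.0.1:8080/index/movies/search?q=carol&limit=5'
```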
Database module:

```diff
@@ -7,7 +7,7 @@ use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
 use rocksdb::rocksdb::{Writable, Snapshot};
 use rocksdb::{DB, DBVector, MergeOperands};
 use crossbeam::atomic::ArcCell;
-use log::debug;
+use log::info;
 
 pub use self::document_key::{DocumentKey, DocumentKeyAttr};
 pub use self::view::{DatabaseView, DocumentIter};
@@ -39,10 +39,18 @@ where D: Deref<Target=DB>
 fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
 where D: Deref<Target=DB>
 {
-    let index = match snapshot.get(DATA_INDEX)? {
+    let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX));
+    info!("loading index from kv-store took {}", elapsed);
+
+    let index = match vector? {
         Some(vector) => {
             let bytes = vector.as_ref().to_vec();
-            Index::from_bytes(bytes)?
+            info!("index size if {} MiB", bytes.len() / 1024 / 1024);
+
+            let (elapsed, index) = elapsed::measure_time(|| Index::from_bytes(bytes));
+            info!("loading index from bytes took {}", elapsed);
+            index?
+
         },
         None => Index::default(),
     };
@@ -148,14 +156,12 @@ impl Database {
         let options = IngestExternalFileOptions::new();
         // options.move_files(move_update);
 
-        debug!("ingest update file");
+        let (elapsed, result) = elapsed::measure_time(|| {
         let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
-        db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
+        db.ingest_external_file_optimized(&cf_handle, &options, &[&path])
+        });
+        let _ = result?;
+        info!("ingesting update file took {}", elapsed);
-        debug!("compacting index range");
-        // Compacting to trigger the merge operator only one time
-        // while ingesting the update and not each time searching
-        db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));
 
         Snapshot::new(db.clone())
     };
```
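Several of these hunks (and the query-builder ones below) wrap existing calls in `elapsed::measure_time`, which runs a closure and hands back the measured duration together with the closure's result; the duration prints directly via `Display`. A minimal standalone sketch of that pattern, with an invented workload:

```rust
use elapsed::measure_time;

fn main() {
    // measure_time runs the closure and returns (duration, result);
    // the duration can be logged directly with "{}".
    let (elapsed, sum) = measure_time(|| {
        (0..1_000_000u64).sum::<u64>()
    });

    println!("summing took {}, result = {}", elapsed, sum);
}
```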
Schema module:

```diff
@@ -113,6 +113,23 @@ impl Schema {
         Ok(())
     }
 
+    pub fn from_json<R: Read>(mut reader: R) -> Result<Schema, Box<Error>> {
+        let mut buffer = Vec::new();
+        reader.read_to_end(&mut buffer)?;
+        let builder: SchemaBuilder = serde_json::from_slice(&buffer)?;
+        Ok(builder.build())
+    }
+
+    pub fn to_json<W: Write>(&self, mut writer: W) -> Result<(), Box<Error>> {
+        let identifier = self.inner.identifier.clone();
+        let attributes = self.attributes_ordered();
+        let builder = SchemaBuilder { identifier, attributes };
+        let string = serde_json::to_string_pretty(&builder)?;
+        writer.write_all(string.as_bytes())?;
+
+        Ok(())
+    }
+
     pub(crate) fn read_from_bin<R: Read>(reader: R) -> bincode::Result<Schema> {
         let builder: SchemaBuilder = bincode::deserialize_from(reader)?;
         Ok(builder.build())
@@ -254,4 +271,40 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn serialize_deserialize_json() -> Result<(), Box<Error>> {
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("alpha", STORED);
+        builder.new_attribute("beta", STORED | INDEXED);
+        builder.new_attribute("gamma", INDEXED);
+        let schema = builder.build();
+
+        let mut buffer = Vec::new();
+        schema.to_json(&mut buffer)?;
+
+        let schema2 = Schema::from_json(buffer.as_slice())?;
+        assert_eq!(schema, schema2);
+
+        let data = r#"
+            {
+                "identifier": "id",
+                "attributes": {
+                    "alpha": {
+                        "stored": true
+                    },
+                    "beta": {
+                        "stored": true,
+                        "indexed": true
+                    },
+                    "gamma": {
+                        "indexed": true
+                    }
+                }
+            }"#;
+        let schema2 = Schema::from_json(data.as_bytes())?;
+        assert_eq!(schema, schema2);
+
+        Ok(())
+    }
 }
```
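The new `from_json`/`to_json` methods accept any `Read`/`Write` implementor, so a schema can be round-tripped through a file as well as through the in-memory buffer used in the test above. A hedged sketch, assuming `Schema`, `SchemaBuilder`, `STORED`, and `INDEXED` are importable from `meilidb::database::schema` as in the http-server example, and using an arbitrary file name:

```rust
use std::error::Error;
use std::fs::File;

use meilidb::database::schema::{Schema, SchemaBuilder, STORED, INDEXED};

fn main() -> Result<(), Box<Error>> {
    // Build a tiny schema with one indexed, stored attribute.
    let mut builder = SchemaBuilder::with_identifier("id");
    builder.new_attribute("title", STORED | INDEXED);
    let schema = builder.build();

    // Dump the schema as pretty-printed JSON to a file...
    schema.to_json(File::create("schema-example.json")?)?;

    // ...and read it back into an equal Schema value.
    let reloaded = Schema::from_json(File::open("schema-example.json")?)?;
    assert_eq!(schema, reloaded);

    Ok(())
}
```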
Query builder:

```diff
@@ -93,6 +93,7 @@ where D: Deref<Target=DB>,
             op_builder.union()
         };
 
+        let mut number_matches = 0;
         let mut matches = HashMap::new();
 
         while let Some((input, indexed_values)) = stream.next() {
@@ -104,6 +105,7 @@ where D: Deref<Target=DB>,
                 let doc_indexes = &self.view.index().positive.indexes();
                 let doc_indexes = &doc_indexes[iv.value as usize];
 
+                number_matches += doc_indexes.len();
                 for doc_index in doc_indexes {
                     let match_ = Match {
                         query_index: iv.index as u32,
@@ -117,7 +119,8 @@ where D: Deref<Target=DB>,
             }
         }
 
-        info!("{} documents to classify", matches.len());
+        info!("{} total documents to classify", matches.len());
+        info!("{} total matches to classify", number_matches);
 
         matches.into_iter().map(|(i, m)| Document::from_matches(i, m)).collect()
     }
@@ -135,15 +138,19 @@ where D: Deref<Target=DB>,
             return builder.query(query, range);
         }
 
-        let mut documents = self.query_all(query);
+        let (elapsed, mut documents) = elapsed::measure_time(|| self.query_all(query));
+        info!("query_all took {}", elapsed);
+
         let mut groups = vec![documents.as_mut_slice()];
         let view = &self.view;
 
-        'criteria: for criterion in self.criteria.as_ref() {
+        'criteria: for (ci, criterion) in self.criteria.as_ref().iter().enumerate() {
             let tmp_groups = mem::replace(&mut groups, Vec::new());
             let mut documents_seen = 0;
 
             for group in tmp_groups {
+                info!("criterion {}, documents group of size {}", ci, group.len());
+
                 // if this group does not overlap with the requested range,
                 // push it without sorting and splitting it
                 if documents_seen + group.len() < range.start {
@@ -152,7 +159,10 @@ where D: Deref<Target=DB>,
                     continue;
                 }
 
-                group.sort_unstable_by(|a, b| criterion.evaluate(a, b, view));
+                let (elapsed, ()) = elapsed::measure_time(|| {
+                    group.sort_unstable_by(|a, b| criterion.evaluate(a, b, view));
+                });
+                info!("criterion {} sort took {}", ci, elapsed);
+
                 for group in BinaryGroupByMut::new(group, |a, b| criterion.eq(a, b, view)) {
                     documents_seen += group.len();
```