Compare commits


18 Commits
v0.2...v0.2.1

Author SHA1 Message Date
810dfdf656 Merge pull request #90 from Kerollmops/version-bump
Bump version to 0.2.1
2019-01-25 17:08:53 +01:00
f016652fca chore: Bump version to 0.2.1 2019-01-25 16:41:08 +01:00
6c99ebe3fa Merge pull request #89 from Kerollmops/no-more-compaction
Remove the manual compaction triggering
2019-01-25 16:40:08 +01:00
94d357985f feat: Remove the manual compaction triggering 2019-01-25 16:05:56 +01:00
fbc698567a Merge pull request #87 from Kerollmops/measure-index-loading
Display index loading times
2019-01-24 14:07:11 +01:00
aa9db14c09 chore: Display index loading times 2019-01-23 11:19:44 +01:00
61e83a1c21 Merge pull request #86 from Kerollmops/measure-indexation
Display timings of indexation operations
2019-01-16 13:32:44 +01:00
1316be5b09 chore: Display timings of indexation operations 2019-01-16 11:45:33 +01:00
4e8b0383dd Merge pull request #85 from Kerollmops/debug-more-stats
Display more stats infos
2019-01-15 14:20:28 +01:00
4fa10753c1 chore: Display more stats infos 2019-01-14 21:18:46 +01:00
2473e289e8 Merge pull request #84 from qdequele/create-server-example
Example HTTP server example can use stopwords
2019-01-14 18:55:58 +01:00
e0e5e87ed3 feat: HTTP server example can use stopwords 2019-01-14 18:21:58 +01:00
b13e61f40a Merge pull request #83 from qdequele/create-server-example
Create an example of HTTP server managing multiple databases
2019-01-14 14:35:33 +01:00
c023cb3065 feat: Create an example for HTTP server managing multiple databases 2019-01-14 13:39:54 +01:00
0a3d069fbc Merge pull request #79 from qdequele/master
Schema can be de/serialized from a json format
2019-01-12 21:50:02 +01:00
fa062ce2cf feat: Schema can be de/serialized from a json format 2019-01-12 21:05:48 +01:00
cdc6e47bf5 Merge pull request #81 from Kerollmops/update-readme
Simplify the examples command lines
2019-01-12 13:43:42 +01:00
d5f44838be doc: Simplify the examples command lines 2019-01-12 12:56:11 +01:00
6 changed files with 528 additions and 18 deletions

View File

@@ -1,13 +1,14 @@
[package]
edition = "2018"
name = "meilidb"
version = "0.2.0"
version = "0.2.1"
authors = ["Kerollmops <renault.cle@gmail.com>"]
[dependencies]
bincode = "1.0"
byteorder = "1.2"
crossbeam = "0.6"
elapsed = "0.1"
fst = "0.3"
hashbrown = { version = "0.1", features = ["serde"] }
lazy_static = "1.1"
@@ -17,6 +18,7 @@ log = "0.4"
sdset = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
unidecode = "0.3"
[dependencies.toml]
@@ -41,7 +43,6 @@ nightly = ["hashbrown/nightly", "group-by/nightly"]
[dev-dependencies]
csv = "1.0"
elapsed = "0.1"
env_logger = "0.6"
jemallocator = "0.1"
quickcheck = "0.8"
@@ -50,6 +51,11 @@ rand_xorshift = "0.1"
structopt = "0.2"
tempfile = "3.0"
termcolor = "1.0"
warp = "0.1"
[dev-dependencies.chashmap]
git = "https://gitlab.redox-os.org/redox-os/tfs.git"
rev = "b3e7cae1"
[profile.release]
debug = true

View File

@@ -49,7 +49,7 @@ MeiliDB runs with an index like most search engines.
So to test the library you can create one by indexing a simple csv file.
```bash
cargo run --release --example create-database -- test.mdb misc/kaggle.csv --schema schema-example.toml --stop-words misc/fr.stopwords.txt
cargo run --release --example create-database -- test.mdb misc/kaggle.csv --schema schema-example.toml
```
Once the command is executed, the index should be in the `test.mdb` folder. You are now able to run the `query-database` example and play with MeiliDB.
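A natural follow-up, sketched here under the assumption that the `query-database` example only needs the path to the database folder (check `examples/query-database.rs` for its exact arguments):

```bash
cargo run --release --example query-database -- test.mdb
```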

examples/http-server.rs (new file, 435 lines)
View File

@@ -0,0 +1,435 @@
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
use log::{error, info};
use std::error::Error;
use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader};
use std::net::SocketAddr;
use std::path::{PathBuf, Path};
use std::sync::Arc;
use std::time::SystemTime;
use hashbrown::{HashMap, HashSet};
use chashmap::CHashMap;
use chashmap::ReadGuard;
use elapsed::measure_time;
use meilidb::database::Database;
use meilidb::database::UpdateBuilder;
use meilidb::database::schema::Schema;
use meilidb::database::schema::SchemaBuilder;
use meilidb::tokenizer::DefaultBuilder;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use structopt::StructOpt;
use warp::{Rejection, Filter};
#[derive(Debug, StructOpt)]
pub struct Opt {
/// The destination where the database must be created.
#[structopt(parse(from_os_str))]
pub database_path: PathBuf,
/// The address and port to bind the server to.
#[structopt(short = "l", default_value = "127.0.0.1:8080")]
pub listen_addr: SocketAddr,
/// The path to the list of stop words (one by line).
#[structopt(long = "stop-words", parse(from_os_str))]
pub stop_words: PathBuf,
}
//
// ERRORS FOR THE MULTIDATABASE
//
#[derive(Debug)]
pub enum DatabaseError {
AlreadyExist,
NotExist,
NotFound(String),
Unknown(Box<Error>),
}
impl fmt::Display for DatabaseError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DatabaseError::AlreadyExist => write!(f, "File already exist"),
DatabaseError::NotExist => write!(f, "File not exist"),
DatabaseError::NotFound(ref name) => write!(f, "Database {} not found", name),
DatabaseError::Unknown(e) => write!(f, "{}", e),
}
}
}
impl Error for DatabaseError {}
impl From<Box<Error>> for DatabaseError {
fn from(e: Box<Error>) -> DatabaseError {
DatabaseError::Unknown(e)
}
}
//
// MULTIDATABASE DEFINITION
//
pub struct MultiDatabase {
databases: CHashMap<String, Database>,
db_path: PathBuf,
stop_words: HashSet<String>,
}
impl MultiDatabase {
pub fn new(path: PathBuf, stop_words: HashSet<String>) -> MultiDatabase {
MultiDatabase {
databases: CHashMap::new(),
db_path: path,
stop_words: stop_words
}
}
pub fn create(&self, name: String, schema: Schema) -> Result<(), DatabaseError> {
let rdb_name = format!("{}.mdb", name);
let database_path = self.db_path.join(rdb_name);
if database_path.exists() {
return Err(DatabaseError::AlreadyExist.into());
}
let index = Database::create(database_path, &schema)?;
self.databases.insert_new(name, index);
Ok(())
}
pub fn load(&self, name: String) -> Result<(), DatabaseError> {
let rdb_name = format!("{}.mdb", name);
let index_path = self.db_path.join(rdb_name);
if !index_path.exists() {
return Err(DatabaseError::NotExist.into());
}
let index = Database::open(index_path)?;
self.databases.insert_new(name, index);
Ok(())
}
pub fn load_existing(&self) {
let paths = match fs::read_dir(self.db_path.clone()){
Ok(p) => p,
Err(e) => {
error!("{}", e);
return
}
};
for path in paths {
let path = match path {
Ok(p) => p.path(),
Err(_) => continue
};
let path_str = match path.to_str() {
Some(p) => p,
None => continue
};
let extension = match get_extension_from_path(path_str) {
Some(e) => e,
None => continue
};
if extension != "mdb" {
continue
}
let name = match get_file_name_from_path(path_str) {
Some(f) => f,
None => continue
};
let db = match Database::open(path.clone()) {
Ok(db) => db,
Err(_) => continue
};
self.databases.insert_new(name.to_string(), db);
info!("Load database {}", name);
}
}
pub fn create_or_load(&self, name: String, schema: Schema) -> Result<(), DatabaseError> {
match self.create(name.clone(), schema) {
Err(DatabaseError::AlreadyExist) => self.load(name),
x => x,
}
}
pub fn get(&self, name: String) -> Result<ReadGuard<String, Database>, Box<Error>> {
Ok(self.databases.get(&name).ok_or(DatabaseError::NotFound(name))?)
}
}
fn get_extension_from_path(path: &str) -> Option<&str> {
Path::new(path).extension().and_then(OsStr::to_str)
}
fn get_file_name_from_path(path: &str) -> Option<&str> {
Path::new(path).file_stem().and_then(OsStr::to_str)
}
fn retrieve_stop_words(path: &Path) -> io::Result<HashSet<String>> {
let f = File::open(path)?;
let reader = BufReader::new(f);
let mut words = HashSet::new();
for line in reader.lines() {
let line = line?;
let word = line.trim().to_string();
words.insert(word);
}
Ok(words)
}
//
// PARAMS & BODY FOR HTTPS HANDLERS
//
#[derive(Deserialize)]
struct CreateBody {
name: String,
schema: SchemaBuilder,
}
#[derive(Deserialize)]
struct IngestBody {
insert: Option<Vec<HashMap<String, String>>>,
delete: Option<Vec<HashMap<String, String>>>
}
#[derive(Serialize)]
struct IngestResponse {
inserted: usize,
deleted: usize
}
#[derive(Deserialize)]
struct SearchQuery {
q: String,
limit: Option<usize>,
}
//
// HTTP ROUTES
//
// Create a new index.
// The index name should be unused and the schema valid.
//
// POST /create
// Body:
// - name: String
// - schema: JSON
// - stopwords: Vec<String>
fn create(body: CreateBody, db: Arc<MultiDatabase>) -> Result<String, Rejection> {
let schema = body.schema.build();
match db.create(body.name.clone(), schema) {
Ok(_) => Ok(format!("{} created ", body.name)),
Err(e) => {
error!("{:?}", e);
return Err(warp::reject::not_found())
}
}
}
// Ingest new document.
// It's possible to have positive or/and negative updates.
//
// PUT /:name/ingest
// Body:
// - insert: Option<Vec<JSON>>
// - delete: Option<Vec<String>>
fn ingest(index_name: String, body: IngestBody, db: Arc<MultiDatabase>) -> Result<String, Rejection> {
let schema = {
let index = match db.get(index_name.clone()){
Ok(i) => i,
Err(_) => return Err(warp::reject::not_found()),
};
let view = index.view();
view.schema().clone()
};
let tokenizer_builder = DefaultBuilder::new();
let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
};
let sst_name = format!("update-{}-{}.sst", index_name, now);
let sst_path = db.db_path.join(sst_name);
let mut response = IngestResponse{inserted: 0, deleted: 0};
let mut update = UpdateBuilder::new(sst_path, schema);
if let Some(documents) = body.delete {
for doc in documents {
if let Err(e) = update.remove_document(doc) {
error!("Impossible to remove document; {:?}", e);
} else {
response.deleted += 1;
}
}
}
let stop_words = &db.stop_words;
if let Some(documents) = body.insert {
for doc in documents {
if let Err(e) = update.update_document(doc, &tokenizer_builder, &stop_words) {
error!("Impossible to update document; {:?}", e);
} else {
response.inserted += 1;
}
}
}
let update = match update.build() {
Ok(u) => u,
Err(e) => {
error!("Impossible to create an update file; {:?}", e);
return Err(warp::reject::not_found())
}
};
{
let index = match db.get(index_name.clone()){
Ok(i) => i,
Err(_) => return Err(warp::reject::not_found()),
};
if let Err(e) = index.ingest_update_file(update) {
error!("Impossible to ingest sst file; {:?}", e);
return Err(warp::reject::not_found())
};
}
if let Ok(response) = serde_json::to_string(&response) {
return Ok(response);
};
return Err(warp::reject::not_found())
}
// Search in a specific index
// The default limit is 20
//
// GET /:name/search
// Params:
// - query: String
// - limit: Option<usize>
fn search(index_name: String, query: SearchQuery, db: Arc<MultiDatabase>) -> Result<String, Rejection> {
let view = {
let index = match db.get(index_name.clone()){
Ok(i) => i,
Err(_) => return Err(warp::reject::not_found()),
};
index.view()
};
let limit = query.limit.unwrap_or(20);
let query_builder = match view.query_builder() {
Ok(q) => q,
Err(_err) => return Err(warp::reject::not_found()),
};
let (time, responses) = measure_time(|| {
let docs = query_builder.query(&query.q, 0..limit);
let mut results: Vec<HashMap<String, String>> = Vec::with_capacity(limit);
for doc in docs {
match view.document_by_id(doc.id) {
Ok(val) => results.push(val),
Err(e) => println!("{:?}", e),
}
}
results
});
let response = match serde_json::to_string(&responses) {
Ok(val) => val,
Err(err) => format!("{:?}", err),
};
info!("index: {} - search: {:?} - limit: {} - time: {}", index_name, query.q, limit, time);
Ok(response)
}
fn start_server(listen_addr: SocketAddr, db: Arc<MultiDatabase>) {
let index_path = warp::path("index").and(warp::path::param::<String>());
let db = warp::any().map(move || db.clone());
let create_path = warp::path("create").and(warp::path::end());
let ingest_path = index_path.and(warp::path("ingest")).and(warp::path::end());
let search_path = index_path.and(warp::path("search")).and(warp::path::end());
let create = warp::post2()
.and(create_path)
.and(warp::body::json())
.and(db.clone())
.and_then(create);
let ingest = warp::put2()
.and(ingest_path)
.and(warp::body::json())
.and(db.clone())
.and_then(ingest);
let search = warp::get2()
.and(search_path)
.and(warp::query())
.and(db.clone())
.and_then(search);
let api = create
.or(ingest)
.or(search);
let logs = warp::log("server");
let headers = warp::reply::with::header("Content-Type", "application/json");
let routes = api.with(logs).with(headers);
info!("Server is started on {}", listen_addr);
warp::serve(routes).run(listen_addr);
}
fn main() {
env_logger::init();
let opt = Opt::from_args();
let stop_words = match retrieve_stop_words(&opt.stop_words) {
Ok(s) => s,
Err(_) => HashSet::new(),
};
let db = Arc::new(MultiDatabase::new(opt.database_path.clone(), stop_words));
db.load_existing();
start_server(opt.listen_addr, db);
}
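The new `examples/http-server.rs` above wires three JSON routes: `POST /create`, `PUT /index/:name/ingest` and `GET /index/:name/search`. A minimal session against it could look like the sketch below; the routes, default listen address, default limit and body shapes come from the code above, while the database directory, index name, attribute names and documents are placeholders:

```bash
# Start the server example (database directory and stop-words file are illustrative).
cargo run --release --example http-server -- databases/ --stop-words misc/fr.stopwords.txt

# Create an index; the schema uses the same JSON shape as SchemaBuilder.
curl -X POST http://127.0.0.1:8080/create \
     -H 'Content-Type: application/json' \
     -d '{"name": "movies", "schema": {"identifier": "id", "attributes": {"id": {"stored": true}, "title": {"stored": true, "indexed": true}}}}'

# Ingest documents (a positive update); deletions would go in a "delete" array.
curl -X PUT http://127.0.0.1:8080/index/movies/ingest \
     -H 'Content-Type: application/json' \
     -d '{"insert": [{"id": "1", "title": "Carol"}]}'

# Search the index, limiting to 5 results (the default limit is 20).
curl 'http://127.0.0.1:8080/index/movies/search?q=carol&limit=5'
```

Responses are sent back with a `Content-Type: application/json` header, as set by `warp::reply::with::header` in `start_server`.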

View File

@@ -7,7 +7,7 @@ use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamil
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, DBVector, MergeOperands};
use crossbeam::atomic::ArcCell;
use log::debug;
use log::info;
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
pub use self::view::{DatabaseView, DocumentIter};
@@ -39,10 +39,18 @@ where D: Deref<Target=DB>
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
where D: Deref<Target=DB>
{
let index = match snapshot.get(DATA_INDEX)? {
let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX));
info!("loading index from kv-store took {}", elapsed);
let index = match vector? {
Some(vector) => {
let bytes = vector.as_ref().to_vec();
Index::from_bytes(bytes)?
info!("index size if {} MiB", bytes.len() / 1024 / 1024);
let (elapsed, index) = elapsed::measure_time(|| Index::from_bytes(bytes));
info!("loading index from bytes took {}", elapsed);
index?
},
None => Index::default(),
};
@@ -148,14 +156,12 @@ impl Database {
let options = IngestExternalFileOptions::new();
// options.move_files(move_update);
debug!("ingest update file");
let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
debug!("compacting index range");
// Compacting to trigger the merge operator only one time
// while ingesting the update and not each time searching
db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));
let (elapsed, result) = elapsed::measure_time(|| {
let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
db.ingest_external_file_optimized(&cf_handle, &options, &[&path])
});
let _ = result?;
info!("ingesting update file took {}", elapsed);
Snapshot::new(db.clone())
};

View File

@@ -113,6 +113,23 @@ impl Schema {
Ok(())
}
pub fn from_json<R: Read>(mut reader: R) -> Result<Schema, Box<Error>> {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer)?;
let builder: SchemaBuilder = serde_json::from_slice(&buffer)?;
Ok(builder.build())
}
pub fn to_json<W: Write>(&self, mut writer: W) -> Result<(), Box<Error>> {
let identifier = self.inner.identifier.clone();
let attributes = self.attributes_ordered();
let builder = SchemaBuilder { identifier, attributes };
let string = serde_json::to_string_pretty(&builder)?;
writer.write_all(string.as_bytes())?;
Ok(())
}
pub(crate) fn read_from_bin<R: Read>(reader: R) -> bincode::Result<Schema> {
let builder: SchemaBuilder = bincode::deserialize_from(reader)?;
Ok(builder.build())
@@ -254,4 +271,40 @@ mod tests {
Ok(())
}
#[test]
fn serialize_deserialize_json() -> Result<(), Box<Error>> {
let mut builder = SchemaBuilder::with_identifier("id");
builder.new_attribute("alpha", STORED);
builder.new_attribute("beta", STORED | INDEXED);
builder.new_attribute("gamma", INDEXED);
let schema = builder.build();
let mut buffer = Vec::new();
schema.to_json(&mut buffer)?;
let schema2 = Schema::from_json(buffer.as_slice())?;
assert_eq!(schema, schema2);
let data = r#"
{
"identifier": "id",
"attributes": {
"alpha": {
"stored": true
},
"beta": {
"stored": true,
"indexed": true
},
"gamma": {
"indexed": true
}
}
}"#;
let schema2 = Schema::from_json(data.as_bytes())?;
assert_eq!(schema, schema2);
Ok(())
}
}

View File

@@ -93,6 +93,7 @@ where D: Deref<Target=DB>,
op_builder.union()
};
let mut number_matches = 0;
let mut matches = HashMap::new();
while let Some((input, indexed_values)) = stream.next() {
@@ -104,6 +105,7 @@ where D: Deref<Target=DB>,
let doc_indexes = &self.view.index().positive.indexes();
let doc_indexes = &doc_indexes[iv.value as usize];
number_matches += doc_indexes.len();
for doc_index in doc_indexes {
let match_ = Match {
query_index: iv.index as u32,
@@ -117,7 +119,8 @@ where D: Deref<Target=DB>,
}
}
info!("{} documents to classify", matches.len());
info!("{} total documents to classify", matches.len());
info!("{} total matches to classify", number_matches);
matches.into_iter().map(|(i, m)| Document::from_matches(i, m)).collect()
}
@@ -135,15 +138,19 @@ where D: Deref<Target=DB>,
return builder.query(query, range);
}
let mut documents = self.query_all(query);
let (elapsed, mut documents) = elapsed::measure_time(|| self.query_all(query));
info!("query_all took {}", elapsed);
let mut groups = vec![documents.as_mut_slice()];
let view = &self.view;
'criteria: for criterion in self.criteria.as_ref() {
'criteria: for (ci, criterion) in self.criteria.as_ref().iter().enumerate() {
let tmp_groups = mem::replace(&mut groups, Vec::new());
let mut documents_seen = 0;
for group in tmp_groups {
info!("criterion {}, documents group of size {}", ci, group.len());
// if this group does not overlap with the requested range,
// push it without sorting and splitting it
if documents_seen + group.len() < range.start {
@@ -152,7 +159,10 @@ where D: Deref<Target=DB>,
continue;
}
group.sort_unstable_by(|a, b| criterion.evaluate(a, b, view));
let (elapsed, ()) = elapsed::measure_time(|| {
group.sort_unstable_by(|a, b| criterion.evaluate(a, b, view));
});
info!("criterion {} sort took {}", ci, elapsed);
for group in BinaryGroupByMut::new(group, |a, b| criterion.eq(a, b, view)) {
documents_seen += group.len();