feat: Introduce the index module

This commit is contained in:
Clément Renault
2018-11-20 11:37:19 +01:00
parent b3249d515d
commit 7c1a17520d
9 changed files with 188 additions and 92 deletions

View File

@@ -35,4 +35,5 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
structopt = "0.2" structopt = "0.2"
tempfile = "3.0"
warp = "0.1" warp = "0.1"

41
examples/create-index.rs Normal file
View File

@@ -0,0 +1,41 @@
use std::path::Path;
use std::error::Error;
use std::path::PathBuf;
use std::io::{self, Write};
use elapsed::measure_time;
use moby_name_gen::random_name;
use structopt::StructOpt;
use pentium::index::update::Update;
use pentium::index::Index;
#[derive(Debug, StructOpt)]
pub struct Cmd {
/// csv file to index
#[structopt(parse(from_os_str))]
pub csv_file: PathBuf,
}
fn generate_update_from_csv(path: &Path) -> Result<Update, Box<Error>> {
unimplemented!()
}
fn main() -> Result<(), Box<Error>> {
let command = Cmd::from_args();
let path = random_name();
println!("generating the update...");
let update = generate_update_from_csv(&command.csv_file)?;
println!("creating the index");
let index = Index::open(&path)?;
println!("ingesting the changes in the index");
index.ingest_update(update)?;
println!("the index {:?} has been created!", path);
Ok(())
}

40
examples/index-search.rs Normal file
View File

@@ -0,0 +1,40 @@
use std::error::Error;
use std::path::PathBuf;
use std::io::{self, Write};
use elapsed::measure_time;
use structopt::StructOpt;
use pentium::index::Index;
#[derive(Debug, StructOpt)]
pub struct Cmd {
/// Index path (e.g. relaxed-colden).
#[structopt(parse(from_os_str))]
pub index_path: PathBuf,
}
fn main() -> Result<(), Box<Error>> {
let command = Cmd::from_args();
let index = Index::open(command.index_path)?;
loop {
print!("Searching for: ");
io::stdout().flush()?;
let mut query = String::new();
io::stdin().read_line(&mut query)?;
if query.is_empty() { break }
let (elapsed, result) = measure_time(|| index.search(&query));
match result {
Ok(documents) => {
// display documents here !
println!("Finished in {}", elapsed)
},
Err(e) => panic!("{}", e),
}
}
Ok(())
}

View File

@@ -1,3 +1,4 @@
use std::error::Error;
use std::str::from_utf8_unchecked; use std::str::from_utf8_unchecked;
use std::io::{self, Write}; use std::io::{self, Write};
use structopt::StructOpt; use structopt::StructOpt;
@@ -5,37 +6,25 @@ use std::path::PathBuf;
use elapsed::measure_time; use elapsed::measure_time;
use rocksdb::{DB, DBOptions, IngestExternalFileOptions}; use rocksdb::{DB, DBOptions, IngestExternalFileOptions};
use pentium::index::Index;
use pentium::rank::{criterion, Config, RankedStream}; use pentium::rank::{criterion, Config, RankedStream};
use pentium::{automaton, DocumentId, Metadata}; use pentium::{automaton, DocumentId};
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
pub struct CommandConsole { pub struct CommandConsole {
/// Meta file name (e.g. relaxed-colden). /// Meta file name (e.g. relaxed-colden).
#[structopt(parse(from_os_str))] #[structopt(parse(from_os_str))]
pub meta_name: PathBuf, pub index_path: PathBuf,
} }
pub struct ConsoleSearch { pub struct ConsoleSearch {
metadata: Metadata, index: Index,
db: DB,
} }
impl ConsoleSearch { impl ConsoleSearch {
pub fn from_command(command: CommandConsole) -> io::Result<ConsoleSearch> { pub fn from_command(command: CommandConsole) -> Result<ConsoleSearch, Box<Error>> {
let map_file = command.meta_name.with_extension("map"); let index = Index::open(command.index_path)?;
let idx_file = command.meta_name.with_extension("idx"); Ok(ConsoleSearch { index })
let sst_file = command.meta_name.with_extension("sst");
let metadata = unsafe { Metadata::from_paths(map_file, idx_file).unwrap() };
let rocksdb = "rocksdb/storage";
let db = DB::open_default(rocksdb).unwrap();
let sst_file = sst_file.to_str().unwrap();
db.ingest_external_file(&IngestExternalFileOptions::new(), &[sst_file]).unwrap();
drop(db);
let db = DB::open_for_read_only(DBOptions::default(), rocksdb, false).unwrap();
Ok(ConsoleSearch { metadata, db })
} }
pub fn serve(self) { pub fn serve(self) {
@@ -48,13 +37,13 @@ impl ConsoleSearch {
if query.is_empty() { break } if query.is_empty() { break }
let (elapsed, _) = measure_time(|| search(&self.metadata, &self.db, &query)); let (elapsed, _) = measure_time(|| search(&self.index, &query));
println!("Finished in {}", elapsed); println!("Finished in {}", elapsed);
} }
} }
} }
fn search(metadata: &Metadata, database: &DB, query: &str) { fn search(index: &Index, query: &str) {
let mut automatons = Vec::new(); let mut automatons = Vec::new();
for query in query.split_whitespace().map(str::to_lowercase) { for query in query.split_whitespace().map(str::to_lowercase) {
let lev = automaton::build_prefix_dfa(&query); let lev = automaton::build_prefix_dfa(&query);
@@ -75,9 +64,11 @@ fn search(metadata: &Metadata, database: &DB, query: &str) {
} }
}; };
let index: Index = unimplemented!();
// "Sony" "PlayStation 4 500GB" // "Sony" "PlayStation 4 500GB"
let config = Config { let config = Config {
index: unimplemented!(), blobs: &index.blobs().unwrap(),
automatons: automatons, automatons: automatons,
criteria: criterion::default(), criteria: criterion::default(),
distinct: (distinct_by_title_first_four_chars, 1), distinct: (distinct_by_title_first_four_chars, 1),

View File

@@ -8,8 +8,8 @@ pub use self::merge::Merge;
pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder}; pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder}; pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder};
use std::error::Error;
use fst::Map; use fst::Map;
use crate::data::DocIndexes; use crate::data::DocIndexes;
pub enum Blob { pub enum Blob {
@@ -40,3 +40,7 @@ impl Sign {
} }
} }
} }
pub fn ordered_blobs_from_slice(slice: &[u8]) -> Result<Vec<Blob>, Box<Error>> {
unimplemented!()
}

View File

@@ -1,6 +1,5 @@
pub mod blob_name; pub mod blob_name;
pub mod schema; pub mod schema;
pub mod search;
pub mod update; pub mod update;
use std::io; use std::io;
@@ -19,9 +18,12 @@ use ::rocksdb::merge_operator::MergeOperands;
use crate::rank::Document; use crate::rank::Document;
use crate::data::DocIdsBuilder; use crate::data::DocIdsBuilder;
use crate::{DocIndex, DocumentId}; use crate::{DocIndex, DocumentId};
use crate::index::{update::Update, search::Search}; use crate::index::update::Update;
use crate::blob::{PositiveBlobBuilder, Blob, Sign}; use crate::blob::{PositiveBlobBuilder, Blob, Sign};
use crate::blob::ordered_blobs_from_slice;
use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer}; use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer};
use crate::rank::{criterion, Config, RankedStream};
use crate::automaton;
fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> { fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut output = Vec::new(); let mut output = Vec::new();
@@ -36,6 +38,12 @@ pub struct Index {
} }
impl Index { impl Index {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> {
unimplemented!("return a soft error: the database already exist at the given path")
// Self::open must not take a parameter for create_if_missing
// or we must create an OpenOptions with many parameters
// https://doc.rust-lang.org/std/fs/struct.OpenOptions.html
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> { pub fn open<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> {
let path = path.as_ref().to_string_lossy(); let path = path.as_ref().to_string_lossy();
@@ -66,50 +74,47 @@ impl Index {
Ok(()) Ok(())
} }
pub fn snapshot(&self) -> Snapshot<&rocksdb::DB> { fn blobs(&self) -> Result<Vec<Blob>, Box<Error>> {
Snapshot::new(&self.database) match self.database.get(b"00-blobs-order")? {
Some(value) => Ok(ordered_blobs_from_slice(&value)?),
None => Ok(Vec::new()),
}
} }
}
impl Search for Index { pub fn search(&self, query: &str) -> Result<Vec<Document>, Box<Error>> {
fn search(&self, text: &str) -> Vec<Document> {
unimplemented!()
}
}
pub struct Snapshot<D> // FIXME create a SNAPSHOT for the search !
where D: Deref<Target=rocksdb::DB>, let blobs = self.blobs()?;
{
inner: rocksdb::Snapshot<D>,
}
impl<D> Snapshot<D> let mut automatons = Vec::new();
where D: Deref<Target=rocksdb::DB>, for query in query.split_whitespace().map(str::to_lowercase) {
{ let lev = automaton::build_prefix_dfa(&query);
pub fn new(inner: D) -> Snapshot<D> { automatons.push(lev);
Self { inner: rocksdb::Snapshot::new(inner) } }
}
}
impl<D> Search for Snapshot<D> let config = Config {
where D: Deref<Target=rocksdb::DB>, blobs: &blobs,
{ automatons: automatons,
fn search(&self, text: &str) -> Vec<Document> { criteria: criterion::default(),
unimplemented!() distinct: ((), 1),
};
Ok(RankedStream::new(config).retrieve_documents(0..20))
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use tempfile::NamedTempFile;
use super::*; use super::*;
use crate::index::schema::Schema; use crate::index::schema::Schema;
use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder}; use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder};
#[test] #[test]
fn generate_negative_update() -> Result<(), Box<Error>> { fn generate_negative_update() -> Result<(), Box<Error>> {
let path = NamedTempFile::new()?.into_temp_path();
let schema = Schema::open("/meili/default.sch")?; let mut builder = NegativeUpdateBuilder::new(&path);
let mut builder = NegativeUpdateBuilder::new("update-delete-0001.sst");
// you can insert documents in any order, it is sorted internally // you can insert documents in any order, it is sorted internally
builder.remove(1); builder.remove(1);
@@ -157,18 +162,18 @@ mod tests {
////////////// //////////////
let index = Index::open("/meili/data")?; // let index = Index::open("/meili/data")?;
let update = Update::open("update-0001.sst")?; // let update = Update::open("update-0001.sst")?;
// if you create a snapshot before an update // // if you create a snapshot before an update
let snapshot = index.snapshot(); // let snapshot = index.snapshot();
index.ingest_update(update)?; // index.ingest_update(update)?;
// the snapshot does not see the updates // // the snapshot does not see the updates
let results = snapshot.search("helo"); // let results = snapshot.search("helo");
// the raw index itself see new results // // the raw index itself see new results
let results = index.search("helo"); // let results = index.search("helo");
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,8 @@
use std::io::{Read, Write};
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
use std::ops::BitOr; use std::ops::BitOr;
use std::fs::File;
use std::fmt; use std::fmt;
pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false }; pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
@@ -33,15 +35,23 @@ impl BitOr for SchemaProps {
} }
} }
pub struct SchemaBuilder; pub struct SchemaBuilder {
fields: Vec<(String, SchemaProps)>,
}
impl SchemaBuilder { impl SchemaBuilder {
pub fn new() -> SchemaBuilder { pub fn new() -> SchemaBuilder {
unimplemented!() SchemaBuilder { fields: Vec::new() }
} }
pub fn field(&mut self, name: &str, props: SchemaProps) -> SchemaField { pub fn field<N>(&mut self, name: N, props: SchemaProps) -> SchemaField
unimplemented!() where N: Into<String>,
{
let len = self.fields.len();
let name = name.into();
self.fields.push((name, props));
SchemaField(len as u32)
} }
pub fn build(self) -> Schema { pub fn build(self) -> Schema {
@@ -49,6 +59,32 @@ impl SchemaBuilder {
} }
} }
#[derive(Clone)]
pub struct Schema;
impl Schema {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Schema, Box<Error>> {
let file = File::open(path)?;
Schema::read_from(file)
}
pub fn read_from<R: Read>(reader: R) -> Result<Schema, Box<Error>> {
unimplemented!()
}
pub fn write_to<W: Write>(writer: W) -> Result<(), Box<Error>> {
unimplemented!()
}
pub fn props(&self, field: SchemaField) -> SchemaProps {
unimplemented!()
}
pub fn field(&self, name: &str) -> Option<SchemaField> {
unimplemented!()
}
}
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq)] #[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct SchemaField(u32); pub struct SchemaField(u32);
@@ -63,20 +99,3 @@ impl fmt::Display for SchemaField {
write!(f, "{}", self.0) write!(f, "{}", self.0)
} }
} }
#[derive(Clone)]
pub struct Schema;
impl Schema {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Schema, Box<Error>> {
unimplemented!()
}
pub fn props(&self, field: SchemaField) -> SchemaProps {
unimplemented!()
}
pub fn field(&self, name: &str) -> Option<SchemaField> {
unimplemented!()
}
}

View File

@@ -1,5 +0,0 @@
use crate::rank::Document;
pub trait Search {
fn search(&self, text: &str) -> Vec<Document>;
}

View File

@@ -22,8 +22,8 @@ fn clamp_range<T: Copy + Ord>(range: Range<T>, big: Range<T>) -> Range<T> {
} }
} }
pub struct Config<C, F> { pub struct Config<'a, C, F> {
pub index: Index, pub blobs: &'a [Blob],
pub automatons: Vec<DfaExt>, pub automatons: Vec<DfaExt>,
pub criteria: Vec<C>, pub criteria: Vec<C>,
pub distinct: (F, usize), pub distinct: (F, usize),
@@ -37,11 +37,11 @@ pub struct RankedStream<'m, C, F> {
} }
impl<'m, C, F> RankedStream<'m, C, F> { impl<'m, C, F> RankedStream<'m, C, F> {
pub fn new(config: Config<C, F>) -> Self { pub fn new(config: Config<'m, C, F>) -> Self {
let automatons: Vec<_> = config.automatons.into_iter().map(Rc::new).collect(); let automatons: Vec<_> = config.automatons.into_iter().map(Rc::new).collect();
RankedStream { RankedStream {
stream: Merge::with_automatons(automatons.clone(), unimplemented!()), stream: Merge::with_automatons(automatons.clone(), config.blobs),
automatons: automatons, automatons: automatons,
criteria: config.criteria, criteria: config.criteria,
distinct: config.distinct, distinct: config.distinct,