feat: Save the schema in the key-value store

This commit is contained in:
Clément Renault
2018-11-21 15:19:29 +01:00
parent 8df068af3c
commit 86f23d2695
4 changed files with 63 additions and 34 deletions

View File

@@ -12,12 +12,14 @@ use std::path::{Path, PathBuf};
use std::collections::{BTreeSet, BTreeMap}; use std::collections::{BTreeSet, BTreeMap};
use fs2::FileExt; use fs2::FileExt;
use ::rocksdb::rocksdb::Writable;
use ::rocksdb::{rocksdb, rocksdb_options}; use ::rocksdb::{rocksdb, rocksdb_options};
use ::rocksdb::merge_operator::MergeOperands; use ::rocksdb::merge_operator::MergeOperands;
use crate::rank::Document; use crate::rank::Document;
use crate::data::DocIdsBuilder; use crate::data::DocIdsBuilder;
use crate::{DocIndex, DocumentId}; use crate::{DocIndex, DocumentId};
use crate::index::schema::Schema;
use crate::index::update::Update; use crate::index::update::Update;
use crate::blob::{PositiveBlobBuilder, Blob, Sign}; use crate::blob::{PositiveBlobBuilder, Blob, Sign};
use crate::blob::ordered_blobs_from_slice; use crate::blob::ordered_blobs_from_slice;
@@ -25,6 +27,13 @@ use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer};
use crate::rank::{criterion, Config, RankedStream}; use crate::rank::{criterion, Config, RankedStream};
use crate::automaton; use crate::automaton;
// Key names and key prefixes used inside the RocksDB key-value store.
//
// NOTE(review): no use of the three `*_PREFIX` constants is visible in this change, and
// the update builders format their keys with upper-case literals ("BLOB-…", "DOCU-…")
// rather than these lower-case values — confirm which spelling is intended.
const DATA_PREFIX: &str = "data";
const BLOB_PREFIX: &str = "blob";
const DOCU_PREFIX: &str = "docu";
// Key holding the blob names: update files `put`/`merge` blob names into it and
// `search` decodes its value with `ordered_blobs_from_slice`.
const DATA_BLOBS_ORDER: &str = "data-blobs-order";
// Key holding the serialized `Schema`: written by `Index::create`, read back by
// `Index::open` and `Index::schema`.
const DATA_SCHEMA: &str = "data-schema";
fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> { fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut output = Vec::new(); let mut output = Vec::new();
for bytes in operands.chain(value) { for bytes in operands.chain(value) {
@@ -38,15 +47,18 @@ pub struct Index {
} }
impl Index { impl Index {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> { pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Index, Box<Error>> {
unimplemented!("return a soft error: the database already exist at the given path")
// Self::open must not take a parameter for create_if_missing // Self::open must not take a parameter for create_if_missing
// or we must create an OpenOptions with many parameters // or we must create an OpenOptions with many parameters
// https://doc.rust-lang.org/std/fs/struct.OpenOptions.html // https://doc.rust-lang.org/std/fs/struct.OpenOptions.html
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> {
let path = path.as_ref().to_string_lossy();
let path = path.as_ref();
if path.exists() {
return Err(format!("File already exists at path: {}, cannot create database.",
path.display()).into())
}
let path = path.to_string_lossy();
let mut opts = rocksdb_options::DBOptions::new(); let mut opts = rocksdb_options::DBOptions::new();
opts.create_if_missing(true); opts.create_if_missing(true);
@@ -55,8 +67,28 @@ impl Index {
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
// check if index is a valid RocksDB and let mut schema_bytes = Vec::new();
// contains the right key-values (i.e. "blobs-order") schema.write_to(&mut schema_bytes)?;
database.put(DATA_SCHEMA.as_bytes(), &schema_bytes)?;
Ok(Self { database })
}
/// Opens an existing index stored at `path`.
///
/// The database is opened with `create_if_missing(false)`, so this function does not
/// ask RocksDB to create anything.
///
/// # Errors
///
/// Returns an error when RocksDB fails to open or read the database, when the
/// `DATA_SCHEMA` entry is absent, or when `Schema::read_from` rejects the stored bytes.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Index, Box<Error>> {
let path = path.as_ref().to_string_lossy();
let mut opts = rocksdb_options::DBOptions::new();
opts.create_if_missing(false);
// register `simple_vec_append` as the merge operator of the "default" column family
let mut cf_opts = rocksdb_options::ColumnFamilyOptions::new();
cf_opts.add_merge_operator("blobs order operator", simple_vec_append);
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
// the schema is decoded only to check that the database holds a readable one;
// the decoded value itself is discarded
let _schema = match database.get(DATA_SCHEMA.as_bytes())? {
Some(value) => Schema::read_from(&*value)?,
None => return Err(String::from("Database does not contain a schema").into()),
};
Ok(Self { database }) Ok(Self { database })
} }
@@ -74,17 +106,20 @@ impl Index {
Ok(()) Ok(())
} }
// Diff rows: the left-hand text is the removed private `blobs` helper, the right-hand
// text is the added public `schema` accessor.
//
// `schema` fetches the `DATA_SCHEMA` entry and decodes it with `Schema::read_from`.
// NOTE(review): although it returns a `Result`, only a failing `get` goes through `?`;
// a missing entry or undecodable bytes hit `expect` and panic instead of yielding `Err`.
fn blobs(&self) -> Result<Vec<Blob>, Box<Error>> { pub fn schema(&self) -> Result<Schema, Box<Error>> {
match self.database.get(b"00-blobs-order")? { let bytes = self.database.get(DATA_SCHEMA.as_bytes())?.expect("data-schema entry not found");
Some(value) => Ok(ordered_blobs_from_slice(&value)?), Ok(Schema::read_from(&*bytes).expect("Invalid schema"))
None => Ok(Vec::new()),
}
} }
pub fn search(&self, query: &str) -> Result<Vec<Document>, Box<Error>> { pub fn search(&self, query: &str) -> Result<Vec<Document>, Box<Error>> {
// this snapshot will allow consistent operations on documents
let snapshot = self.database.snapshot();
// FIXME create a SNAPSHOT for the search ! // FIXME create a SNAPSHOT for the search !
let blobs = self.blobs()?; let blobs = match snapshot.get(DATA_BLOBS_ORDER.as_bytes())? {
Some(value) => ordered_blobs_from_slice(&value)?,
None => Vec::new(),
};
let mut automatons = Vec::new(); let mut automatons = Vec::new();
for query in query.split_whitespace().map(str::to_lowercase) { for query in query.split_whitespace().map(str::to_lowercase) {

View File

@@ -12,11 +12,6 @@ mod positive_update;
pub use self::negative_update::{NegativeUpdateBuilder}; pub use self::negative_update::{NegativeUpdateBuilder};
pub use self::positive_update::{PositiveUpdateBuilder, NewState}; pub use self::positive_update::{PositiveUpdateBuilder, NewState};
// These prefixes ensure that the document fields and the internal
// data do not collide, and that the internal data sit at the top
// of the SST file.
const FIELD_BLOBS_ORDER: &str = "00-blobs-order";
// An update, represented by a filesystem `path`.
// NOTE(review): presumably the path of the update file that the constructor opens with
// its file writer before building `Update { path }` — confirm against the full file.
pub struct Update { pub struct Update {
path: PathBuf, path: PathBuf,
} }
@@ -31,10 +26,7 @@ impl Update {
file_writer.open(&path.to_string_lossy())?; file_writer.open(&path.to_string_lossy())?;
let infos = file_writer.finish()?; let infos = file_writer.finish()?;
if infos.smallest_key() != FIELD_BLOBS_ORDER.as_bytes() { // FIXME check if the update contains a blobs-order entry
// FIXME return a nice error
panic!("Invalid update file: the blobs-order field is not the smallest key")
}
Ok(Update { path }) Ok(Update { path })
} }

View File

@@ -3,7 +3,8 @@ use std::error::Error;
use ::rocksdb::rocksdb_options; use ::rocksdb::rocksdb_options;
use crate::index::update::{FIELD_BLOBS_ORDER, Update}; use crate::index::DATA_BLOBS_ORDER;
use crate::index::update::Update;
use crate::index::blob_name::BlobName; use crate::index::blob_name::BlobName;
use crate::data::DocIdsBuilder; use crate::data::DocIdsBuilder;
use crate::DocumentId; use crate::DocumentId;
@@ -40,16 +41,16 @@ impl NegativeUpdateBuilder {
// write the blob name to be merged // write the blob name to be merged
let blob_name = blob_name.to_string(); let blob_name = blob_name.to_string();
file_writer.merge(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
// write the doc ids // write the doc ids
let blob_key = format!("0b-{}-doc-ids", blob_name); let blob_key = format!("BLOB-{}-doc-ids", blob_name);
let blob_doc_ids = self.doc_ids.into_inner()?; let blob_doc_ids = self.doc_ids.into_inner()?;
file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?; file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?;
for id in blob_doc_ids { for id in blob_doc_ids {
let start = format!("5d-{}", id); let start = format!("DOCU-{}", id);
let end = format!("5d-{}", id + 1); let end = format!("DOCU-{}", id + 1);
file_writer.delete_range(start.as_bytes(), end.as_bytes())?; file_writer.delete_range(start.as_bytes(), end.as_bytes())?;
} }

View File

@@ -5,10 +5,11 @@ use std::fmt::Write;
use ::rocksdb::rocksdb_options; use ::rocksdb::rocksdb_options;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; use crate::index::DATA_BLOBS_ORDER;
use crate::index::update::{FIELD_BLOBS_ORDER, Update}; use crate::index::update::Update;
use crate::tokenizer::TokenizerBuilder;
use crate::index::blob_name::BlobName; use crate::index::blob_name::BlobName;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr};
use crate::tokenizer::TokenizerBuilder;
use crate::blob::PositiveBlobBuilder; use crate::blob::PositiveBlobBuilder;
use crate::{DocIndex, DocumentId}; use crate::{DocIndex, DocumentId};
@@ -66,7 +67,7 @@ where B: TokenizerBuilder
// write the blob name to be merged // write the blob name to be merged
let blob_name = blob_name.to_string(); let blob_name = blob_name.to_string();
file_writer.put(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; file_writer.put(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
for ((document_id, field), state) in &self.new_states { for ((document_id, field), state) in &self.new_states {
@@ -96,15 +97,15 @@ where B: TokenizerBuilder
let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// write the fst // write the fst
let blob_key = format!("0b-{}-fst", blob_name); let blob_key = format!("BLOB-{}-fst", blob_name);
file_writer.put(blob_key.as_bytes(), &blob_fst_map)?; file_writer.put(blob_key.as_bytes(), &blob_fst_map)?;
// write the doc-idx // write the doc-idx
let blob_key = format!("0b-{}-doc-idx", blob_name); let blob_key = format!("BLOB-{}-doc-idx", blob_name);
file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?; file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?;
// write all the documents fields updates // write all the documents fields updates
let mut key = String::from("5d-"); let mut key = String::from("DOCU-");
let prefix_len = key.len(); let prefix_len = key.len();
for ((id, field), state) in self.new_states { for ((id, field), state) in self.new_states {