Compare commits

...

7 Commits

6 changed files with 61 additions and 23 deletions

View File

@@ -40,7 +40,7 @@ struct IndexCommand {
 
 #[derive(Debug, StructOpt)]
 struct SearchCommand {
-    /// The destination where the database must be created.
+    /// The path of the database to work with.
     #[structopt(parse(from_os_str))]
     database_path: PathBuf,
 
@@ -65,10 +65,18 @@ struct SearchCommand {
     displayed_fields: Vec<String>,
 }
 
+#[derive(Debug, StructOpt)]
+struct ShowUpdatesCommand {
+    /// The path of the database to work with.
+    #[structopt(parse(from_os_str))]
+    database_path: PathBuf,
+}
+
 #[derive(Debug, StructOpt)]
 enum Command {
     Index(IndexCommand),
     Search(SearchCommand),
+    ShowUpdates(ShowUpdatesCommand),
 }
 
 impl Command {
@@ -76,6 +84,7 @@ impl Command {
         match self {
             Command::Index(command) => &command.database_path,
             Command::Search(command) => &command.database_path,
+            Command::ShowUpdates(command) => &command.database_path,
         }
     }
 }
@@ -303,6 +312,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
     let reader = env.read_txn().unwrap();
     let schema = index.main.schema(&reader)?;
+    reader.abort();
 
     let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?;
 
     let fields = command.displayed_fields.iter().map(String::as_str);
@@ -418,6 +428,23 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
     Ok(())
 }
 
+fn show_updates_command(
+    _command: ShowUpdatesCommand,
+    database: Database,
+) -> Result<(), Box<dyn Error>> {
+    let env = &database.env;
+    let index = database
+        .open_index(INDEX_NAME)
+        .expect("Could not find index");
+
+    let reader = env.read_txn().unwrap();
+    let updates = index.all_updates_status(&reader)?;
+    println!("{:#?}", updates);
+    reader.abort();
+
+    Ok(())
+}
+
 fn main() -> Result<(), Box<dyn Error>> {
     env_logger::init();
 
@@ -427,5 +454,6 @@ fn main() -> Result<(), Box<dyn Error>> {
     match opt {
         Command::Index(command) => index_command(command, database),
         Command::Search(command) => search_command(command, database),
+        Command::ShowUpdates(command) => show_updates_command(command, database),
     }
 }
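The new ShowUpdates variant plugs into the existing StructOpt setup. A minimal standalone sketch (assuming structopt's default kebab-case renaming; the enum below is a stand-in, not the example's full CLI) of how the variant surfaces as a subcommand:

use std::path::PathBuf;
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
enum Command {
    // Stand-in for the ShowUpdatesCommand wrapper above.
    ShowUpdates { database_path: PathBuf },
}

fn main() {
    // StructOpt kebab-cases variant names, so `ShowUpdates` is
    // reachable as the `show-updates` subcommand.
    let cmd = Command::from_iter(&["example", "show-updates", "./my-db"]);
    println!("{:?}", cmd);
}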

View File

@@ -35,10 +35,13 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc<ArcSwap
         match update::update_task(&mut writer, index.clone()) {
             Ok(Some(status)) => {
-                if status.result.is_ok() {
-                    if let Err(e) = writer.commit() {
-                        error!("update transaction failed: {}", e)
+                match status.result {
+                    Ok(_) => {
+                        if let Err(e) = writer.commit() {
+                            error!("update transaction failed: {}", e)
+                        }
                     }
+                    Err(_) => writer.abort(),
                 }
 
                 if let Some(ref callback) = *update_fn.load() {
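The awaiter now makes failure handling explicit: commit the write transaction only when the update succeeded, and abort it otherwise so a failed update leaves no partial writes behind. A minimal sketch of the pattern, using a stand-in type in place of heed's RwTxn (whose commit and abort likewise consume the transaction):

struct Txn; // stand-in for heed::RwTxn

impl Txn {
    // Both methods take `self` by value: a transaction is
    // finished exactly once, enforced by the borrow checker.
    fn commit(self) -> Result<(), String> { Ok(()) }
    fn abort(self) {}
}

fn finish(writer: Txn, result: Result<(), String>) {
    match result {
        Ok(_) => {
            if let Err(e) = writer.commit() {
                eprintln!("update transaction failed: {}", e);
            }
        }
        Err(_) => writer.abort(),
    }
}

fn main() {
    finish(Txn, Ok(()));              // committed
    finish(Txn, Err("oops".into()));  // aborted
}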

View File

@@ -12,7 +12,6 @@ pub enum Error {
     SchemaMissing,
     WordIndexMissing,
     MissingDocumentId,
-    DuplicateDocument,
     Zlmdb(heed::Error),
     Fst(fst::Error),
     SerdeJson(SerdeJsonError),
@@ -80,7 +79,6 @@ impl fmt::Display for Error {
             SchemaMissing => write!(f, "this index does not have a schema"),
             WordIndexMissing => write!(f, "this index does not have a word index"),
             MissingDocumentId => write!(f, "document id is missing"),
-            DuplicateDocument => write!(f, "update contains documents with the same id"),
             Zlmdb(e) => write!(f, "heed error; {}", e),
             Fst(e) => write!(f, "fst error; {}", e),
             SerdeJson(e) => write!(f, "serde json error; {}", e),

View File

@@ -167,6 +167,7 @@ impl Index {
     }
 
     pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult<u64> {
+        let _ = self.updates_notifier.send(());
         update::push_clear_all(writer, self.updates, self.updates_results)
     }
 
@@ -201,6 +202,20 @@ impl Index {
         update::update_status(reader, self.updates, self.updates_results, update_id)
     }
 
+    pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
+        match self.updates_results.last_update_id(reader)? {
+            Some((last_id, _)) => {
+                let mut updates = Vec::with_capacity(last_id as usize + 1);
+                for id in 0..=last_id {
+                    let update = self.update_status(reader, id)?;
+                    updates.push(update);
+                }
+                Ok(updates)
+            }
+            None => Ok(Vec::new()),
+        }
+    }
+
     pub fn query_builder(&self) -> QueryBuilder {
         QueryBuilder::new(
             self.main,

View File

@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 
 use fst::{set::OpBuilder, SetBuilder};
 use sdset::{duo::Union, SetOperation};
@@ -86,7 +86,7 @@ pub fn apply_documents_addition(
     docs_words_store: store::DocsWords,
     addition: Vec<serde_json::Value>,
 ) -> MResult<()> {
-    let mut documents_ids = HashSet::new();
+    let mut documents_additions = HashMap::new();
     let mut indexer = RawIndexer::new();
 
     let schema = match main_store.schema(writer)? {
@@ -97,19 +97,18 @@ pub fn apply_documents_addition(
     let identifier = schema.identifier_name();
 
     // 1. store documents ids for future deletion
-    for document in addition.iter() {
+    for document in addition {
         let document_id = match extract_document_id(identifier, &document)? {
             Some(id) => id,
             None => return Err(Error::MissingDocumentId),
         };
 
-        if !documents_ids.insert(document_id) {
-            return Err(Error::DuplicateDocument);
-        }
+        documents_additions.insert(document_id, document);
     }
 
     // 2. remove the documents posting lists
-    let number_of_inserted_documents = documents_ids.len();
+    let number_of_inserted_documents = documents_additions.len();
+    let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect();
     apply_documents_deletion(
         writer,
         main_store,
@@ -117,7 +116,7 @@ pub fn apply_documents_addition(
         documents_fields_counts_store,
         postings_lists_store,
         docs_words_store,
-        documents_ids.into_iter().collect(),
+        documents_ids,
     )?;
 
     let mut ranked_map = match main_store.ranked_map(writer)? {
@@ -126,12 +125,7 @@ pub fn apply_documents_addition(
     };
 
     // 3. index the documents fields in the stores
-    for document in addition {
-        let document_id = match extract_document_id(identifier, &document)? {
-            Some(id) => id,
-            None => return Err(Error::MissingDocumentId),
-        };
-
+    for (document_id, document) in documents_additions {
         let serializer = Serializer {
             txn: writer,
             schema: &schema,
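Swapping the HashSet for a HashMap changes the duplicate-id semantics: instead of rejecting a batch containing two documents with the same id (the now-removed DuplicateDocument error), the map keeps only the last occurrence. A self-contained sketch of that last-write-wins behavior, with a hypothetical id:

use std::collections::HashMap;

fn main() {
    let mut documents_additions = HashMap::new();

    // Two payloads sharing the (hypothetical) document id 42:
    documents_additions.insert(42u64, "first version");
    documents_additions.insert(42u64, "second version");

    // Only one entry survives, and it holds the later payload.
    assert_eq!(documents_additions.len(), 1);
    assert_eq!(documents_additions[&42], "second version");
}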

View File

@@ -47,12 +47,12 @@ pub enum UpdateType {
     SynonymsDeletion { number: usize },
 }
 
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DetailedDuration {
     pub main: Duration,
 }
 
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct UpdateResult {
     pub update_id: u64,
     pub update_type: UpdateType,
@@ -60,7 +60,7 @@ pub struct UpdateResult {
     pub detailed_duration: DetailedDuration,
 }
 
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum UpdateStatus {
     Enqueued,
     Processed(UpdateResult),
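These Debug derives are what allow the new show-updates example to pretty-print statuses with {:#?}. A tiny sketch of the dependency, with a pared-down stand-in enum:

#[derive(Debug)]
enum UpdateStatus {
    Enqueued,
}

fn main() {
    let updates = vec![UpdateStatus::Enqueued];
    // `{:#?}` requires UpdateStatus: Debug, provided by the derive.
    println!("{:#?}", updates);
}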