mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-22 14:21:03 +00:00
Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
32d2cc3aea | |||
8a17fcdda5 | |||
9602d7a960 | |||
ac12a4b9c9 | |||
af96050944 | |||
a43b37dfc1 | |||
c08dcac1d4 |
@ -40,7 +40,7 @@ struct IndexCommand {
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
struct SearchCommand {
|
||||
/// The destination where the database must be created.
|
||||
/// The path of the database to work with.
|
||||
#[structopt(parse(from_os_str))]
|
||||
database_path: PathBuf,
|
||||
|
||||
@ -65,10 +65,18 @@ struct SearchCommand {
|
||||
displayed_fields: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
struct ShowUpdatesCommand {
|
||||
/// The path of the database to work with.
|
||||
#[structopt(parse(from_os_str))]
|
||||
database_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
enum Command {
|
||||
Index(IndexCommand),
|
||||
Search(SearchCommand),
|
||||
ShowUpdates(ShowUpdatesCommand),
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@ -76,6 +84,7 @@ impl Command {
|
||||
match self {
|
||||
Command::Index(command) => &command.database_path,
|
||||
Command::Search(command) => &command.database_path,
|
||||
Command::ShowUpdates(command) => &command.database_path,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -303,6 +312,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
|
||||
let reader = env.read_txn().unwrap();
|
||||
let schema = index.main.schema(&reader)?;
|
||||
reader.abort();
|
||||
|
||||
let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?;
|
||||
|
||||
let fields = command.displayed_fields.iter().map(String::as_str);
|
||||
@ -418,6 +428,23 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn show_updates_command(
|
||||
_command: ShowUpdatesCommand,
|
||||
database: Database,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let env = &database.env;
|
||||
let index = database
|
||||
.open_index(INDEX_NAME)
|
||||
.expect("Could not find index");
|
||||
|
||||
let reader = env.read_txn().unwrap();
|
||||
let updates = index.all_updates_status(&reader)?;
|
||||
println!("{:#?}", updates);
|
||||
reader.abort();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
env_logger::init();
|
||||
|
||||
@ -427,5 +454,6 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
match opt {
|
||||
Command::Index(command) => index_command(command, database),
|
||||
Command::Search(command) => search_command(command, database),
|
||||
Command::ShowUpdates(command) => show_updates_command(command, database),
|
||||
}
|
||||
}
|
||||
|
@ -35,11 +35,14 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc<ArcSwap
|
||||
|
||||
match update::update_task(&mut writer, index.clone()) {
|
||||
Ok(Some(status)) => {
|
||||
if status.result.is_ok() {
|
||||
match status.result {
|
||||
Ok(_) => {
|
||||
if let Err(e) = writer.commit() {
|
||||
error!("update transaction failed: {}", e)
|
||||
}
|
||||
}
|
||||
Err(_) => writer.abort(),
|
||||
}
|
||||
|
||||
if let Some(ref callback) = *update_fn.load() {
|
||||
(callback)(status);
|
||||
|
@ -12,7 +12,6 @@ pub enum Error {
|
||||
SchemaMissing,
|
||||
WordIndexMissing,
|
||||
MissingDocumentId,
|
||||
DuplicateDocument,
|
||||
Zlmdb(heed::Error),
|
||||
Fst(fst::Error),
|
||||
SerdeJson(SerdeJsonError),
|
||||
@ -80,7 +79,6 @@ impl fmt::Display for Error {
|
||||
SchemaMissing => write!(f, "this index does not have a schema"),
|
||||
WordIndexMissing => write!(f, "this index does not have a word index"),
|
||||
MissingDocumentId => write!(f, "document id is missing"),
|
||||
DuplicateDocument => write!(f, "update contains documents with the same id"),
|
||||
Zlmdb(e) => write!(f, "heed error; {}", e),
|
||||
Fst(e) => write!(f, "fst error; {}", e),
|
||||
SerdeJson(e) => write!(f, "serde json error; {}", e),
|
||||
|
@ -167,6 +167,7 @@ impl Index {
|
||||
}
|
||||
|
||||
pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult<u64> {
|
||||
let _ = self.updates_notifier.send(());
|
||||
update::push_clear_all(writer, self.updates, self.updates_results)
|
||||
}
|
||||
|
||||
@ -201,6 +202,20 @@ impl Index {
|
||||
update::update_status(reader, self.updates, self.updates_results, update_id)
|
||||
}
|
||||
|
||||
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
|
||||
match self.updates_results.last_update_id(reader)? {
|
||||
Some((last_id, _)) => {
|
||||
let mut updates = Vec::with_capacity(last_id as usize + 1);
|
||||
for id in 0..=last_id {
|
||||
let update = self.update_status(reader, id)?;
|
||||
updates.push(update);
|
||||
}
|
||||
Ok(updates)
|
||||
}
|
||||
None => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query_builder(&self) -> QueryBuilder {
|
||||
QueryBuilder::new(
|
||||
self.main,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use fst::{set::OpBuilder, SetBuilder};
|
||||
use sdset::{duo::Union, SetOperation};
|
||||
@ -86,7 +86,7 @@ pub fn apply_documents_addition(
|
||||
docs_words_store: store::DocsWords,
|
||||
addition: Vec<serde_json::Value>,
|
||||
) -> MResult<()> {
|
||||
let mut documents_ids = HashSet::new();
|
||||
let mut documents_additions = HashMap::new();
|
||||
let mut indexer = RawIndexer::new();
|
||||
|
||||
let schema = match main_store.schema(writer)? {
|
||||
@ -97,19 +97,18 @@ pub fn apply_documents_addition(
|
||||
let identifier = schema.identifier_name();
|
||||
|
||||
// 1. store documents ids for future deletion
|
||||
for document in addition.iter() {
|
||||
for document in addition {
|
||||
let document_id = match extract_document_id(identifier, &document)? {
|
||||
Some(id) => id,
|
||||
None => return Err(Error::MissingDocumentId),
|
||||
};
|
||||
|
||||
if !documents_ids.insert(document_id) {
|
||||
return Err(Error::DuplicateDocument);
|
||||
}
|
||||
documents_additions.insert(document_id, document);
|
||||
}
|
||||
|
||||
// 2. remove the documents posting lists
|
||||
let number_of_inserted_documents = documents_ids.len();
|
||||
let number_of_inserted_documents = documents_additions.len();
|
||||
let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect();
|
||||
apply_documents_deletion(
|
||||
writer,
|
||||
main_store,
|
||||
@ -117,7 +116,7 @@ pub fn apply_documents_addition(
|
||||
documents_fields_counts_store,
|
||||
postings_lists_store,
|
||||
docs_words_store,
|
||||
documents_ids.into_iter().collect(),
|
||||
documents_ids,
|
||||
)?;
|
||||
|
||||
let mut ranked_map = match main_store.ranked_map(writer)? {
|
||||
@ -126,12 +125,7 @@ pub fn apply_documents_addition(
|
||||
};
|
||||
|
||||
// 3. index the documents fields in the stores
|
||||
for document in addition {
|
||||
let document_id = match extract_document_id(identifier, &document)? {
|
||||
Some(id) => id,
|
||||
None => return Err(Error::MissingDocumentId),
|
||||
};
|
||||
|
||||
for (document_id, document) in documents_additions {
|
||||
let serializer = Serializer {
|
||||
txn: writer,
|
||||
schema: &schema,
|
||||
|
@ -47,12 +47,12 @@ pub enum UpdateType {
|
||||
SynonymsDeletion { number: usize },
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DetailedDuration {
|
||||
pub main: Duration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct UpdateResult {
|
||||
pub update_id: u64,
|
||||
pub update_type: UpdateType,
|
||||
@ -60,7 +60,7 @@ pub struct UpdateResult {
|
||||
pub detailed_duration: DetailedDuration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateStatus {
|
||||
Enqueued,
|
||||
Processed(UpdateResult),
|
||||
|
Reference in New Issue
Block a user