mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-20 21:30:58 +00:00
Compare commits
2 Commits
prototype-
...
ha/2pc
Author | SHA1 | Date | |
---|---|---|---|
01925af1de | |||
782acc5a7d |
@ -1,12 +1,13 @@
|
||||
use std::fs::File;
|
||||
use std::sync::{mpsc, Arc};
|
||||
|
||||
use crate::index::Index;
|
||||
use milli::update::UpdateBuilder;
|
||||
use milli::CompressionType;
|
||||
use rayon::ThreadPool;
|
||||
|
||||
use crate::index_controller::UpdateMeta;
|
||||
use crate::index_controller::{Failed, Processed, Processing};
|
||||
use crate::index_controller::{Aborted, Done, Failed, Processed, Processing};
|
||||
use crate::index_controller::{UpdateMeta, UpdateResult};
|
||||
use crate::option::IndexerOpts;
|
||||
|
||||
pub struct UpdateHandler {
|
||||
@ -54,15 +55,17 @@ impl UpdateHandler {
|
||||
|
||||
pub fn handle_update(
|
||||
&self,
|
||||
channel: mpsc::Sender<(mpsc::Sender<Hello>, Result<Processed, Failed>)>,
|
||||
meta: Processing,
|
||||
content: Option<File>,
|
||||
index: Index,
|
||||
) -> Result<Processed, Failed> {
|
||||
) -> Result<Done, Aborted> {
|
||||
use UpdateMeta::*;
|
||||
|
||||
let update_id = meta.id();
|
||||
|
||||
let update_builder = self.update_builder(update_id);
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
|
||||
let result = match meta.meta() {
|
||||
DocumentsAddition {
|
||||
@ -70,20 +73,47 @@ impl UpdateHandler {
|
||||
format,
|
||||
primary_key,
|
||||
} => index.update_documents(
|
||||
&mut wtxn,
|
||||
*format,
|
||||
*method,
|
||||
content,
|
||||
update_builder,
|
||||
primary_key.as_deref(),
|
||||
),
|
||||
ClearDocuments => index.clear_documents(update_builder),
|
||||
DeleteDocuments { ids } => index.delete_documents(ids, update_builder),
|
||||
Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
|
||||
ClearDocuments => index.clear_documents(&mut wtxn, update_builder),
|
||||
DeleteDocuments { ids } => index.delete_documents(&mut wtxn, ids, update_builder),
|
||||
Settings(settings) => {
|
||||
index.update_settings(&mut wtxn, &settings.clone().check(), update_builder)
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
let result = match result {
|
||||
Ok(result) => Ok(meta.process(result)),
|
||||
Err(e) => Err(meta.fail(e.into())),
|
||||
};
|
||||
|
||||
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
channel.send((sender, result));
|
||||
|
||||
// here we should decide how we want to handle a failure. probably by closing the channel
|
||||
// right: for now I'm just going to panic
|
||||
|
||||
let meta = result.unwrap();
|
||||
|
||||
match receiver.recv() {
|
||||
Ok(Hello::Abort) => Err(meta.abort()),
|
||||
Ok(Hello::Commit) => wtxn
|
||||
.commit()
|
||||
.map(|ok| meta.commit())
|
||||
.map_err(|e| meta.abort()),
|
||||
Err(e) => panic!("update actor died {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decision sent back to the update handler after an update has been processed,
/// telling it what to do with the still-open write transaction.
///
/// MARIN: I can't find any good name for this and I'm not even sure we need a new enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Hello {
    /// Commit the write transaction of the processed update.
    Commit,
    /// Give up on the processed update without committing its transaction.
    Abort,
}
|
||||
|
@ -4,6 +4,7 @@ use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use flate2::read::GzDecoder;
|
||||
use heed::RwTxn;
|
||||
use log::{debug, info, trace};
|
||||
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
@ -160,24 +161,17 @@ pub struct Facets {
|
||||
}
|
||||
|
||||
impl Index {
|
||||
pub fn update_documents(
|
||||
&self,
|
||||
pub fn update_documents<'a>(
|
||||
&'a self,
|
||||
txn: &mut RwTxn<'a, 'a>,
|
||||
format: UpdateFormat,
|
||||
method: IndexDocumentsMethod,
|
||||
content: Option<impl io::Read>,
|
||||
update_builder: UpdateBuilder,
|
||||
primary_key: Option<&str>,
|
||||
) -> Result<UpdateResult> {
|
||||
let mut txn = self.write_txn()?;
|
||||
let result = self.update_documents_txn(
|
||||
&mut txn,
|
||||
format,
|
||||
method,
|
||||
content,
|
||||
update_builder,
|
||||
primary_key,
|
||||
)?;
|
||||
txn.commit()?;
|
||||
let result =
|
||||
self.update_documents_txn(txn, format, method, content, update_builder, primary_key)?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@ -220,16 +214,14 @@ impl Index {
|
||||
Ok(UpdateResult::DocumentsAddition(addition))
|
||||
}
|
||||
|
||||
pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.write_txn()?;
|
||||
let builder = update_builder.clear_documents(&mut wtxn, self);
|
||||
|
||||
pub fn clear_documents<'a>(
|
||||
&'a self,
|
||||
wtxn: &mut RwTxn<'a, 'a>,
|
||||
update_builder: UpdateBuilder,
|
||||
) -> Result<UpdateResult> {
|
||||
let builder = update_builder.clear_documents(wtxn, self);
|
||||
let _count = builder.execute()?;
|
||||
|
||||
wtxn.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into)
|
||||
Ok(UpdateResult::Other)
|
||||
}
|
||||
|
||||
pub fn update_settings_txn<'a, 'b>(
|
||||
@ -302,8 +294,9 @@ impl Index {
|
||||
Ok(UpdateResult::Other)
|
||||
}
|
||||
|
||||
pub fn update_settings(
|
||||
&self,
|
||||
pub fn update_settings<'a>(
|
||||
&'a self,
|
||||
txn: &mut RwTxn<'a, 'a>,
|
||||
settings: &Settings<Checked>,
|
||||
update_builder: UpdateBuilder,
|
||||
) -> Result<UpdateResult> {
|
||||
@ -313,12 +306,12 @@ impl Index {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn delete_documents(
|
||||
&self,
|
||||
pub fn delete_documents<'a>(
|
||||
&'a self,
|
||||
txn: &mut RwTxn<'a, 'a>,
|
||||
document_ids: &[String],
|
||||
update_builder: UpdateBuilder,
|
||||
) -> Result<UpdateResult> {
|
||||
let mut txn = self.write_txn()?;
|
||||
let mut builder = update_builder.delete_documents(&mut txn, self)?;
|
||||
|
||||
// We ignore nonexistent document ids
|
||||
@ -327,9 +320,7 @@ impl Index {
|
||||
});
|
||||
|
||||
let deleted = builder.execute()?;
|
||||
txn.commit()
|
||||
.and(Ok(UpdateResult::DocumentDeletion { deleted }))
|
||||
.map_err(Into::into)
|
||||
Ok(UpdateResult::DocumentDeletion { deleted })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,16 +7,16 @@ use futures::stream::StreamExt;
|
||||
use heed::CompactionOption;
|
||||
use log::debug;
|
||||
use milli::update::UpdateBuilder;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::{fs, sync::mpsc};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index::update_handler::Hello;
|
||||
use crate::index::{
|
||||
update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings,
|
||||
};
|
||||
use crate::index_controller::{
|
||||
get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing,
|
||||
};
|
||||
use crate::index_controller::{Aborted, Done, Failed, IndexStats, Processed, Processing, get_arc_ownership_blocking};
|
||||
use crate::option::IndexerOpts;
|
||||
|
||||
use super::error::{IndexActorError, Result};
|
||||
@ -81,11 +81,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
||||
}
|
||||
Update {
|
||||
ret,
|
||||
channel,
|
||||
meta,
|
||||
data,
|
||||
uuid,
|
||||
} => {
|
||||
let _ = ret.send(self.handle_update(uuid, meta, data).await);
|
||||
let _ = ret.send(self.handle_update(channel, uuid, meta, data).await);
|
||||
}
|
||||
Search { ret, query, uuid } => {
|
||||
let _ = ret.send(self.handle_search(uuid, query).await);
|
||||
@ -163,10 +164,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
||||
|
||||
async fn handle_update(
|
||||
&self,
|
||||
channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
|
||||
uuid: Uuid,
|
||||
meta: Processing,
|
||||
data: Option<File>,
|
||||
) -> Result<std::result::Result<Processed, Failed>> {
|
||||
) -> Result<std::result::Result<Done, Aborted>> {
|
||||
debug!("Processing update {}", meta.id());
|
||||
let update_handler = self.update_handler.clone();
|
||||
let index = match self.store.get(uuid).await? {
|
||||
@ -174,7 +176,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
||||
None => self.store.create(uuid, None).await?,
|
||||
};
|
||||
|
||||
Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?)
|
||||
Ok(spawn_blocking(move || update_handler.handle_update(channel, meta, data, index)).await?)
|
||||
}
|
||||
|
||||
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
|
||||
|
@ -4,10 +4,7 @@ use std::path::{Path, PathBuf};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
index::Checked,
|
||||
index_controller::{IndexSettings, IndexStats, Processing},
|
||||
};
|
||||
use crate::{index::{Checked, update_handler::Hello}, index_controller::{IndexSettings, IndexStats, Processing}};
|
||||
use crate::{
|
||||
index::{Document, SearchQuery, SearchResult, Settings},
|
||||
index_controller::{Failed, Processed},
|
||||
@ -36,13 +33,15 @@ impl IndexActorHandle for IndexActorHandleImpl {
|
||||
|
||||
async fn update(
|
||||
&self,
|
||||
channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
|
||||
uuid: Uuid,
|
||||
meta: Processing,
|
||||
data: Option<std::fs::File>,
|
||||
) -> Result<std::result::Result<Processed, Failed>> {
|
||||
) -> Result<std::result::Result<(Processed, oneshot::Sender<()>), Failed>> {
|
||||
let (ret, receiver) = oneshot::channel();
|
||||
let msg = IndexMsg::Update {
|
||||
ret,
|
||||
channel,
|
||||
meta,
|
||||
data,
|
||||
uuid,
|
||||
|
@ -4,6 +4,7 @@ use tokio::sync::oneshot;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::error::Result as IndexResult;
|
||||
use crate::index::update_handler::Hello;
|
||||
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
|
||||
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
|
||||
|
||||
@ -18,9 +19,10 @@ pub enum IndexMsg {
|
||||
},
|
||||
Update {
|
||||
uuid: Uuid,
|
||||
channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
|
||||
meta: Processing,
|
||||
data: Option<std::fs::File>,
|
||||
ret: oneshot::Sender<IndexResult<Result<Processed, Failed>>>,
|
||||
ret: oneshot::Sender<IndexResult<Result<(Processed, oneshot::Sender<()>), Failed>>>,
|
||||
},
|
||||
Search {
|
||||
uuid: Uuid,
|
||||
|
@ -13,6 +13,7 @@ pub use handle_impl::IndexActorHandleImpl;
|
||||
use message::IndexMsg;
|
||||
use store::{IndexStore, MapIndexStore};
|
||||
|
||||
use crate::index::update_handler::Hello;
|
||||
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
|
||||
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
|
||||
use error::Result;
|
||||
@ -57,6 +58,7 @@ pub trait IndexActorHandle {
|
||||
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
|
||||
async fn update(
|
||||
&self,
|
||||
channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
|
||||
uuid: Uuid,
|
||||
meta: Processing,
|
||||
data: Option<File>,
|
||||
|
@ -2,6 +2,7 @@ mod codec;
|
||||
pub mod dump;
|
||||
|
||||
use std::fs::{copy, create_dir_all, remove_file, File};
|
||||
use std::io::BufRead;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
@ -29,6 +30,7 @@ use codec::*;
|
||||
use super::error::Result;
|
||||
use super::UpdateMeta;
|
||||
use crate::helpers::EnvSizer;
|
||||
use crate::index::update_handler::Hello;
|
||||
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
|
||||
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
@ -329,13 +331,29 @@ impl UpdateStore {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let (sender, receiver) = std::sync::mpsc::channel();
|
||||
// Process the pending update using the provided user function.
|
||||
let handle = Handle::current();
|
||||
let result =
|
||||
match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) {
|
||||
Ok(result) => result,
|
||||
Err(e) => Err(processing.fail(e.into())),
|
||||
let handle =
|
||||
tokio::task::spawn(index_handle.update(sender, index_uuid, processing.clone(), file));
|
||||
|
||||
let (sender2, receiver) = std::sync::mpsc::channel();
|
||||
// TODO: we should not panic here
|
||||
let (sender, result) = receiver.recv().unwrap();
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
std::io::stdin().lock().read_line(&mut line).unwrap();
|
||||
match line.as_str() {
|
||||
"commit" => {
|
||||
sender.send((sender2, Hello::Commit));
|
||||
break;
|
||||
}
|
||||
"abort" => {
|
||||
sender.send((sender2, Hello::Abort));
|
||||
break;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
// Once the pending update has been successfully processed
|
||||
// we must remove the content from the pending and processing stores and
|
||||
|
@ -73,6 +73,8 @@ impl Enqueued {
|
||||
}
|
||||
}
|
||||
|
||||
/// This state indicates that we were able to process the update successfully. Now we are waiting
|
||||
/// for the user to `commit` their change.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Processed {
|
||||
@ -90,8 +92,55 @@ impl Processed {
|
||||
pub fn meta(&self) -> &UpdateMeta {
|
||||
self.from.meta()
|
||||
}
|
||||
|
||||
/// The commit was made successfully and we can move to our last state
|
||||
pub fn commit(self) -> Done {
|
||||
Done {
|
||||
success: self.success,
|
||||
from: self.from,
|
||||
processed_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// The commit failed
|
||||
pub fn fail(self, error: ResponseError) -> Failed {
|
||||
Failed {
|
||||
from: self.from, // MARIN: maybe we should update Failed so it can fail from the processed state?
|
||||
error,
|
||||
failed_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// The update was aborted
|
||||
pub fn abort(self) -> Aborted {
|
||||
Aborted {
|
||||
from: self.from.from, // MARIN: maybe we should update Aborted so it can fail from the processed state?
|
||||
aborted_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Final state: everything went well, the update was processed and its commit succeeded.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Done {
|
||||
/// Result of the update, carried over from the `Processed` state.
pub success: UpdateResult,
|
||||
/// Time at which the update entered this state (set by `Processed::commit`).
pub processed_at: DateTime<Utc>,
|
||||
/// Preceding `Processing` state; its fields are flattened into this one by serde.
#[serde(flatten)]
|
||||
pub from: Processing,
|
||||
}
|
||||
|
||||
impl Done {
|
||||
pub fn id(&self) -> u64 {
|
||||
self.from.id()
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> &UpdateMeta {
|
||||
self.from.meta()
|
||||
}
|
||||
}
|
||||
|
||||
/// The update is being handled by milli. It can fail but not be aborted.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Processing {
|
||||
@ -126,6 +175,7 @@ impl Processing {
|
||||
}
|
||||
}
|
||||
|
||||
/// Final state: The update has been aborted.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Aborted {
|
||||
@ -144,6 +194,7 @@ impl Aborted {
|
||||
}
|
||||
}
|
||||
|
||||
/// Final state: The update failed to process or commit correctly.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Failed {
|
||||
@ -169,6 +220,7 @@ pub enum UpdateStatus {
|
||||
Processing(Processing),
|
||||
Enqueued(Enqueued),
|
||||
Processed(Processed),
|
||||
Done(Done),
|
||||
Aborted(Aborted),
|
||||
Failed(Failed),
|
||||
}
|
||||
@ -179,6 +231,7 @@ impl UpdateStatus {
|
||||
UpdateStatus::Processing(u) => u.id(),
|
||||
UpdateStatus::Enqueued(u) => u.id(),
|
||||
UpdateStatus::Processed(u) => u.id(),
|
||||
UpdateStatus::Done(u) => u.id(),
|
||||
UpdateStatus::Aborted(u) => u.id(),
|
||||
UpdateStatus::Failed(u) => u.id(),
|
||||
}
|
||||
@ -189,6 +242,7 @@ impl UpdateStatus {
|
||||
UpdateStatus::Processing(u) => u.meta(),
|
||||
UpdateStatus::Enqueued(u) => u.meta(),
|
||||
UpdateStatus::Processed(u) => u.meta(),
|
||||
UpdateStatus::Done(u) => u.meta(),
|
||||
UpdateStatus::Aborted(u) => u.meta(),
|
||||
UpdateStatus::Failed(u) => u.meta(),
|
||||
}
|
||||
@ -214,15 +268,21 @@ impl From<Aborted> for UpdateStatus {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Processing> for UpdateStatus {
|
||||
fn from(other: Processing) -> Self {
|
||||
Self::Processing(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Processed> for UpdateStatus {
|
||||
fn from(other: Processed) -> Self {
|
||||
Self::Processed(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Processing> for UpdateStatus {
|
||||
fn from(other: Processing) -> Self {
|
||||
Self::Processing(other)
|
||||
impl From<Done> for UpdateStatus {
|
||||
fn from(other: Done) -> Self {
|
||||
Self::Done(other)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,6 +132,10 @@ pub enum UpdateStatusResponse {
|
||||
#[serde(flatten)]
|
||||
content: ProcessedUpdateResult,
|
||||
},
|
||||
Done {
|
||||
#[serde(flatten)]
|
||||
content: ProcessedUpdateResult,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<UpdateStatus> for UpdateStatusResponse {
|
||||
@ -175,6 +179,24 @@ impl From<UpdateStatus> for UpdateStatusResponse {
|
||||
};
|
||||
UpdateStatusResponse::Processed { content }
|
||||
}
|
||||
UpdateStatus::Done(done) => {
|
||||
let duration = done
|
||||
.processed_at
|
||||
.signed_duration_since(done.from.started_processing_at)
|
||||
.num_milliseconds();
|
||||
|
||||
// Necessary since chrono::Duration doesn't expose an f64 secs method.
|
||||
let duration = Duration::from_millis(duration as u64).as_secs_f64();
|
||||
|
||||
let content = ProcessedUpdateResult {
|
||||
update_id: done.id(),
|
||||
update_type,
|
||||
duration,
|
||||
enqueued_at: done.from.from.enqueued_at,
|
||||
processed_at: done.processed_at,
|
||||
};
|
||||
UpdateStatusResponse::Processed { content }
|
||||
}
|
||||
UpdateStatus::Aborted(_) => unreachable!(),
|
||||
UpdateStatus::Failed(failed) => {
|
||||
let duration = failed
|
||||
|
Reference in New Issue
Block a user