feat(http): store processing as RwLock<Option<Uuid>> in index_actor

This commit is contained in:
Alexey Shekhirin
2021-04-07 18:57:46 +03:00
parent 87412f63ef
commit 698a1ea582
2 changed files with 28 additions and 19 deletions

View File

@@ -8,7 +8,7 @@ use futures::pin_mut;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::CompactionOption; use heed::CompactionOption;
use log::debug; use log::debug;
use tokio::sync::mpsc; use tokio::sync::{mpsc, RwLock};
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use uuid::Uuid; use uuid::Uuid;
@@ -25,6 +25,7 @@ pub struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>, read_receiver: Option<mpsc::Receiver<IndexMsg>>,
write_receiver: Option<mpsc::Receiver<IndexMsg>>, write_receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>, update_handler: Arc<UpdateHandler>,
processing: RwLock<Option<Uuid>>,
store: S, store: S,
} }
@@ -42,8 +43,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(Self { Ok(Self {
read_receiver, read_receiver,
write_receiver, write_receiver,
store,
update_handler, update_handler,
processing: RwLock::new(Default::default()),
store,
}) })
} }
@@ -181,16 +183,26 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
meta: Processing<UpdateMeta>, meta: Processing<UpdateMeta>,
data: File, data: File,
) -> Result<UpdateResult> { ) -> Result<UpdateResult> {
let uuid = meta.index_uuid().clone();
*self.processing.write().await = Some(uuid);
let result = {
debug!("Processing update {}", meta.id()); debug!("Processing update {}", meta.id());
let uuid = meta.index_uuid();
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
let index = match self.store.get(*uuid).await? { let index = match self.store.get(uuid).await? {
Some(index) => index, Some(index) => index,
None => self.store.create(*uuid, None).await?, None => self.store.create(uuid, None).await?,
}; };
spawn_blocking(move || update_handler.handle_update(meta, data, index)) spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await .await
.map_err(|e| IndexError::Error(e.into())) .map_err(|e| IndexError::Error(e.into()))
};
*self.processing.write().await = None;
result
} }
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> { async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
@@ -342,13 +354,16 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
let processing = self.processing.read().await;
let is_indexing = *processing == Some(uuid);
spawn_blocking(move || { spawn_blocking(move || {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
Ok(IndexStats { Ok(IndexStats {
size: index.size()?, size: index.size()?,
number_of_documents: index.number_of_documents(&rtxn)?, number_of_documents: index.number_of_documents(&rtxn)?,
is_indexing: false, // We set this field in src/index_controller/mod.rs get_stats is_indexing,
fields_distribution: index.fields_distribution(&rtxn)?, fields_distribution: index.fields_distribution(&rtxn)?,
}) })
}) })

View File

@@ -354,13 +354,7 @@ impl IndexController {
pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> { pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
let stats = self.index_handle.get_index_stats(uuid); Ok(self.index_handle.get_index_stats(uuid).await?)
let is_indexing = self.update_handle.is_locked(uuid);
Ok(IndexStats {
is_indexing: is_indexing.await?,
..stats.await?
})
} }
} }