109: Make updates atomic r=curquiza a=MarinPostma

Until now, the index_uid->uuid mapping was done before the update was written to disk in the case of automatic index creation. This was an issue when the update failed, and the index would still exists in the uuid resolver.

This is fixed by this pr, by first creating the update with an uuid if the index does not exist, and then register this uuid to the uuid resolver.

This is preliminary work to the implementation of snapshots (#19).

This pr also changes the `resolve` method on the `UuidResolver` to `get` to make it clearer.


The `create_uuid` method may be bound to disappear when the index name resolution is handled by a remote machine.

Co-authored-by: mpostma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot]
2021-03-24 12:24:32 +00:00
committed by GitHub
2 changed files with 127 additions and 87 deletions

View File

@@ -16,10 +16,12 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::sleep;
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult};
pub use updates::{Failed, Processed, Processing};
use uuid_resolver::UuidError;
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
@@ -80,10 +82,10 @@ impl IndexController {
uid: String,
method: milli::update::IndexDocumentsMethod,
format: milli::update::UpdateFormat,
mut payload: Payload,
payload: Payload,
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.get_or_create(uid).await?;
let perform_update = |uuid| async move {
let meta = UpdateMeta::DocumentsAddition {
method,
format,
@@ -91,30 +93,40 @@ impl IndexController {
};
let (sender, receiver) = mpsc::channel(10);
// It is necessary to spawn a local task to senf the payload to the update handle to
// It is necessary to spawn a local task to send the payload to the update handle to
// prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move {
while let Some(bytes) = payload.next().await {
match bytes {
Ok(bytes) => {
let _ = sender.send(Ok(bytes)).await;
}
Err(e) => {
let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e);
let _ = sender.send(Err(error)).await;
}
}
}
payload
.map(|bytes| {
bytes.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
})
.for_each(|r| async {
let _ = sender.send(r).await;
})
.await
});
// This must be done *AFTER* spawning the task.
let status = self.update_handle.update(meta, receiver, uuid).await?;
self.update_handle.update(meta, receiver, uuid).await
};
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?;
self.uuid_resolver.insert(name, uuid).await?;
Ok(status)
}
Err(e) => Err(e.into()),
}
}
pub async fn clear_documents(&self, uid: String) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.resolve(uid).await?;
let uuid = self.uuid_resolver.get(uid).await?;
let meta = UpdateMeta::ClearDocuments;
let (_, receiver) = mpsc::channel(1);
let status = self.update_handle.update(meta, receiver, uuid).await?;
@@ -126,7 +138,7 @@ impl IndexController {
uid: String,
document_ids: Vec<String>,
) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.resolve(uid).await?;
let uuid = self.uuid_resolver.get(uid).await?;
let meta = UpdateMeta::DeleteDocuments;
let (sender, receiver) = mpsc::channel(10);
@@ -146,27 +158,24 @@ impl IndexController {
settings: Settings,
create: bool,
) -> anyhow::Result<UpdateStatus> {
let uuid = if create {
let uuid = self.uuid_resolver.get_or_create(uid).await?;
// We need to create the index upfront, since it would otherwise only be created when
// the update is processed. This would make calls to GET index to fail until the update
// is complete. Since this is get or create, we ignore the error when the index already
// exists.
match self.index_handle.create_index(uuid, None).await {
Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (),
Err(e) => return Err(e.into()),
}
uuid
} else {
self.uuid_resolver.resolve(uid).await?
};
let perform_udpate = |uuid| async move {
let meta = UpdateMeta::Settings(settings);
// Nothing so send, drop the sender right away, as not to block the update actor.
let (_, receiver) = mpsc::channel(1);
self.update_handle.update(meta, receiver, uuid).await
};
let status = self.update_handle.update(meta, receiver, uuid).await?;
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_udpate(uuid).await?),
Err(UuidError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?;
self.uuid_resolver.insert(name, uuid).await?;
Ok(status)
}
Err(e) => Err(e.into()),
}
}
pub async fn create_index(
&self,
@@ -177,7 +186,11 @@ impl IndexController {
let uuid = self.uuid_resolver.create(uid.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?;
let _ = self.update_handle.create(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta };
let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta)
}
@@ -190,13 +203,13 @@ impl IndexController {
}
pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.resolve(uid).await?;
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.update_handle.update_status(uuid, id).await?;
Ok(result)
}
pub async fn all_update_status(&self, uid: String) -> anyhow::Result<Vec<UpdateStatus>> {
let uuid = self.uuid_resolver.resolve(uid).await?;
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.update_handle.get_all_updates_status(uuid).await?;
Ok(result)
}
@@ -208,7 +221,11 @@ impl IndexController {
for (uid, uuid) in uuids {
let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta };
let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
ret.push(meta);
}
@@ -216,7 +233,7 @@ impl IndexController {
}
pub async fn settings(&self, uid: String) -> anyhow::Result<Settings> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let settings = self.index_handle.settings(uuid).await?;
Ok(settings)
}
@@ -228,7 +245,7 @@ impl IndexController {
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Vec<Document>> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let documents = self
.index_handle
.documents(uuid, offset, limit, attributes_to_retrieve)
@@ -242,7 +259,7 @@ impl IndexController {
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Document> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let document = self
.index_handle
.document(uuid, doc_id, attributes_to_retrieve)
@@ -259,22 +276,30 @@ impl IndexController {
bail!("Can't change the index uid.")
}
let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.update_index(uuid, index_settings).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta };
let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta)
}
pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result<SearchResult> {
let uuid = self.uuid_resolver.resolve(uid).await?;
let uuid = self.uuid_resolver.get(uid).await?;
let result = self.index_handle.search(uuid, query).await?;
Ok(result)
}
pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta };
let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta)
}
}

View File

@@ -13,11 +13,7 @@ pub type Result<T> = std::result::Result<T, UuidError>;
#[derive(Debug)]
enum UuidResolveMsg {
Resolve {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
GetOrCreate {
Get {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
@@ -32,6 +28,11 @@ enum UuidResolveMsg {
List {
ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>,
},
Insert {
uuid: Uuid,
name: String,
ret: oneshot::Sender<Result<()>>,
}
}
struct UuidResolverActor<S> {
@@ -54,11 +55,8 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(Create { uid: name, ret }) => {
let _ = ret.send(self.handle_create(name).await);
}
Some(GetOrCreate { uid: name, ret }) => {
let _ = ret.send(self.handle_get_or_create(name).await);
}
Some(Resolve { uid: name, ret }) => {
let _ = ret.send(self.handle_resolve(name).await);
Some(Get { uid: name, ret }) => {
let _ = ret.send(self.handle_get(name).await);
}
Some(Delete { uid: name, ret }) => {
let _ = ret.send(self.handle_delete(name).await);
@@ -66,6 +64,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(List { ret }) => {
let _ = ret.send(self.handle_list().await);
}
Some(Insert { ret, uuid, name }) => {
let _ = ret.send(self.handle_insert(name, uuid).await);
}
// all senders have been dropped, need to quit.
None => break,
}
@@ -81,14 +82,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store.create_uuid(uid, true).await
}
async fn handle_get_or_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, false).await
}
async fn handle_resolve(&self, uid: String) -> Result<Uuid> {
async fn handle_get(&self, uid: String) -> Result<Uuid> {
self.store
.get_uuid(uid.clone())
.await?
@@ -106,6 +100,14 @@ impl<S: UuidStore> UuidResolverActor<S> {
let result = self.store.list().await?;
Ok(result)
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())
}
}
fn is_index_uid_valid(uid: &str) -> bool {
@@ -127,18 +129,9 @@ impl UuidResolverHandle {
Ok(Self { sender })
}
pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> {
pub async fn get(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Resolve { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn get_or_create(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::GetOrCreate { uid: name, ret };
let msg = UuidResolveMsg::Get { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
@@ -171,6 +164,15 @@ impl UuidResolverHandle {
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Insert { ret, name, uuid };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}
#[derive(Debug, Error)]
@@ -197,6 +199,7 @@ trait UuidStore {
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
}
struct HeedUuidStore {
@@ -292,4 +295,16 @@ impl UuidStore for HeedUuidStore {
})
.await?
}
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
})
.await?
}
}