Compare commits

...

17 Commits

17 changed files with 1520 additions and 157 deletions

129
Cargo.lock generated
View File

@ -252,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if",
"cipher",
"cipher 0.3.0",
"cpufeatures",
"opaque-debug",
]
@ -523,6 +523,17 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "bus"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80cb4625f5b60155ff1018c9d4ce2e38bf5ae3e5780dfab9fa68bb44a6b751e2"
dependencies = [
"crossbeam-channel",
"num_cpus",
"parking_lot_core",
]
[[package]]
name = "byte-unit"
version = "4.0.18"
@ -641,6 +652,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7fc89c7c5b9e7a02dfe45cd2367bae382f9ed31c61ca8debe5f827c420a2f08"
dependencies = [
"cfg-if",
"cipher 0.4.4",
"cpufeatures",
]
[[package]]
name = "change-detection"
version = "1.2.0"
@ -712,6 +734,16 @@ dependencies = [
"generic-array",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "3.2.23"
@ -770,6 +802,24 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "cluster"
version = "1.1.0"
dependencies = [
"bus",
"crossbeam",
"ductile",
"log",
"meilisearch-types",
"roaring",
"serde",
"serde_json",
"synchronoise",
"thiserror",
"time",
"uuid 1.3.0",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@ -1148,6 +1198,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "ductile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cde25956886749c891a27249630ae99471f1ba05c4a924aad1a6ffe6932812"
dependencies = [
"anyhow",
"bincode",
"chacha20",
"crossbeam-channel",
"log",
"rand",
"serde",
]
[[package]]
name = "dump"
version = "1.1.0"
@ -1170,14 +1235,14 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
name = "either"
version = "1.8.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
dependencies = [
"serde",
]
@ -1376,7 +1441,7 @@ dependencies = [
"faux",
"tempfile",
"thiserror",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -1895,6 +1960,7 @@ dependencies = [
"anyhow",
"big_s",
"bincode",
"cluster",
"crossbeam",
"csv",
"derive_builder",
@ -1915,7 +1981,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -1929,6 +1995,15 @@ dependencies = [
"serde",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "insta"
version = "1.26.0"
@ -2473,6 +2548,7 @@ dependencies = [
"bytes",
"cargo_toml",
"clap 4.0.32",
"cluster",
"crossbeam-channel",
"deserr",
"dump",
@ -2533,7 +2609,7 @@ dependencies = [
"tokio-stream",
"toml",
"urlencoding",
"uuid 1.2.2",
"uuid 1.3.0",
"vergen",
"walkdir",
"yaup",
@ -2545,6 +2621,7 @@ name = "meilisearch-auth"
version = "1.1.0"
dependencies = [
"base64 0.13.1",
"cluster",
"enum-iterator",
"hmac",
"maplit",
@ -2556,7 +2633,7 @@ dependencies = [
"sha2",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -2586,7 +2663,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -2661,7 +2738,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -3499,9 +3576,9 @@ checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
[[package]]
name = "serde"
version = "1.0.152"
version = "1.0.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8"
dependencies = [
"serde_derive",
]
@ -3517,9 +3594,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.152"
version = "1.0.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
checksum = "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630"
dependencies = [
"proc-macro2",
"quote",
@ -3528,9 +3605,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
dependencies = [
"indexmap",
"itoa 1.0.5",
@ -3816,18 +3893,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e"
dependencies = [
"proc-macro2",
"quote",
@ -3836,9 +3913,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.17"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890"
dependencies = [
"itoa 1.0.5",
"serde",
@ -3854,9 +3931,9 @@ checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd"
[[package]]
name = "time-macros"
version = "0.2.6"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2"
checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36"
dependencies = [
"time-core",
]
@ -4101,9 +4178,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.2.2"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79"
dependencies = [
"getrandom",
"serde",

View File

@ -9,6 +9,7 @@ members = [
"dump",
"file-store",
"permissive-json-pointer",
"cluster",
"milli",
"filter-parser",
"flatten-serde-json",

25
cluster/Cargo.toml Normal file
View File

@ -0,0 +1,25 @@
[package]
name = "cluster"
publish = false
version.workspace = true
authors.workspace = true
description.workspace = true
homepage.workspace = true
readme.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
ductile = "0.3.0"
serde = { version = "1.0.155", features = ["derive"] }
serde_json = "1.0.94"
thiserror = "1.0.39"
meilisearch-types = { path = "../meilisearch-types" }
roaring = { version = "0.10.1", features = ["serde"] }
log = "0.4.17"
crossbeam = "0.8.2"
bus = "2.3.0"
time = "0.3.20"
uuid = { version = "1.3.0", features = ["v4"] }
synchronoise = "1.0.1"

148
cluster/src/batch.rs Normal file
View File

@ -0,0 +1,148 @@
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::TaskId;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Batch {
TaskCancelation {
/// The task cancelation itself.
task: TaskId,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
},
TaskDeletion(TaskId),
SnapshotCreation(Vec<TaskId>),
Dump(TaskId),
IndexOperation {
op: IndexOperation,
must_create_index: bool,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexDeletion {
index_uid: String,
tasks: Vec<TaskId>,
index_has_been_created: bool,
},
IndexSwap {
task: TaskId,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
DocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
tasks: Vec<TaskId>,
},
DocumentDeletion {
index_uid: String,
// The vec associated with each document deletion tasks.
documents: Vec<Vec<String>>,
tasks: Vec<TaskId>,
},
DocumentClear {
index_uid: String,
tasks: Vec<TaskId>,
},
Settings {
index_uid: String,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
tasks: Vec<TaskId>,
},
DocumentClearAndSetting {
index_uid: String,
cleared_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
SettingsAndDocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
document_import_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
}
impl Batch {
pub fn ids(&self) -> impl Iterator<Item = TaskId> {
type Ret = Box<dyn Iterator<Item = TaskId>>;
match self {
Batch::TaskCancelation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::TaskDeletion(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::SnapshotCreation(tasks) => Box::new(tasks.clone().into_iter()) as Ret,
Batch::Dump(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentDeletion { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClear { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::Settings { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClearAndSetting {
cleared_tasks, settings_tasks, ..
} => {
Box::new(cleared_tasks.clone().into_iter().chain(settings_tasks.clone())) as Ret
}
IndexOperation::SettingsAndDocumentOperation {
document_import_tasks,
settings_tasks,
..
} => Box::new(
document_import_tasks.clone().into_iter().chain(settings_tasks.clone()),
) as Ret,
},
Batch::IndexCreation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexUpdate { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexDeletion { tasks, .. } => Box::new(tasks.clone().into_iter()) as Ret,
Batch::IndexSwap { task } => Box::new(std::iter::once(*task)) as Ret,
}
}
}

276
cluster/src/leader.rs Normal file
View File

@ -0,0 +1,276 @@
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{atomic, Arc, Mutex, RwLock};
use std::time::Duration;
use bus::{Bus, BusReader};
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::Task;
use synchronoise::SignalEvent;
use uuid::Uuid;
use crate::batch::Batch;
use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg};
#[derive(Clone)]
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
needs_key_sender: Sender<Sender<Vec<Key>>>,
needs_key_receiver: Receiver<Sender<Vec<Key>>>,
pub wake_up: Arc<SignalEvent>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
batch_id: Arc<RwLock<u32>>,
}
impl Leader {
pub fn new(
listen_on: impl ToSocketAddrs + Send + 'static,
master_key: Option<String>,
) -> Leader {
let new_followers = Arc::new(AtomicUsize::new(0));
let active_followers = Arc::new(AtomicUsize::new(1));
let wake_up = Arc::new(SignalEvent::auto(true));
let (broadcast_to_follower, process_batch_receiver) = unbounded();
let (task_finished_sender, task_finished_receiver) = unbounded();
let (needs_key_sender, needs_key_receiver) = unbounded();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
std::thread::spawn(move || {
Self::listener(
listen_on,
master_key,
nf,
af,
wu,
process_batch_receiver,
task_finished_sender,
)
});
Leader {
task_ready_to_commit: task_finished_receiver,
broadcast_to_follower,
needs_key_sender,
needs_key_receiver,
wake_up,
new_followers,
active_followers,
batch_id: Arc::default(),
}
}
pub fn has_new_followers(&self) -> bool {
self.new_followers.load(Ordering::Relaxed) != 0
}
/// Takes all the necessary channels to chat with the scheduler and give them
/// to each new followers
fn listener(
listen_on: impl ToSocketAddrs,
master_key: Option<String>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
broadcast_to_follower: Receiver<LeaderMsg>,
task_finished: Sender<u32>,
) {
let listener: ChannelServer<LeaderMsg, FollowerMsg> = if let Some(ref master_key) =
master_key
{
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Listening with encryption enabled");
ChannelServer::bind_with_enc(listen_on, enc).unwrap()
} else {
ChannelServer::bind(listen_on).unwrap()
};
info!("Ready to the receive connections");
// We're going to broadcast all the batches to all our follower
let bus: Bus<LeaderMsg> = Bus::new(10);
let bus = Arc::new(Mutex::new(bus));
let b = bus.clone();
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread is dead");
b.lock().unwrap().broadcast(msg);
});
for (sender, receiver, _addr) in listener {
let task_finished = task_finished.clone();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
let process_batch = bus.lock().unwrap().add_rx();
std::thread::spawn(move || {
Self::follower(sender, receiver, nf, af, wu, process_batch, task_finished)
});
}
}
/// Allow a follower to chat with the scheduler
fn follower(
sender: ChannelSender<LeaderMsg>,
receiver: ChannelReceiver<FollowerMsg>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
mut broadcast_to_follower: BusReader<LeaderMsg>,
task_finished: Sender<u32>,
) {
let size = new_followers.fetch_add(1, Ordering::Relaxed) + 1;
wake_up.signal();
info!("A new follower joined the cluster. {} members.", size);
loop {
if let msg @ LeaderMsg::JoinFromDump(_) =
broadcast_to_follower.recv().expect("Main thread died")
{
// we exit the new_follower state and become an active follower even though
// the dump will takes some time to index
new_followers.fetch_sub(1, Ordering::Relaxed);
let size = active_followers.fetch_add(1, Ordering::Relaxed) + 1;
info!("A new follower became active. {} active members.", size);
sender.send(msg).unwrap();
break;
}
}
// send messages to the follower
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread died");
match msg {
LeaderMsg::JoinFromDump(_) => (),
msg => {
if sender.send(msg).is_err() {
// the follower died, the logging and cluster size update should be done
// in the other thread
break;
}
}
}
});
// receive messages from the follower
loop {
match receiver.recv() {
Err(_) => break,
Ok(msg) => match msg {
FollowerMsg::ReadyToCommit(id) => {
task_finished.send(id).expect("Can't reach the main thread")
}
FollowerMsg::RegisterNewTask(_) => todo!(),
},
}
}
// if we exited from the previous loop it means the follower is down and should
// be removed from the cluster
let size = active_followers.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
info!("A follower left the cluster. {} members.", size);
}
// ============= Everything related to the setup of the cluster
pub fn join_me(&self, dump: Vec<u8>) {
self.broadcast_to_follower
.send(LeaderMsg::JoinFromDump(dump))
.expect("Lost the link with the followers");
}
// ============= Everything related to the scheduler
pub fn starts_batch(&self, batch: Batch) {
let mut batch_id = self.batch_id.write().unwrap();
info!("Send the batch to process to the followers");
*batch_id += 1;
self.broadcast_to_follower
.send(LeaderMsg::StartBatch { id: *batch_id, batch })
.expect("Can't reach the cluster");
}
pub fn commit(&self, consistency_level: Consistency) {
info!("Wait until enough followers are ready to commit a batch");
let batch_id = self.batch_id.write().unwrap();
let mut nodes_ready_to_commit = 1;
loop {
let size = self.active_followers.load(atomic::Ordering::Relaxed);
info!("{nodes_ready_to_commit} nodes are ready to commit for a cluster size of {size}");
let all = nodes_ready_to_commit == size;
match consistency_level {
Consistency::One if nodes_ready_to_commit >= 1 || all => break,
Consistency::Two if nodes_ready_to_commit >= 2 || all => break,
Consistency::Quorum if nodes_ready_to_commit >= (size / 2) || all => break,
Consistency::All if all => break,
_ => (),
}
// we can't wait forever here because if a node dies the cluster size might get updated while we're stuck
match self.task_ready_to_commit.recv_timeout(Duration::new(1, 0)) {
Ok(id) if id == *batch_id => nodes_ready_to_commit += 1,
_ => continue,
};
}
info!("Tells all the follower to commit");
self.broadcast_to_follower.send(LeaderMsg::Commit(*batch_id)).unwrap();
}
pub fn register_new_task(&self, task: Task, update_file: Option<Vec<u8>>) {
info!("Tells all the follower to register a new task");
self.broadcast_to_follower
.send(LeaderMsg::RegisterNewTask { task, update_file })
.expect("Main thread is dead");
}
// ============= Everything related to the api-keys
pub fn insert_key(&self, key: Key) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Insert(key)))
.unwrap()
}
pub fn delete_key(&self, uuid: Uuid) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid)))
.unwrap()
}
pub fn needs_keys(&self) -> Sender<Vec<Key>> {
self.needs_key_receiver.recv().expect("The cluster is dead")
}
pub fn get_keys(&self) -> Vec<Key> {
let (send, rcv) = crossbeam::channel::bounded(1);
self.needs_key_sender.send(send).expect("The cluster is dead");
rcv.recv().expect("The auth controller is dead")
}
}

231
cluster/src/lib.rs Normal file
View File

@ -0,0 +1,231 @@
use std::net::ToSocketAddrs;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use batch::Batch;
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{connect_channel, connect_channel_with_enc, ChannelReceiver, ChannelSender};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::{KindWithContent, Task};
use serde::{Deserialize, Serialize};
pub mod batch;
mod leader;
pub use leader::Leader;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Network issue occured")]
NetworkIssue,
#[error("Internal error:{0}")]
SerdeJson(#[from] serde_json::Error),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LeaderMsg {
/// A dump to join the cluster
JoinFromDump(Vec<u8>),
/// Starts a new batch
StartBatch { id: u32, batch: Batch },
///Tell the follower to commit the update asap
Commit(u32),
///Tell the follower to commit the update asap
RegisterNewTask { task: Task, update_file: Option<Vec<u8>> },
///Tell the follower to commit the update asap
ApiKeyOperation(ApiKeyOperation),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FollowerMsg {
// Let the leader knows you're ready to commit
ReadyToCommit(u32),
RegisterNewTask(KindWithContent),
}
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Consistency {
One,
Two,
Quorum,
#[default]
All,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ApiKeyOperation {
Insert(Key),
Delete(Uuid),
}
impl std::fmt::Display for Consistency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Consistency::One => write!(f, "one"),
Consistency::Two => write!(f, "two"),
Consistency::Quorum => write!(f, "quorum"),
Consistency::All => write!(f, "all"),
}
}
}
impl FromStr for Consistency {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"one" => Ok(Consistency::One),
"two" => Ok(Consistency::Two),
"quorum" => Ok(Consistency::Quorum),
"all" => Ok(Consistency::All),
s => Err(format!(
"Unexpected value `{s}`, expected one of `one`, `two`, `quorum`, `all`"
)),
}
}
}
#[derive(Clone)]
pub enum Cluster {
Leader(Leader),
Follower(Follower),
}
#[derive(Clone)]
pub struct Follower {
sender: ChannelSender<FollowerMsg>,
get_batch: Receiver<(u32, Batch)>,
must_commit: Receiver<u32>,
register_new_task: Receiver<(Task, Option<Vec<u8>>)>,
api_key_op: Receiver<ApiKeyOperation>,
batch_id: Arc<RwLock<u32>>,
}
impl Follower {
pub fn join(leader: impl ToSocketAddrs, master_key: Option<String>) -> (Follower, Vec<u8>) {
let (sender, receiver) = if let Some(master_key) = master_key {
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Connecting with encryption enabled");
connect_channel_with_enc(leader, &enc).unwrap()
} else {
connect_channel(leader).unwrap()
};
info!("Connection to the leader established");
info!("Waiting for the leader to contact us");
let state = receiver.recv().unwrap();
let dump = match state {
LeaderMsg::JoinFromDump(dump) => dump,
msg => panic!("Received unexpected message {msg:?}"),
};
let (get_batch_sender, get_batch_receiver) = unbounded();
let (must_commit_sender, must_commit_receiver) = unbounded();
let (register_task_sender, register_task_receiver) = unbounded();
let (create_api_key_sender, create_api_key_receiver) = unbounded();
std::thread::spawn(move || {
Self::router(
receiver,
get_batch_sender,
must_commit_sender,
register_task_sender,
create_api_key_sender,
);
});
(
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
api_key_op: create_api_key_receiver,
batch_id: Arc::default(),
},
dump,
)
}
fn router(
receiver: ChannelReceiver<LeaderMsg>,
get_batch: Sender<(u32, Batch)>,
must_commit: Sender<u32>,
register_new_task: Sender<(Task, Option<Vec<u8>>)>,
api_key_op: Sender<ApiKeyOperation>,
) {
loop {
match receiver.recv().expect("Lost connection to the leader") {
LeaderMsg::JoinFromDump(_) => {
warn!("Received a join from dump msg but Im already running : ignoring the message")
}
LeaderMsg::StartBatch { id, batch } => {
info!("Starting to process a new batch");
get_batch.send((id, batch)).expect("Lost connection to the main thread")
}
LeaderMsg::Commit(id) => {
info!("Must commit");
must_commit.send(id).expect("Lost connection to the main thread")
}
LeaderMsg::RegisterNewTask { task, update_file } => {
info!("Registered a new task");
register_new_task
.send((task, update_file))
.expect("Lost connection to the main thread")
}
LeaderMsg::ApiKeyOperation(key) => {
api_key_op.send(key).expect("Lost connection to the main thread")
}
}
}
}
pub fn get_new_batch(&self) -> Batch {
info!("Get new batch called");
let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader");
info!("Got a new batch");
*self.batch_id.write().unwrap() = id;
batch
}
pub fn ready_to_commit(&self) {
info!("I'm ready to commit");
let batch_id = self.batch_id.read().unwrap();
self.sender.send(FollowerMsg::ReadyToCommit(*batch_id)).unwrap();
loop {
let id = self.must_commit.recv().expect("Lost connection to the leader");
#[allow(clippy::comparison_chain)]
if id == *batch_id {
break;
} else if id > *batch_id {
panic!("We missed a batch");
}
}
info!("I got the right to commit");
}
pub fn get_new_task(&self) -> (Task, Option<Vec<u8>>) {
self.register_new_task.recv().expect("Lost connection to the leader")
}
pub fn api_key_operation(&self) -> ApiKeyOperation {
info!("Creating a new api key");
self.api_key_op.recv().expect("Lost connection to the leader")
}
}

View File

@ -13,6 +13,8 @@ license.workspace = true
[dependencies]
anyhow = "1.0.64"
bincode = "1.3.3"
cluster = { path = "../cluster" }
crossbeam = "0.8.2"
csv = "1.1.6"
derive_builder = "0.11.2"
dump = { path = "../dump" }

View File

@ -22,7 +22,8 @@ use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
use crossbeam::utils::Backoff;
use dump::{DumpWriter, IndexMetadata};
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -41,14 +42,14 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Cluster, Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum Batch {
TaskCancelation {
/// The task cancelation itself.
@ -85,14 +86,14 @@ pub(crate) enum Batch {
},
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum IndexOperation {
DocumentOperation {
index_uid: String,
@ -586,6 +587,12 @@ impl IndexScheduler {
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
// We must only remove the content files if the transaction is successfully committed
// and if errors occurs when we are deleting files we must do our best to delete
// everything. We do not return the encountered errors when deleting the content
@ -629,6 +636,13 @@ impl IndexScheduler {
}
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
Ok(vec![task])
}
@ -723,96 +737,9 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::Dump(mut task) => {
// TODO: It would be better to use the started_at from the task instead of generating a new one
let started_at = OffsetDateTime::now_utc();
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) =
cursor.next_document().map_err(milli::Error::from)?
{
dump_content_file.push_document(&obkv_to_object(
&doc,
&documents_batch_index,
)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
let dump = self.create_dump(&task, &started_at)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
@ -840,6 +767,13 @@ impl IndexScheduler {
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
index_wtxn.commit()?;
Ok(tasks)
@ -938,6 +872,13 @@ impl IndexScheduler {
for swap in swaps {
self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?;
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
@ -945,6 +886,99 @@ impl IndexScheduler {
}
}
pub(crate) fn create_dump(
&self,
task: &Task,
started_at: &OffsetDateTime,
) -> Result<DumpWriter> {
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(*started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
Ok(dump)
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes
@ -1375,4 +1409,274 @@ impl IndexScheduler {
Ok(content_files_to_delete)
}
pub(crate) fn get_batch_from_cluster_batch(
&self,
batch: cluster::batch::Batch,
) -> Result<Batch> {
use cluster::batch::Batch as CBatch;
let mut rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
for id in batch.ids() {
let backoff = Backoff::new();
let id = BEU32::new(id);
loop {
if self.all_tasks.get(&rtxn, &id)?.is_some() {
info!("Found the task_id");
break;
}
info!("The task is not present in the task queue, we wait");
// we need to drop the txn to make a write visible
drop(rtxn);
backoff.spin();
rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
}
}
Ok(match batch {
CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
Batch::TaskCancelation {
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
previous_started_at,
previous_processing_tasks,
}
}
CBatch::TaskDeletion(task) => {
Batch::TaskDeletion(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::SnapshotCreation(tasks) => {
Batch::SnapshotCreation(self.get_existing_tasks(&rtxn, tasks)?)
}
CBatch::Dump(task) => {
Batch::Dump(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
op: self.get_index_op_from_cluster_index_op(&rtxn, op)?,
must_create_index,
},
CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
Batch::IndexDeletion {
index_uid,
tasks: self.get_existing_tasks(&rtxn, tasks)?,
index_has_been_created,
}
}
CBatch::IndexSwap { task } => {
Batch::IndexSwap { task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone() }
}
})
}
pub(crate) fn get_index_op_from_cluster_index_op(
&self,
rtxn: &RoTxn,
op: cluster::batch::IndexOperation,
) -> Result<IndexOperation> {
use cluster::batch::IndexOperation as COp;
Ok(match op {
COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentDeletion { index_uid, documents, tasks } => {
IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks: self.get_existing_tasks(rtxn, tasks)?,
}
}
COp::DocumentClear { index_uid, tasks } => IndexOperation::DocumentClear {
index_uid,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::Settings { index_uid, settings, tasks } => IndexOperation::Settings {
index_uid,
settings,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentClearAndSetting { index_uid, cleared_tasks, settings, settings_tasks } => {
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks: self.get_existing_tasks(rtxn, cleared_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
}
}
COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: self.get_existing_tasks(rtxn, document_import_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
},
})
}
}
impl From<Batch> for cluster::batch::Batch {
fn from(batch: Batch) -> Self {
use cluster::batch::Batch as CBatch;
match batch {
Batch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
CBatch::TaskCancelation {
task: task.uid,
previous_started_at,
previous_processing_tasks,
}
}
Batch::TaskDeletion(task) => CBatch::TaskDeletion(task.uid),
Batch::SnapshotCreation(task) => {
CBatch::SnapshotCreation(task.into_iter().map(|task| task.uid).collect())
}
Batch::Dump(task) => CBatch::Dump(task.uid),
Batch::IndexOperation { op, must_create_index } => {
CBatch::IndexOperation { op: op.into(), must_create_index }
}
Batch::IndexCreation { index_uid, primary_key, task } => {
CBatch::IndexCreation { index_uid, primary_key, task: task.uid }
}
Batch::IndexUpdate { index_uid, primary_key, task } => {
CBatch::IndexUpdate { index_uid, primary_key, task: task.uid }
}
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
CBatch::IndexDeletion {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
index_has_been_created,
}
}
Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid },
}
}
}
impl From<IndexOperation> for cluster::batch::IndexOperation {
fn from(op: IndexOperation) -> Self {
use cluster::batch::IndexOperation as COp;
match op {
IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentDeletion { index_uid, documents, tasks } => {
COp::DocumentDeletion {
index_uid,
documents,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
}
}
IndexOperation::DocumentClear { index_uid, tasks } => COp::DocumentClear {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::Settings { index_uid, settings, tasks } => COp::Settings {
index_uid,
settings,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => COp::DocumentClearAndSetting {
index_uid,
cleared_tasks: cleared_tasks.into_iter().map(|task| task.uid).collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: document_import_tasks
.into_iter()
.map(|task| task.uid)
.collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
}
}
}
impl From<DocumentOperation> for cluster::batch::DocumentOperation {
fn from(op: DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
DocumentOperation::Add(uuid) => COp::Add(uuid),
DocumentOperation::Delete(docs) => COp::Delete(docs),
}
}
}
impl From<cluster::batch::DocumentOperation> for DocumentOperation {
fn from(op: cluster::batch::DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
COp::Add(uuid) => DocumentOperation::Add(uuid),
COp::Delete(docs) => DocumentOperation::Delete(docs),
}
}
}

View File

@ -33,6 +33,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _,
auth_path: _,
version_file_path: _,
cluster: _,
consistency_level: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,

View File

@ -31,6 +31,7 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use std::io::Write;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -38,9 +39,12 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use batch::Batch;
use cluster::{Cluster, Consistency};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore;
use log::info;
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env, RoTxn};
@ -50,6 +54,7 @@ use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use serde::Deserialize;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@ -302,6 +307,11 @@ pub struct IndexScheduler {
/// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf,
/// The role in the cluster
pub(crate) cluster: Option<Cluster>,
/// The Consistency level used by the leader. Ignored if the node is not in a leader in cluster mode.
pub(crate) consistency_level: Consistency,
// ================= test
// The next entry is dedicated to the tests.
/// Provide a way to set a breakpoint in multiple part of the scheduler.
@ -321,6 +331,24 @@ pub struct IndexScheduler {
run_loop_iteration: Arc<RwLock<usize>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum ClusterMode {
Leader,
Follower,
}
impl std::str::FromStr for ClusterMode {
type Err = ();
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"leader" => Ok(ClusterMode::Leader),
"follower" => Ok(ClusterMode::Follower),
_ => Err(()),
}
}
}
impl IndexScheduler {
fn private_clone(&self) -> IndexScheduler {
IndexScheduler {
@ -343,6 +371,8 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(),
cluster: self.cluster.clone(),
consistency_level: self.consistency_level,
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)]
@ -357,6 +387,8 @@ impl IndexScheduler {
/// Create an index scheduler and start its run loop.
pub fn new(
options: IndexSchedulerOptions,
cluster: Option<Cluster>,
consistency_level: Consistency,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> {
@ -416,6 +448,8 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
cluster,
consistency_level,
#[cfg(test)]
test_breakpoint_sdr,
@ -508,6 +542,26 @@ impl IndexScheduler {
/// only once per index scheduler.
fn run(&self) {
let run = self.private_clone();
// if we're a follower we starts a thread to register the tasks coming from the leader
if let Some(Cluster::Follower(ref follower)) = self.cluster {
let this = self.private_clone();
let follower = follower.clone();
std::thread::spawn(move || loop {
let (task, content) = follower.get_new_task();
this.register_raw_task(task, content);
});
} else if let Some(Cluster::Leader(ref leader)) = self.cluster {
// we need a way to let the leader come out of its loop if a new follower joins the cluster
let cluster = leader.wake_up.clone();
let scheduler = self.wake_up.clone();
std::thread::spawn(move || loop {
cluster.wait();
scheduler.signal();
});
}
std::thread::Builder::new()
.name(String::from("scheduler"))
.spawn(move || {
@ -865,6 +919,16 @@ impl IndexScheduler {
return Err(e.into());
}
if let Some(Cluster::Leader(leader)) = &self.cluster {
let update_file = if let Some(uuid) = task.content_uuid() {
let path = self.file_store.get_update_path(uuid);
Some(std::fs::read(path).unwrap())
} else {
None
};
leader.register_new_task(task.clone(), update_file);
}
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
@ -994,6 +1058,44 @@ impl IndexScheduler {
Ok(task)
}
/// /!\ should only be used when you're a follower in cluster mode
pub fn register_raw_task(&self, task: Task, content_file: Option<Vec<u8>>) {
if let Some(content) = content_file {
let uuid = task.content_uuid().expect("bad task");
let (_, mut file) = self.file_store.new_update_with_uuid(uuid.as_u128()).unwrap();
file.write_all(&content).unwrap();
file.persist().unwrap();
}
let mut wtxn = self.env.write_txn().unwrap();
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task).unwrap();
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})
.unwrap();
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)
.unwrap();
wtxn.commit().unwrap();
self.wake_up.signal();
}
/// Create a new index without any associated task.
pub fn create_raw_index(
&self,
@ -1050,14 +1152,15 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
info!("before getting a new batch");
let batch = match self.get_or_create_next_batch()? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
info!("after getting a new batch");
let index_uid = batch.index_uid().map(ToOwned::to_owned);
drop(rtxn);
// TODO cluster: Should we send the starting date as well so everyone is in sync?
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@ -1186,6 +1289,63 @@ impl IndexScheduler {
Ok(TickOutcome::TickAgain(processed_tasks))
}
/// If there is no cluster or if leader -> create a new batch
/// If follower -> wait till the leader gives us a batch to process
fn get_or_create_next_batch(&self) -> Result<Option<Batch>> {
info!("inside get or create next batch");
let batch = match &self.cluster {
None | Some(Cluster::Leader(_)) => {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))?
}
Some(Cluster::Follower(follower)) => {
let batch = follower.get_new_batch();
Some(self.get_batch_from_cluster_batch(batch)?)
}
};
if let Some(Cluster::Leader(leader)) = &self.cluster {
// first, onboard the new followers
if leader.has_new_followers() {
info!("New followers are trying to join the cluster");
let started_at = OffsetDateTime::now_utc();
let dump = self
.create_dump(
&Task {
uid: TaskId::MAX,
enqueued_at: started_at,
started_at: Some(started_at),
finished_at: Some(started_at),
error: None,
canceled_by: None,
details: None,
status: Status::Enqueued,
kind: KindWithContent::DumpCreation {
keys: leader.get_keys(),
// TODO cluster: should we unify the instance_uid between every instances?
instance_uid: None,
},
},
&started_at,
)
.unwrap();
let mut buffer = Vec::new();
// TODO cluster: stop writing everything in RAM
dump.persist_to(&mut buffer).unwrap();
leader.join_me(buffer);
}
// second, starts processing the batch
if let Some(ref batch) = batch {
leader.starts_batch(batch.clone().into());
}
}
Ok(batch)
}
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
match task.content_uuid() {
Some(content_file) => self.delete_update_file(content_file),
@ -1301,7 +1461,8 @@ mod tests {
autobatching_enabled,
};
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
let index_scheduler =
Self::new(options, None, Consistency::default(), sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@ -12,6 +12,7 @@ license.workspace = true
[dependencies]
base64 = "0.13.1"
cluster = { path = "../cluster" }
enum-iterator = "1.1.3"
hmac = "0.12.1"
maplit = "1.0.2"

View File

@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use cluster::Cluster;
use error::{AuthControllerError, Result};
use maplit::hashset;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
@ -21,17 +22,52 @@ use uuid::Uuid;
pub struct AuthController {
store: Arc<HeedAuthStore>,
master_key: Option<String>,
cluster: Option<Cluster>,
}
impl AuthController {
pub fn new(db_path: impl AsRef<Path>, master_key: &Option<String>) -> Result<Self> {
pub fn new(
db_path: impl AsRef<Path>,
master_key: &Option<String>,
cluster: Option<Cluster>,
) -> Result<Self> {
let store = HeedAuthStore::new(db_path)?;
if store.is_empty()? {
generate_default_keys(&store)?;
}
Ok(Self { store: Arc::new(store), master_key: master_key.clone() })
let this = Self {
store: Arc::new(store),
master_key: master_key.clone(),
cluster: cluster.clone(),
};
if let Some(Cluster::Follower(follower)) = cluster {
let this = this.clone();
std::thread::spawn(move || loop {
match follower.api_key_operation() {
cluster::ApiKeyOperation::Insert(key) => {
this.store.put_api_key(key).expect("Inconsistency with the leader");
}
cluster::ApiKeyOperation::Delete(uuid) => {
this.store.delete_api_key(uuid).expect("Inconsistency with the leader");
}
}
});
} else if let Some(Cluster::Leader(leader)) = cluster {
let this = this.clone();
std::thread::spawn(move || loop {
let channel = leader.needs_keys();
let keys = this.list_keys().expect("auth controller is dead");
channel.send(keys).expect("Cluster is dead");
});
}
Ok(this)
}
/// Return the size of the `AuthController` database in bytes.
@ -42,7 +78,13 @@ impl AuthController {
pub fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
match self.store.get_api_key(create_key.uid)? {
Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())),
None => self.store.put_api_key(create_key.to_key()),
None => {
let key = self.store.put_api_key(create_key.to_key())?;
if let Some(Cluster::Leader(ref leader)) = self.cluster {
leader.insert_key(key.clone());
}
Ok(key)
}
}
}
@ -57,7 +99,12 @@ impl AuthController {
name => key.name = name.set(),
};
key.updated_at = OffsetDateTime::now_utc();
self.store.put_api_key(key)
let key = self.store.put_api_key(key)?;
if let Some(Cluster::Leader(ref leader)) = self.cluster {
leader.insert_key(key.clone());
}
Ok(key)
}
pub fn get_key(&self, uid: Uuid) -> Result<Key> {
@ -100,6 +147,9 @@ impl AuthController {
pub fn delete_key(&self, uid: Uuid) -> Result<()> {
if self.store.delete_api_key(uid)? {
if let Some(Cluster::Leader(ref leader)) = self.cluster {
leader.delete_key(uid);
}
Ok(())
} else {
Err(AuthControllerError::ApiKeyNotFound(uid.to_string()))

View File

@ -24,6 +24,7 @@ bstr = "1.0.1"
byte-unit = { version = "4.0.14", default-features = false, features = ["std", "serde"] }
bytes = "1.2.1"
clap = { version = "4.0.9", features = ["derive", "env"] }
cluster = { path = "../cluster" }
crossbeam-channel = "0.5.6"
deserr = "0.5.0"
dump = { path = "../dump" }

View File

@ -282,6 +282,7 @@ impl From<Opt> for Infos {
dump_dir,
log_level,
indexer_options,
cluster_configuration: _,
config_file_path,
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics: _,

View File

@ -11,7 +11,8 @@ pub mod routes;
pub mod search;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::io::{BufReader, BufWriter, Write};
use std::net::ToSocketAddrs;
use std::path::Path;
use std::sync::Arc;
use std::thread;
@ -25,11 +26,12 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest};
use analytics::Analytics;
use anyhow::bail;
use cluster::{Cluster, Follower, Leader};
use error::PayloadError;
use extractors::payload::PayloadConfig;
use http::header::CONTENT_TYPE;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use log::error;
use log::{error, info};
use meilisearch_auth::AuthController;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
@ -143,7 +145,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
// the db is empty and the snapshot exists, import it
if empty_db && snapshot_path_exists {
match compression::from_tar_gz(snapshot_path, &opt.db_path) {
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?,
Ok(()) => open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
return Err(e);
@ -160,14 +162,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
// the db is empty and the dump exists, import it
if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) =
open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?;
open_or_create_database_unchecked(opt, None, OnFailure::RemoveDb)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
@ -187,10 +189,62 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
}
} else if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha {
match cluster.as_str() {
"leader" => {
info!("Starting as a leader");
let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap();
addr.set_port(6666);
open_or_create_database(
opt,
empty_db,
Some(Cluster::Leader(Leader::new(addr, opt.master_key.clone()))),
)?
}
"follower" => {
info!("Starting as a follower");
if !empty_db {
panic!("Can't start as a follower with an already existing data.ms");
}
let mut addr = opt
.cluster_configuration
.leader
.as_ref()
.expect("Can't be a follower without a leader")
.to_socket_addrs()
.unwrap()
.next()
.unwrap();
addr.set_port(6666);
let (follower, dump) = Follower::join(addr, opt.master_key.clone());
let mut dump_file = tempfile::NamedTempFile::new().unwrap();
dump_file.write_all(&dump).unwrap();
let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked(
opt,
Some(Cluster::Follower(follower)),
OnFailure::RemoveDb,
)?;
match import_dump(
&opt.db_path,
dump_file.path(),
&mut index_scheduler,
&mut auth_controller,
) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
return Err(e);
}
}
}
_ => panic!("Available values for the cluster mode are leader and follower"),
}
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, None)?
};
// We create a loop in a thread that registers snapshotCreation tasks
@ -215,27 +269,34 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
/// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything.
fn open_or_create_database_unchecked(
opt: &Opt,
cluster: Option<Cluster>,
on_failure: OnFailure,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, cluster.clone());
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
})?)
Ok(IndexScheduler::new(
IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes()
as usize,
index_count: DEFAULT_INDEX_COUNT,
},
cluster,
opt.cluster_configuration.consistency,
)?)
};
match (
@ -257,12 +318,13 @@ fn open_or_create_database_unchecked(
fn open_or_create_database(
opt: &Opt,
empty_db: bool,
cluster: Option<Cluster>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db {
check_version_file(&opt.db_path)?;
}
open_or_create_database_unchecked(opt, OnFailure::KeepDb)
open_or_create_database_unchecked(opt, cluster, OnFailure::KeepDb)
}
fn import_dump(

View File

@ -12,6 +12,7 @@ use std::{env, fmt, fs};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use cluster::Consistency;
use meilisearch_types::milli::update::IndexerConfig;
use rustls::server::{
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache,
@ -297,6 +298,10 @@ pub struct Opt {
#[clap(flatten)]
pub indexer_options: IndexerOpts,
#[serde(flatten)]
#[clap(flatten)]
pub cluster_configuration: ClusterOpts,
/// Set the path to a configuration file that should be used to setup the engine.
/// Format must be TOML.
#[clap(long)]
@ -385,6 +390,7 @@ impl Opt {
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics,
experimental_enable_metrics: enable_metrics_route,
cluster_configuration: _,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -518,6 +524,21 @@ impl IndexerOpts {
}
}
#[derive(Debug, Default, Clone, Parser, Deserialize)]
pub struct ClusterOpts {
#[clap(long)]
#[serde(default)]
pub experimental_enable_ha: Option<String>,
#[clap(long)]
#[serde(default)]
pub leader: Option<String>,
#[clap(long, default_value_t)]
#[serde(default)]
pub consistency: Consistency,
}
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;

View File

@ -60,7 +60,7 @@ async fn create_api_key_bad_uid() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
{
"message": "Invalid value at `.uid`: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-zA-Z], found `o` at 2",
"message": "Invalid value at `.uid`: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `o` at 2",
"code": "invalid_api_key_uid",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_uid"