make the consistency configurable

Tamo 2023-03-21 18:25:53 +01:00
parent 8ebc2b19ea
commit 3df58831c6
6 changed files with 38 additions and 10 deletions

View File

@@ -1,4 +1,5 @@
 use std::net::ToSocketAddrs;
+use std::str::FromStr;
 use std::sync::{Arc, RwLock};
 
 use batch::Batch;
@@ -40,14 +41,30 @@ pub enum FollowerMsg {
     RegisterNewTask(KindWithContent),
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
 pub enum Consistency {
+    #[default]
     One,
     Two,
     Quorum,
     All,
 }
 
+impl std::fmt::Display for Consistency {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let s = serde_json::to_string(self).unwrap();
+        write!(f, "{s}")
+    }
+}
+
+impl FromStr for Consistency {
+    type Err = serde_json::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        serde_json::from_str(s)
+    }
+}
+
 #[derive(Clone)]
 pub struct Follower {
     sender: ChannelSender<FollowerMsg>,
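Because both impls delegate to serde_json, the textual form of a Consistency value is its JSON encoding, i.e. the variant name wrapped in double quotes (assuming no serde rename attributes beyond what this diff shows). A minimal, self-contained sketch of that round trip, re-declaring the enum locally for illustration:

use std::str::FromStr;

use serde::{Deserialize, Serialize};

// Local re-declaration of the enum from the cluster crate, for illustration only.
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
enum Consistency {
    #[default]
    One,
    Two,
    Quorum,
    All,
}

impl std::fmt::Display for Consistency {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", serde_json::to_string(self).unwrap())
    }
}

impl FromStr for Consistency {
    type Err = serde_json::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        serde_json::from_str(s)
    }
}

fn main() {
    // serde_json encodes a unit variant as a quoted JSON string, so the
    // Display output keeps the quotes.
    assert_eq!(Consistency::One.to_string(), "\"One\"");
    // The same quoted form parses back to the variant...
    assert_eq!(Consistency::from_str("\"Quorum\"").unwrap(), Consistency::Quorum);
    // ...while a bare variant name is not valid JSON and is rejected.
    assert!(Consistency::from_str("Quorum").is_err());
}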

View File

@@ -22,7 +22,6 @@ use std::ffi::OsStr;
 use std::fs::{self, File};
 use std::io::BufWriter;
 
-use cluster::Consistency;
 use crossbeam::utils::Backoff;
 use dump::{DumpWriter, IndexMetadata};
 use log::{debug, error, info};
@@ -589,7 +588,7 @@ impl IndexScheduler {
         }
 
         match &self.cluster {
-            Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+            Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
             Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
             None => (),
         }
@@ -639,7 +638,7 @@ impl IndexScheduler {
         }
 
         match &self.cluster {
-            Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+            Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
             Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
             None => (),
         }
@@ -770,7 +769,7 @@ impl IndexScheduler {
         let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
 
         match &self.cluster {
-            Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+            Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
             Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
             None => (),
         }
@@ -875,7 +874,7 @@ impl IndexScheduler {
         }
 
         match &self.cluster {
-            Some(Cluster::Leader(leader)) => leader.commit(Consistency::All),
+            Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
             Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
             None => (),
         }
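Every batch type now commits with the scheduler's configured self.consistency_level instead of the hard-coded Consistency::All. The diff does not show how Leader::commit interprets that value; the following is a purely hypothetical sketch of the semantics the variants suggest (function name and thresholds are assumptions, not the real implementation):

use cluster::Consistency;

/// Hypothetical helper: have enough nodes acknowledged a batch for the
/// requested consistency level? Illustration only, not the actual
/// `Leader::commit` logic.
fn commit_is_satisfied(level: Consistency, acked: usize, cluster_size: usize) -> bool {
    match level {
        // A single acknowledgement (e.g. the leader itself) is enough.
        Consistency::One => acked >= 1,
        // At least two nodes must have acknowledged.
        Consistency::Two => acked >= 2,
        // A strict majority of the cluster.
        Consistency::Quorum => acked > cluster_size / 2,
        // Every node in the cluster must have acknowledged.
        Consistency::All => acked >= cluster_size,
    }
}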

View File

@@ -34,6 +34,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         auth_path: _,
         version_file_path: _,
         cluster: _,
+        consistency_level: _,
         test_breakpoint_sdr: _,
         planned_failures: _,
         run_loop_iteration: _,

View File

@@ -40,7 +40,7 @@ use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
 use batch::Batch;
-use cluster::{Follower, Leader};
+use cluster::{Consistency, Follower, Leader};
 use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
 use file_store::FileStore;
@@ -309,6 +309,8 @@ pub struct IndexScheduler {
     /// The role in the cluster
     pub(crate) cluster: Option<Cluster>,
+    /// The consistency level used by the leader. Ignored if the node is not a leader in cluster mode.
+    pub(crate) consistency_level: Consistency,
 
     // ================= test
     // The next entry is dedicated to the tests.
@@ -376,6 +378,7 @@ impl IndexScheduler {
             auth_path: self.auth_path.clone(),
             version_file_path: self.version_file_path.clone(),
             cluster: self.cluster.clone(),
+            consistency_level: self.consistency_level,
             #[cfg(test)]
             test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
             #[cfg(test)]
@@ -391,6 +394,7 @@ impl IndexScheduler {
     pub fn new(
         options: IndexSchedulerOptions,
         cluster: Option<Cluster>,
+        consistency_level: Consistency,
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
     ) -> Result<Self> {
@@ -451,6 +455,7 @@ impl IndexScheduler {
             auth_path: options.auth_path,
             version_file_path: options.version_file_path,
             cluster,
+            consistency_level,
             #[cfg(test)]
             test_breakpoint_sdr,
@@ -1461,7 +1466,8 @@ mod tests {
                 autobatching_enabled,
             };
 
-            let index_scheduler = Self::new(options, None, sender, planned_failures).unwrap();
+            let index_scheduler =
+                Self::new(options, None, Consistency::default(), sender, planned_failures).unwrap();
 
             // To be 100% consistent between all test we're going to start the scheduler right now
             // and ensure it's in the expected starting state.

View File

@@ -291,6 +291,7 @@ fn open_or_create_database_unchecked(
             index_count: DEFAULT_INDEX_COUNT,
         },
         cluster,
+        opt.cluster_configuration.consistency,
     )?)
 };

View File

@@ -12,7 +12,7 @@ use std::{env, fmt, fs};
 
 use byte_unit::{Byte, ByteError};
 use clap::Parser;
-use index_scheduler::ClusterMode;
+use cluster::Consistency;
 use meilisearch_types::milli::update::IndexerConfig;
 use rustls::server::{
     AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache,
@@ -390,7 +390,7 @@ impl Opt {
             #[cfg(all(not(debug_assertions), feature = "analytics"))]
             no_analytics,
             experimental_enable_metrics: enable_metrics_route,
-            cluster_configuration,
+            cluster_configuration: _,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -533,6 +533,10 @@ pub struct ClusterOpts {
     #[clap(long)]
     #[serde(default)]
    pub leader: Option<String>,
+
+    #[clap(long, default_value_t)]
+    #[serde(default)]
+    pub consistency: Consistency,
 }
 
 impl TryFrom<&IndexerOpts> for IndexerConfig {
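The new --consistency flag uses #[clap(long, default_value_t)], which renders Consistency::default() (i.e. One) with the Display impl added above and parses incoming values back through FromStr. A standalone sketch of an options struct wired the same way, assuming the same clap derive setup as the surrounding options file (the struct name and binary are illustrative; only the flag mirrors the diff):

use clap::Parser;
use cluster::Consistency;

#[derive(Debug, Parser)]
struct DemoOpts {
    /// `default_value_t` requires `Display` to render the default value, and
    /// values passed on the command line are parsed back through `FromStr`.
    #[clap(long, default_value_t)]
    consistency: Consistency,
}

fn main() {
    // No flag given: the default (`Consistency::One`) is rendered with
    // `Display`, then parsed back through `FromStr`.
    let opts = DemoOpts::parse_from(["demo"]);
    println!("consistency = {}", opts.consistency);
}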