Change binary option and add env var support

This commit is contained in:
ManyTheFish
2022-07-28 11:55:12 +02:00
parent e3426d5b7a
commit 58d2aad309
4 changed files with 11 additions and 61 deletions

View File

@@ -29,9 +29,9 @@ pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> { pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
let mut meilisearch = MeiliSearch::builder(); let mut meilisearch = MeiliSearch::builder();
// enable autobatching? // disable autobatching?
AUTOBATCHING_ENABLED.store( let _ = AUTOBATCHING_ENABLED.store(
opt.scheduler_options.enable_auto_batching, opt.scheduler_options.disable_auto_batching,
std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed,
); );

View File

@@ -41,27 +41,10 @@ pub struct IndexerOpts {
#[derive(Debug, Clone, Parser, Default, Serialize)] #[derive(Debug, Clone, Parser, Default, Serialize)]
pub struct SchedulerConfig { pub struct SchedulerConfig {
/// enable the autobatching experimental feature /// The engine will disable task auto-batching,
#[clap(long, hide = true)] /// and will sequencialy compute each task one by one.
pub enable_auto_batching: bool, #[clap(long, env = "DISABLE_AUTO_BATCHING")]
pub disable_auto_batching: bool,
// The maximum number of updates of the same type that can be batched together.
// If unspecified, this is unlimited. A value of 0 is interpreted as 1.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub max_batch_size: Option<usize>,
// The maximum number of documents in a document batch. Since batches must contain at least one
// update for the scheduler to make progress, the number of documents in a batch will be at
// least the number of documents of its first update.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub max_documents_per_batch: Option<usize>,
/// Debounce duration in seconds
///
/// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
/// starting to process a batch of updates.
#[clap(long, requires = "enable-auto-batching", hide = true)]
pub debounce_duration_sec: Option<u64>,
} }
impl TryFrom<&IndexerOpts> for IndexerConfig { impl TryFrom<&IndexerOpts> for IndexerConfig {

View File

@@ -3,7 +3,6 @@ use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::slice; use std::slice;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use atomic_refcell::AtomicRefCell; use atomic_refcell::AtomicRefCell;
use milli::update::IndexDocumentsMethod; use milli::update::IndexDocumentsMethod;
@@ -248,17 +247,10 @@ impl Scheduler {
pub fn new( pub fn new(
store: TaskStore, store: TaskStore,
performers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>>, performers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>>,
mut config: SchedulerConfig, config: SchedulerConfig,
) -> Result<Arc<RwLock<Self>>> { ) -> Result<Arc<RwLock<Self>>> {
let (notifier, rcv) = watch::channel(()); let (notifier, rcv) = watch::channel(());
let debounce_time = config.debounce_duration_sec;
// Disable autobatching
if !config.enable_auto_batching {
config.max_batch_size = Some(1);
}
let this = Self { let this = Self {
snapshots: VecDeque::new(), snapshots: VecDeque::new(),
tasks: TaskQueue::default(), tasks: TaskQueue::default(),
@@ -275,12 +267,7 @@ impl Scheduler {
let this = Arc::new(RwLock::new(this)); let this = Arc::new(RwLock::new(this));
let update_loop = UpdateLoop::new( let update_loop = UpdateLoop::new(this.clone(), performers, rcv);
this.clone(),
performers,
debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
rcv,
);
tokio::task::spawn_local(update_loop.run()); tokio::task::spawn_local(update_loop.run());
@@ -497,27 +484,17 @@ fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
match list.peek() { match list.peek() {
Some(pending) if pending.kind == kind => { Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress. // We always need to process at least one task for the scheduler to make progress.
if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) if config.disable_auto_batching && task_list.len() > 0 {
{
break; break;
} }
let pending = list.pop().unwrap(); let pending = list.pop().unwrap();
task_list.push(pending.id); task_list.push(pending.id);
// We add the number of documents to the count if we are scheduling document additions and // We add the number of documents to the count if we are scheduling document additions.
// stop adding if we already have enough.
//
// We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
match pending.kind { match pending.kind {
TaskType::DocumentUpdate { number } TaskType::DocumentUpdate { number }
| TaskType::DocumentAddition { number } => { | TaskType::DocumentAddition { number } => {
doc_count += number; doc_count += number;
if doc_count
>= config.max_documents_per_batch.unwrap_or(usize::MAX)
{
break;
}
} }
_ => (), _ => (),
} }

View File

@@ -1,9 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::{watch, RwLock}; use tokio::sync::{watch, RwLock};
use tokio::time::interval_at;
use super::batch::Batch; use super::batch::Batch;
use super::error::Result; use super::error::Result;
@@ -17,20 +15,17 @@ pub struct UpdateLoop {
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>, performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
notifier: Option<watch::Receiver<()>>, notifier: Option<watch::Receiver<()>>,
debounce_duration: Option<Duration>,
} }
impl UpdateLoop { impl UpdateLoop {
pub fn new( pub fn new(
scheduler: Arc<RwLock<Scheduler>>, scheduler: Arc<RwLock<Scheduler>>,
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>, performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
debuf_duration: Option<Duration>,
notifier: watch::Receiver<()>, notifier: watch::Receiver<()>,
) -> Self { ) -> Self {
Self { Self {
scheduler, scheduler,
performers, performers,
debounce_duration: debuf_duration,
notifier: Some(notifier), notifier: Some(notifier),
} }
} }
@@ -43,11 +38,6 @@ impl UpdateLoop {
break; break;
} }
if let Some(t) = self.debounce_duration {
let mut interval = interval_at(tokio::time::Instant::now() + t, t);
interval.tick().await;
};
if let Err(e) = self.process_next_batch().await { if let Err(e) = self.process_next_batch().await {
log::error!("an error occurred while processing an update batch: {}", e); log::error!("an error occurred while processing an update batch: {}", e);
} }