Initial implementation

This commit is contained in:
Mubelotix
2025-07-30 12:01:40 +02:00
parent 5567653c96
commit cc37eb870f
9 changed files with 409 additions and 73 deletions

View File

@ -182,6 +182,7 @@ impl FeatureData {
..persisted_features
}));
// Once this is stabilized, network should be stored along with webhooks in index-scheduler's persisted database
let network_db = runtime_features_db.remap_data_type::<SerdeJson<Network>>();
let network: Network = network_db.get(wtxn, db_keys::NETWORK)?.unwrap_or_default();

View File

@ -26,11 +26,11 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
version,
queue,
scheduler,
persisted,
index_mapper,
features: _,
webhook_url: _,
webhook_authorization_header: _,
cached_webhooks: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
@ -62,6 +62,10 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
}
snap.push_str("\n----------------------------------------------------------------------\n");
snap.push_str("### Persisted:\n");
snap.push_str(&snapshot_persisted_db(&rtxn, persisted));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### All Tasks:\n");
snap.push_str(&snapshot_all_tasks(&rtxn, queue.tasks.all_tasks));
snap.push_str("----------------------------------------------------------------------\n");
@ -200,6 +204,16 @@ pub fn snapshot_date_db(rtxn: &RoTxn, db: Database<BEI128, CboRoaringBitmapCodec
snap
}
pub fn snapshot_persisted_db(rtxn: &RoTxn, db: &Database<Str, Str>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (key, value) = next.unwrap();
snap.push_str(&format!("{key}: {value}\n"));
}
snap
}
pub fn snapshot_task(task: &Task) -> String {
let mut snap = String::new();
let Task {
@ -311,6 +325,7 @@ pub fn snapshot_status(
}
snap
}
pub fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
@ -331,6 +346,7 @@ pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>)
}
snap
}
pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();

View File

@ -65,6 +65,7 @@ use meilisearch_types::milli::vector::{
use meilisearch_types::milli::{self, Index};
use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{KindWithContent, Task};
use meilisearch_types::webhooks::{Webhook, Webhooks};
use milli::vector::db::IndexEmbeddingConfig;
use processing::ProcessingTasks;
pub use queue::Query;
@ -80,7 +81,15 @@ use crate::utils::clamp_to_page_size;
pub(crate) type BEI128 = I128<BE>;
const TASK_SCHEDULER_SIZE_THRESHOLD_PERCENT_INT: u64 = 40;
const CHAT_SETTINGS_DB_NAME: &str = "chat-settings";
mod db_name {
pub const CHAT_SETTINGS: &str = "chat-settings";
pub const PERSISTED: &str = "persisted";
}
mod db_keys {
pub const WEBHOOKS: &str = "webhooks";
}
#[derive(Debug)]
pub struct IndexSchedulerOptions {
@ -171,10 +180,11 @@ pub struct IndexScheduler {
/// Whether we should use the old document indexer or the new one.
pub(crate) experimental_no_edition_2024_for_dumps: bool,
/// The webhook url we should send tasks to after processing every batches.
pub(crate) webhook_url: Option<String>,
/// The Authorization header to send to the webhook URL.
pub(crate) webhook_authorization_header: Option<String>,
/// A database to store single-keyed data that is persisted across restarts.
persisted: Database<Str, Str>,
/// Webhook
cached_webhooks: Arc<RwLock<Webhooks>>,
/// A map to retrieve the runtime representation of an embedder depending on its configuration.
///
@ -214,8 +224,8 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
cleanup_enabled: self.cleanup_enabled,
experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
webhook_url: self.webhook_url.clone(),
webhook_authorization_header: self.webhook_authorization_header.clone(),
persisted: self.persisted,
cached_webhooks: self.cached_webhooks.clone(),
embedders: self.embedders.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@ -284,10 +294,16 @@ impl IndexScheduler {
let version = versioning::Versioning::new(&env, from_db_version)?;
let mut wtxn = env.write_txn()?;
let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?;
let queue = Queue::new(&env, &mut wtxn, &options)?;
let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?;
let chat_settings = env.create_database(&mut wtxn, Some(CHAT_SETTINGS_DB_NAME))?;
let chat_settings = env.create_database(&mut wtxn, Some(db_name::CHAT_SETTINGS))?;
let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?;
let webhooks_db = persisted.remap_data_type::<SerdeJson<Webhooks>>();
let webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default();
wtxn.commit()?;
// allow unreachable_code to get rids of the warning in the case of a test build.
@ -303,8 +319,9 @@ impl IndexScheduler {
experimental_no_edition_2024_for_dumps: options
.indexer_config
.experimental_no_edition_2024_for_dumps,
webhook_url: options.webhook_url,
webhook_authorization_header: options.webhook_authorization_header,
persisted,
cached_webhooks: Arc::new(RwLock::new(webhooks)),
embedders: Default::default(),
#[cfg(test)]
@ -754,80 +771,103 @@ impl IndexScheduler {
/// Once the tasks changes have been committed we must send all the tasks that were updated to our webhook if there is one.
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
if let Some(ref url) = self.webhook_url {
struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler,
tasks: &'b mut roaring::bitmap::Iter<'b>,
buffer: Vec<u8>,
written: usize,
}
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if webhooks.webhooks.is_empty() {
return Ok(());
}
let webhooks = Webhooks::clone(&*webhooks);
impl Read for TaskReader<'_, '_> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
match self.tasks.next() {
None => return Ok(0),
Some(task_id) => {
let task = self
.index_scheduler
.queue
.tasks
.get_task(self.rtxn, task_id)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Error::CorruptedTaskQueue,
)
})?;
struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler,
tasks: &'b mut roaring::bitmap::Iter<'b>,
buffer: Vec<u8>,
written: usize,
}
serde_json::to_writer(
&mut self.buffer,
&TaskView::from_task(&task),
)?;
self.buffer.push(b'\n');
}
impl Read for TaskReader<'_, '_> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
match self.tasks.next() {
None => return Ok(0),
Some(task_id) => {
let task = self
.index_scheduler
.queue
.tasks
.get_task(self.rtxn, task_id)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, Error::CorruptedTaskQueue)
})?;
serde_json::to_writer(&mut self.buffer, &TaskView::from_task(&task))?;
self.buffer.push(b'\n');
}
}
}
let mut to_write = &self.buffer[self.written..];
let wrote = io::copy(&mut to_write, &mut buf)?;
self.written += wrote as usize;
let mut to_write = &self.buffer[self.written..];
let wrote = io::copy(&mut to_write, &mut buf)?;
self.written += wrote as usize;
// we wrote everything and must refresh our buffer on the next call
if self.written == self.buffer.len() {
self.written = 0;
self.buffer.clear();
}
// we wrote everything and must refresh our buffer on the next call
if self.written == self.buffer.len() {
self.written = 0;
self.buffer.clear();
}
Ok(wrote as usize)
Ok(wrote as usize)
}
}
let rtxn = self.env.read_txn()?;
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: self,
tasks: &mut updated.into_iter(),
buffer: Vec::with_capacity(800), // on average a task is around ~600 bytes
written: 0,
};
enum EitherRead<T: Read> {
Other(T),
Data(Vec<u8>),
}
impl<T: Read> Read for &mut EitherRead<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
EitherRead::Other(reader) => reader.read(buf),
EitherRead::Data(data) => data.as_slice().read(buf),
}
}
}
let rtxn = self.env.read_txn()?;
let mut reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: self,
tasks: &mut updated.into_iter(),
buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes
written: 0,
};
// When there is more than one webhook, cache the data in memory
let mut reader = match webhooks.webhooks.len() {
1 => EitherRead::Other(reader),
_ => {
let mut data = Vec::new();
reader.read_to_end(&mut data)?;
EitherRead::Data(data)
}
};
// let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let request = ureq::post(url)
for (name, Webhook { url, headers }) in webhooks.webhooks.iter() {
let mut request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
let request = match &self.webhook_authorization_header {
Some(header) => request.set("Authorization", header),
None => request,
};
for (header_name, header_value) in headers.iter() {
request = request.set(header_name, header_value);
}
if let Err(e) = request.send(reader) {
tracing::error!("While sending data to the webhook: {e}");
if let Err(e) = request.send(&mut reader) {
tracing::error!("While sending data to the webhook {name}: {e}");
}
}
@ -862,6 +902,20 @@ impl IndexScheduler {
self.features.network()
}
pub fn put_webhooks(&self, webhooks: Webhooks) -> Result<()> {
let mut wtxn = self.env.write_txn()?;
let webhooks_db = self.persisted.remap_data_type::<SerdeJson<Webhooks>>();
webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?;
wtxn.commit()?;
*self.cached_webhooks.write().unwrap() = webhooks;
Ok(())
}
pub fn webhooks(&self) -> Webhooks {
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
Webhooks::clone(&*webhooks)
}
pub fn embedders(
&self,
index_uid: String,