mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-18 04:11:07 +00:00
Compare commits
53 Commits
disable-ar
...
prototype-
Author | SHA1 | Date | |
---|---|---|---|
6325cda74f | |||
c71ba72f73 | |||
ecd36b15f0 | |||
8a2e8a887f | |||
309c33a418 | |||
9b01506cee | |||
f37fdceb15 | |||
f544cfa444 | |||
c158d03337 | |||
b7109c0fd2 | |||
a53a0fdb77 | |||
719fdd701b | |||
01c13c98ac | |||
5b89276fcc | |||
41697c4d65 | |||
7d85753573 | |||
76657af1f9 | |||
966cbdab69 | |||
0c68b9ed4c | |||
d7233ecdb8 | |||
95a011af13 | |||
e257710961 | |||
9dd4423054 | |||
8c3ad57ef9 | |||
2d1434da81 | |||
c488a4a351 | |||
0c7d7c68bc | |||
854745c670 | |||
777eebb759 | |||
61ccfaf9bc | |||
f0c4d36ff7 | |||
8c20d6e2fe | |||
8e437ed76c | |||
1191ec5939 | |||
0d20d08daf | |||
b66bf049b5 | |||
b2f36b9b97 | |||
b311089435 | |||
3d46e84d97 | |||
ad7f8edff8 | |||
5ce01bcb53 | |||
d5523cc6ac | |||
fe7a312ec6 | |||
57dc4b148c | |||
a325ddfe6a | |||
0cd81573b4 | |||
b0ff595f60 | |||
3eb6f4b56f | |||
49f976c8d8 | |||
84d56f3320 | |||
97e3dfd99d | |||
dc38da95c4 | |||
2ce8b42757 |
485
Cargo.lock
generated
485
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
21
Cargo.toml
21
Cargo.toml
@ -36,24 +36,3 @@ opt-level = 3
|
||||
opt-level = 3
|
||||
[profile.dev.package.roaring]
|
||||
opt-level = 3
|
||||
|
||||
[profile.dev.package.lindera-ipadic-builder]
|
||||
opt-level = 3
|
||||
[profile.dev.package.encoding]
|
||||
opt-level = 3
|
||||
[profile.dev.package.yada]
|
||||
opt-level = 3
|
||||
|
||||
[profile.release.package.lindera-ipadic-builder]
|
||||
opt-level = 3
|
||||
[profile.release.package.encoding]
|
||||
opt-level = 3
|
||||
[profile.release.package.yada]
|
||||
opt-level = 3
|
||||
|
||||
[profile.bench.package.lindera-ipadic-builder]
|
||||
opt-level = 3
|
||||
[profile.bench.package.encoding]
|
||||
opt-level = 3
|
||||
[profile.bench.package.yada]
|
||||
opt-level = 3
|
||||
|
59
docker-compose.yml
Normal file
59
docker-compose.yml
Normal file
@ -0,0 +1,59 @@
|
||||
version: "3.9"
|
||||
services:
|
||||
zk1:
|
||||
container_name: zk1
|
||||
hostname: zk1
|
||||
image: bitnami/zookeeper:3.7.1
|
||||
ports:
|
||||
- 21811:2181
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
- ZOO_SERVER_ID=1
|
||||
- ZOO_SERVERS=0.0.0.0:2888:3888,zk2:2888:3888,zk3:2888:3888
|
||||
zk2:
|
||||
container_name: zk2
|
||||
hostname: zk2
|
||||
image: bitnami/zookeeper:3.7.1
|
||||
ports:
|
||||
- 21812:2181
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
- ZOO_SERVER_ID=2
|
||||
- ZOO_SERVERS=zk1:2888:3888,0.0.0.0:2888:3888,zk3:2888:3888
|
||||
zk3:
|
||||
container_name: zk3
|
||||
hostname: zk3
|
||||
image: bitnami/zookeeper:3.7.1
|
||||
ports:
|
||||
- 21813:2181
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
- ZOO_SERVER_ID=3
|
||||
- ZOO_SERVERS=zk1:2888:3888,zk2:2888:3888,0.0.0.0:2888:3888
|
||||
zoonavigator:
|
||||
container_name: zoonavigator
|
||||
image: elkozmon/zoonavigator
|
||||
ports:
|
||||
- 9000:9000
|
||||
|
||||
# Meilisearch instances
|
||||
# m1:
|
||||
# container_name: m1
|
||||
# hostname: m1
|
||||
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
|
||||
# ports:
|
||||
# - 7700:7700
|
||||
# environment:
|
||||
# - MEILI_ZK_URL=zk1:2181
|
||||
# - MEILI_MASTER_KEY=masterkey
|
||||
# restart: always
|
||||
# m2:
|
||||
# container_name: m2
|
||||
# hostname: m2
|
||||
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
|
||||
# ports:
|
||||
# - 7701:7700
|
||||
# environment:
|
||||
# - MEILI_ZK_URL=zk2:2181
|
||||
# - MEILI_MASTER_KEY=masterkey
|
||||
# restart: always
|
@ -22,20 +22,6 @@ pub enum Error {
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl Deref for File {
|
||||
type Target = NamedTempFile;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.file
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for File {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.file
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FileStore {
|
||||
path: PathBuf,
|
||||
@ -146,6 +132,20 @@ impl File {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lets a `File` be used wherever a shared reference to its inner `NamedTempFile` is expected.
impl Deref for File {
|
||||
type Target = NamedTempFile;
|
||||
|
||||
/// Borrows the wrapped `file` field.
fn deref(&self) -> &Self::Target {
|
||||
&self.file
|
||||
}
|
||||
}
|
||||
|
||||
/// Lets a `File` be used wherever a mutable reference to its inner `NamedTempFile` is expected.
impl DerefMut for File {
|
||||
/// Mutably borrows the wrapped `file` field.
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.file
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::io::Write;
|
||||
|
61
ha_test/run.sh
Normal file
61
ha_test/run.sh
Normal file
@ -0,0 +1,61 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Checks that every command line tool this script relies on (`zkli` and `s3cmd`)
# can be found by the shell.
#
# Prints one status line per tool. When at least one tool is missing, prints
# "Exiting..." and terminates the whole script with status 1.
# Sets the global variable `everything_ok` to "yes" or "no".
function is_everything_installed {
    everything_ok=yes

    if hash zkli 2>/dev/null; then
        echo "✅ zkli installed"
    else
        everything_ok=no
        echo "🥺 zkli is missing, please run \`cargo install zkli\`"
    fi

    if hash s3cmd 2>/dev/null; then
        echo "✅ s3cmd installed"
    else
        everything_ok=no
        echo "🥺 s3cmd is missing, see how to install it here https://s3tools.org/s3cmd"
    fi

    # Quoted so the test stays well-formed even if the variable were ever empty.
    if [ "$everything_ok" = "no" ]; then
        echo "Exiting..."
        exit 1
    fi
}
|
||||
|
||||
# Probes a ZooKeeper server by listing its root with `zkli`.
#
# $1: address of the ZooKeeper server to reach.
# Returns 0 when `zkli` succeeds; otherwise prints a message and returns 1.
connect_to_zookeeper() {
    # The listing itself is irrelevant, only the exit status matters.
    zkli -a "$1" ls > /dev/null && return 0

    echo "zkli can't connect"
    return 1
}
|
||||
|
||||
# Checks that the S3 storage is reachable by running `s3cmd ls`.
#
# Takes no positional parameter; the connection is described by these
# environment variables:
#   S3_SECRET_KEY
#   S3_ACCESS_KEY (falls back to the legacy ACCESS_KEY when unset or empty)
#   S3_HOST
#   S3_BUCKET
#
# Returns 0 when `s3cmd` succeeds; otherwise prints a message and returns 1.
function connect_to_s3 {
    # Test the command directly: `if $?; then` would try to *execute* the exit
    # status as a command and therefore never report a failed connection.
    if ! s3cmd --host="$S3_HOST" --host-bucket="$S3_BUCKET" --access_key="${S3_ACCESS_KEY:-$ACCESS_KEY}" --secret_key="$S3_SECRET_KEY" ls; then
        echo "s3cmd can't connect"
        return 1
    fi
}
|
||||
|
||||
# Entry point: check the tooling, find a reachable ZooKeeper, then check S3.
is_everything_installed

# Candidate addresses are tried in order; ZOOKEEPER_ADDR keeps the first one that answers.
# NOTE(review): 21811 looks like the host port mapped to zk1 in docker-compose.yml - confirm.
zookeeper_reachable=no
for ZOOKEEPER_ADDR in "localhost:2181" "localhost:21811"; do
    if connect_to_zookeeper $ZOOKEEPER_ADDR; then
        zookeeper_reachable=yes
        break
    fi
done

if [ "$zookeeper_reachable" = "no" ]; then
    echo "Can't connect to zkli"
    exit 1
fi

connect_to_s3
|
@ -31,6 +31,10 @@ tempfile = "3.5.0"
|
||||
thiserror = "1.0.40"
|
||||
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
||||
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
||||
tokio = { version = "1.27.0", features = ["full"] }
|
||||
zookeeper = "0.8.0"
|
||||
parking_lot = "0.12.1"
|
||||
rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] }
|
||||
|
||||
[dev-dependencies]
|
||||
big_s = "1.0.2"
|
||||
|
@ -43,7 +43,7 @@ use uuid::Uuid;
|
||||
|
||||
use crate::autobatcher::{self, BatchKind};
|
||||
use crate::utils::{self, swap_index_uid_in_task};
|
||||
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
|
||||
use crate::{Error, IndexSchedulerInner, ProcessingTasks, Result, TaskId};
|
||||
|
||||
/// Represents a combination of tasks that can all be processed at the same time.
|
||||
///
|
||||
@ -198,6 +198,35 @@ impl Batch {
|
||||
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the content fields uuids associated with this batch.
|
||||
pub fn content_uuids(&self) -> Vec<Uuid> {
|
||||
match self {
|
||||
Batch::TaskCancelation { .. }
|
||||
| Batch::TaskDeletion(_)
|
||||
| Batch::Dump(_)
|
||||
| Batch::IndexCreation { .. }
|
||||
| Batch::IndexDocumentDeletionByFilter { .. }
|
||||
| Batch::IndexUpdate { .. }
|
||||
| Batch::SnapshotCreation(_)
|
||||
| Batch::IndexDeletion { .. }
|
||||
| Batch::IndexSwap { .. } => vec![],
|
||||
Batch::IndexOperation { op, .. } => match op {
|
||||
IndexOperation::DocumentOperation { operations, .. } => operations
|
||||
.iter()
|
||||
.flat_map(|op| match op {
|
||||
DocumentOperation::Add(uuid) => Some(*uuid),
|
||||
DocumentOperation::Delete(_) => None,
|
||||
})
|
||||
.collect(),
|
||||
IndexOperation::DocumentDeletion { .. }
|
||||
| IndexOperation::Settings { .. }
|
||||
| IndexOperation::DocumentClear { .. }
|
||||
| IndexOperation::SettingsAndDocumentOperation { .. }
|
||||
| IndexOperation::DocumentClearAndSetting { .. } => vec![],
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexOperation {
|
||||
@ -213,7 +242,7 @@ impl IndexOperation {
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexScheduler {
|
||||
impl IndexSchedulerInner {
|
||||
/// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
|
||||
///
|
||||
/// ## Arguments
|
||||
@ -480,8 +509,7 @@ impl IndexScheduler {
|
||||
if let Some(task_id) = to_cancel.max() {
|
||||
// We retrieve the tasks that were processing before this tasks cancelation started.
|
||||
// We must *not* reset the processing tasks before calling this method.
|
||||
let ProcessingTasks { started_at, processing } =
|
||||
&*self.processing_tasks.read().unwrap();
|
||||
let ProcessingTasks { started_at, processing, .. } = &*self.processing_tasks.read();
|
||||
return Ok(Some(Batch::TaskCancelation {
|
||||
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
|
||||
previous_started_at: *started_at,
|
||||
@ -1392,7 +1420,7 @@ impl IndexScheduler {
|
||||
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
|
||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
|
||||
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
||||
let processing_tasks = &self.processing_tasks.read().processing.clone();
|
||||
|
||||
let all_task_ids = self.all_task_ids(wtxn)?;
|
||||
let mut to_delete_tasks = all_task_ids & matched_tasks;
|
||||
|
@ -295,6 +295,11 @@ impl IndexMap {
|
||||
"Attempt to finish deletion of an index that was being closed"
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns the indexes that were opened by the `IndexMap`.
|
||||
pub fn clear(&mut self) -> Vec<Index> {
|
||||
self.available.clear().into_iter().map(|(_, (_, index))| index).collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Create or open an index in the specified path.
|
||||
@ -335,7 +340,8 @@ mod tests {
|
||||
impl IndexMapper {
|
||||
fn test() -> (Self, Env, IndexSchedulerHandle) {
|
||||
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
|
||||
(index_scheduler.index_mapper, index_scheduler.env, handle)
|
||||
let index_scheduler = index_scheduler.inner();
|
||||
(index_scheduler.index_mapper.clone(), index_scheduler.env.clone(), handle)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ pub struct IndexMapper {
|
||||
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,
|
||||
|
||||
/// Path to the folder where the LMDB environments of each index are.
|
||||
base_path: PathBuf,
|
||||
pub(crate) base_path: PathBuf,
|
||||
/// The map size an index is opened with on the first time.
|
||||
index_base_map_size: usize,
|
||||
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
|
||||
@ -135,7 +135,7 @@ impl IndexMapper {
|
||||
index_growth_amount: usize,
|
||||
index_count: usize,
|
||||
enable_mdb_writemap: bool,
|
||||
indexer_config: IndexerConfig,
|
||||
indexer_config: Arc<IndexerConfig>,
|
||||
) -> Result<Self> {
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let index_mapping = env.create_database(&mut wtxn, Some(INDEX_MAPPING))?;
|
||||
@ -150,7 +150,7 @@ impl IndexMapper {
|
||||
index_base_map_size,
|
||||
index_growth_amount,
|
||||
enable_mdb_writemap,
|
||||
indexer_config: Arc::new(indexer_config),
|
||||
indexer_config,
|
||||
})
|
||||
}
|
||||
|
||||
@ -428,6 +428,11 @@ impl IndexMapper {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the indexes that were opened by the `IndexMapper`.
|
||||
pub fn clear(&mut self) -> Vec<Index> {
|
||||
self.index_map.write().unwrap().clear()
|
||||
}
|
||||
|
||||
/// The stats of an index.
|
||||
///
|
||||
/// If available in the cache, they are directly returned.
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::fmt::Write;
|
||||
use std::ops::Deref;
|
||||
|
||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, RoTxn};
|
||||
@ -8,12 +9,13 @@ use meilisearch_types::tasks::{Details, Task};
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use crate::index_mapper::IndexMapper;
|
||||
use crate::{IndexScheduler, Kind, Status, BEI128};
|
||||
use crate::{IndexScheduler, IndexSchedulerInner, Kind, Status, BEI128};
|
||||
|
||||
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
scheduler.assert_internally_consistent();
|
||||
|
||||
let IndexScheduler {
|
||||
let inner = scheduler.inner();
|
||||
let IndexSchedulerInner {
|
||||
autobatching_enabled,
|
||||
must_stop_processing: _,
|
||||
processing_tasks,
|
||||
@ -38,13 +40,15 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
test_breakpoint_sdr: _,
|
||||
planned_failures: _,
|
||||
run_loop_iteration: _,
|
||||
} = scheduler;
|
||||
zookeeper: _,
|
||||
options: _,
|
||||
} = inner.deref();
|
||||
|
||||
let rtxn = env.read_txn().unwrap();
|
||||
|
||||
let mut snap = String::new();
|
||||
|
||||
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
|
||||
let processing_tasks = processing_tasks.read().processing.clone();
|
||||
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
|
||||
snap.push_str("### Processing Tasks:\n");
|
||||
snap.push_str(&snapshot_bitmap(&processing_tasks));
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,6 @@
|
||||
//! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization.
|
||||
|
||||
use std::mem;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
/// Thread-safe `Vec`-backend LRU cache
|
||||
@ -190,6 +191,11 @@ where
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the generation associated to the key and values of the `LruMap`.
|
||||
pub fn clear(&mut self) -> Vec<(AtomicU64, (K, V))> {
|
||||
mem::take(&mut self.0.data)
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of an insertion in a LRU map.
|
||||
|
@ -10,9 +10,9 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
|
||||
use crate::{Error, IndexSchedulerInner, Result, Task, TaskId, BEI128};
|
||||
|
||||
impl IndexScheduler {
|
||||
impl IndexSchedulerInner {
|
||||
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
|
||||
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
|
||||
}
|
||||
@ -331,11 +331,12 @@ pub fn clamp_to_page_size(size: usize) -> usize {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl IndexScheduler {
|
||||
impl crate::IndexScheduler {
|
||||
/// Asserts that the index scheduler's content is internally consistent.
|
||||
pub fn assert_internally_consistent(&self) {
|
||||
let rtxn = self.env.read_txn().unwrap();
|
||||
for task in self.all_tasks.iter(&rtxn).unwrap() {
|
||||
let this = self.inner();
|
||||
let rtxn = this.env.read_txn().unwrap();
|
||||
for task in this.all_tasks.iter(&rtxn).unwrap() {
|
||||
let (task_id, task) = task.unwrap();
|
||||
let task_id = task_id.get();
|
||||
|
||||
@ -354,21 +355,21 @@ impl IndexScheduler {
|
||||
} = task;
|
||||
assert_eq!(uid, task.uid);
|
||||
if let Some(task_index_uid) = &task_index_uid {
|
||||
assert!(self
|
||||
assert!(this
|
||||
.index_tasks
|
||||
.get(&rtxn, task_index_uid.as_str())
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.contains(task.uid));
|
||||
}
|
||||
let db_enqueued_at = self
|
||||
let db_enqueued_at = this
|
||||
.enqueued_at
|
||||
.get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos()))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(db_enqueued_at.contains(task_id));
|
||||
if let Some(started_at) = started_at {
|
||||
let db_started_at = self
|
||||
let db_started_at = this
|
||||
.started_at
|
||||
.get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos()))
|
||||
.unwrap()
|
||||
@ -376,7 +377,7 @@ impl IndexScheduler {
|
||||
assert!(db_started_at.contains(task_id));
|
||||
}
|
||||
if let Some(finished_at) = finished_at {
|
||||
let db_finished_at = self
|
||||
let db_finished_at = this
|
||||
.finished_at
|
||||
.get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos()))
|
||||
.unwrap()
|
||||
@ -384,9 +385,9 @@ impl IndexScheduler {
|
||||
assert!(db_finished_at.contains(task_id));
|
||||
}
|
||||
if let Some(canceled_by) = canceled_by {
|
||||
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
|
||||
let db_canceled_tasks = this.get_status(&rtxn, Status::Canceled).unwrap();
|
||||
assert!(db_canceled_tasks.contains(uid));
|
||||
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
|
||||
let db_canceling_task = this.get_task(&rtxn, canceled_by).unwrap().unwrap();
|
||||
assert_eq!(db_canceling_task.status, Status::Succeeded);
|
||||
match db_canceling_task.kind {
|
||||
KindWithContent::TaskCancelation { query: _, tasks } => {
|
||||
@ -427,7 +428,7 @@ impl IndexScheduler {
|
||||
Details::IndexInfo { primary_key: pk1 } => match &kind {
|
||||
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
|
||||
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
|
||||
self.index_tasks
|
||||
this.index_tasks
|
||||
.get(&rtxn, index_uid.as_str())
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
@ -535,23 +536,23 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
assert!(self.get_status(&rtxn, status).unwrap().contains(uid));
|
||||
assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
|
||||
assert!(this.get_status(&rtxn, status).unwrap().contains(uid));
|
||||
assert!(this.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
|
||||
|
||||
if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
|
||||
match status {
|
||||
Status::Enqueued | Status::Processing => {
|
||||
assert!(self
|
||||
assert!(this
|
||||
.file_store
|
||||
.all_uuids()
|
||||
.unwrap()
|
||||
.any(|uuid| uuid.as_ref().unwrap() == &content_file),
|
||||
"Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
|
||||
self.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
|
||||
this.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
|
||||
);
|
||||
}
|
||||
Status::Succeeded | Status::Failed | Status::Canceled => {
|
||||
assert!(self
|
||||
assert!(this
|
||||
.file_store
|
||||
.all_uuids()
|
||||
.unwrap()
|
||||
|
@ -14,6 +14,7 @@ license.workspace = true
|
||||
base64 = "0.21.0"
|
||||
enum-iterator = "1.4.0"
|
||||
hmac = "0.12.1"
|
||||
log = "0.4.19"
|
||||
maplit = "1.0.2"
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
rand = "0.8.5"
|
||||
@ -24,3 +25,4 @@ sha2 = "0.10.6"
|
||||
thiserror = "1.0.40"
|
||||
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
||||
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
||||
zookeeper = "0.8.0"
|
||||
|
@ -19,6 +19,7 @@ internal_error!(
|
||||
AuthControllerError: meilisearch_types::milli::heed::Error,
|
||||
std::io::Error,
|
||||
serde_json::Error,
|
||||
zookeeper::ZkError,
|
||||
std::str::Utf8Error
|
||||
);
|
||||
|
||||
|
@ -16,22 +16,113 @@ pub use store::open_auth_store_env;
|
||||
use store::{generate_key_as_hexa, HeedAuthStore};
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
use zookeeper::{
|
||||
Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper,
|
||||
};
|
||||
|
||||
/// Entry point to the API keys: wraps the local key store, the optional master key and
/// an optional ZooKeeper client.
#[derive(Clone)]
|
||||
pub struct AuthController {
|
||||
/// Local store the API keys are read from and written to.
store: Arc<HeedAuthStore>,
|
||||
/// Copy of the master key given to `new`, if any.
master_key: Option<String>,
|
||||
/// Optional ZooKeeper client. When present, `new` watches the `/auth` node and
/// `put_key` creates the key nodes under `/auth/` instead of writing to `store` directly.
zookeeper: Option<Arc<ZooKeeper>>,
|
||||
}
|
||||
|
||||
impl AuthController {
|
||||
pub fn new(db_path: impl AsRef<Path>, master_key: &Option<String>) -> Result<Self> {
|
||||
pub fn new(
|
||||
db_path: impl AsRef<Path>,
|
||||
master_key: &Option<String>,
|
||||
zookeeper: Option<Arc<ZooKeeper>>,
|
||||
) -> Result<Self> {
|
||||
let store = HeedAuthStore::new(db_path)?;
|
||||
let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper };
|
||||
|
||||
if store.is_empty()? {
|
||||
generate_default_keys(&store)?;
|
||||
match controller.zookeeper {
|
||||
// setup the auth zk environment, the `auth` node
|
||||
Some(ref zookeeper) => {
|
||||
// Zookeeper Event listener loop
|
||||
let controller_clone = controller.clone();
|
||||
let zkk = zookeeper.clone();
|
||||
zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| {
|
||||
let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event);
|
||||
|
||||
match event_type {
|
||||
WatchedEventType::NodeDeleted => {
|
||||
// TODO: ugly unwraps
|
||||
let path = path.unwrap();
|
||||
let uuid = path.strip_prefix("/auth/").unwrap();
|
||||
let uuid = Uuid::parse_str(&uuid).unwrap();
|
||||
log::info!("The key {} has been deleted", uuid);
|
||||
controller_clone.store.delete_api_key(uuid).unwrap();
|
||||
}
|
||||
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
|
||||
let path = path.unwrap();
|
||||
if path.strip_prefix("/auth/").map_or(false, |s| !s.is_empty()) {
|
||||
let (key, _stat) = zkk.get_data(&path, false).unwrap();
|
||||
let key: Key = serde_json::from_slice(&key).unwrap();
|
||||
log::info!("The key {} has been deleted", key.uid);
|
||||
controller_clone.store.put_api_key(key).unwrap();
|
||||
}
|
||||
}
|
||||
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
|
||||
}
|
||||
})?;
|
||||
|
||||
// TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create
|
||||
// for the moment we consider that `create` only returns Error::NodeExists.
|
||||
match zookeeper.create(
|
||||
"/auth",
|
||||
vec![],
|
||||
Acl::open_unsafe().clone(),
|
||||
CreateMode::Persistent,
|
||||
) {
|
||||
// If the store is empty, we must generate and push the default api-keys.
|
||||
Ok(_) => generate_default_keys(&controller)?,
|
||||
// If the node exist we should clear our DB and download all the existing api-keys
|
||||
Err(ZkError::NodeExists) => {
|
||||
log::warn!("Auth directory already exists, we need to clear our keys + import the one in zookeeper");
|
||||
|
||||
let store = controller.store.clone();
|
||||
store.delete_all_keys()?;
|
||||
let children = zookeeper
|
||||
.get_children("/auth", false)
|
||||
.expect("Internal, the auth directory was deleted during execution.");
|
||||
|
||||
log::info!("Importing {} api-keys", children.len());
|
||||
for path in children {
|
||||
log::info!(" Importing {}", path);
|
||||
match zookeeper.get_data(&format!("/auth/{}", &path), false) {
|
||||
Ok((key, _stat)) => {
|
||||
let key = serde_json::from_slice(&key).unwrap();
|
||||
let store = controller.store.clone();
|
||||
store.put_api_key(key)?;
|
||||
}
|
||||
Err(e) => panic!("{e}"),
|
||||
}
|
||||
// else the file was deleted while we were inserting the key. We ignore it.
|
||||
// TODO: What happens if someone updates the files before we have the time
|
||||
// to setup the watcher
|
||||
}
|
||||
}
|
||||
e @ Err(
|
||||
ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidACL,
|
||||
) => unreachable!("{e:?}"),
|
||||
Err(e) => panic!("{e}"),
|
||||
}
|
||||
// TODO: Race condition above:
|
||||
// What happens if two node join exactly at the same moment:
|
||||
// One will create the dir
|
||||
// The second one will delete its DB, load nothing and install a watcher
|
||||
// The first one will push its keys and should wake up and update the second one.
|
||||
// / BUT, if the second one delete its DB and the first one push its files before the second one install the watcher we're fucked
|
||||
}
|
||||
None => {
|
||||
if controller.store.is_empty()? {
|
||||
generate_default_keys(&controller)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self { store: Arc::new(store), master_key: master_key.clone() })
|
||||
Ok(controller)
|
||||
}
|
||||
|
||||
/// Return `Ok(())` if the auth controller is able to access one of its database.
|
||||
@ -53,7 +144,24 @@ impl AuthController {
|
||||
pub fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
|
||||
match self.store.get_api_key(create_key.uid)? {
|
||||
Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())),
|
||||
None => self.store.put_api_key(create_key.to_key()),
|
||||
None => self.put_key(create_key.to_key()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_key(&self, key: Key) -> Result<Key> {
|
||||
let store = self.store.clone();
|
||||
match &self.zookeeper {
|
||||
Some(zookeeper) => {
|
||||
zookeeper.create(
|
||||
&format!("/auth/{}", key.uid),
|
||||
serde_json::to_vec_pretty(&key)?,
|
||||
Acl::open_unsafe().clone(),
|
||||
CreateMode::Persistent,
|
||||
)?;
|
||||
|
||||
Ok(key)
|
||||
}
|
||||
None => store.put_api_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +176,20 @@ impl AuthController {
|
||||
name => key.name = name.set(),
|
||||
};
|
||||
key.updated_at = OffsetDateTime::now_utc();
|
||||
self.store.put_api_key(key)
|
||||
let store = self.store.clone();
|
||||
// TODO: we may commit only after zk persisted the keys
|
||||
match &self.zookeeper {
|
||||
Some(zookeeper) => {
|
||||
zookeeper.set_data(
|
||||
&format!("/auth/{}", key.uid),
|
||||
serde_json::to_vec_pretty(&key)?,
|
||||
None,
|
||||
)?;
|
||||
|
||||
Ok(key)
|
||||
}
|
||||
None => store.put_api_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_key(&self, uid: Uuid) -> Result<Key> {
|
||||
@ -110,7 +231,19 @@ impl AuthController {
|
||||
}
|
||||
|
||||
pub fn delete_key(&self, uid: Uuid) -> Result<()> {
|
||||
if self.store.delete_api_key(uid)? {
|
||||
let deleted = match &self.zookeeper {
|
||||
Some(zookeeper) => match zookeeper.delete(&format!("/auth/{}", uid), None) {
|
||||
Ok(()) => true,
|
||||
Err(ZkError::NoNode) => false,
|
||||
Err(e) => return Err(e.into()),
|
||||
},
|
||||
None => {
|
||||
let store = self.store.clone();
|
||||
store.delete_api_key(uid)?
|
||||
}
|
||||
};
|
||||
|
||||
if deleted {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(AuthControllerError::ApiKeyNotFound(uid.to_string()))
|
||||
@ -159,7 +292,7 @@ impl AuthController {
|
||||
self.store.delete_all_keys()
|
||||
}
|
||||
|
||||
/// Delete all the keys in the DB.
|
||||
/// Insert a key in the DB without any check on its validity
|
||||
pub fn raw_insert_key(&mut self, key: Key) -> Result<()> {
|
||||
self.store.put_api_key(key)?;
|
||||
Ok(())
|
||||
@ -304,10 +437,9 @@ pub struct IndexSearchRules {
|
||||
pub filter: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
fn generate_default_keys(store: &HeedAuthStore) -> Result<()> {
|
||||
store.put_api_key(Key::default_admin())?;
|
||||
store.put_api_key(Key::default_search())?;
|
||||
|
||||
fn generate_default_keys(controller: &AuthController) -> Result<()> {
|
||||
controller.put_key(Key::default_admin())?;
|
||||
controller.put_key(Key::default_search())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -80,6 +80,7 @@ reqwest = { version = "0.11.16", features = [
|
||||
], default-features = false }
|
||||
rustls = "0.20.8"
|
||||
rustls-pemfile = "1.0.2"
|
||||
rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] }
|
||||
segment = { version = "0.2.2", optional = true }
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
serde_json = { version = "1.0.95", features = ["preserve_order"] }
|
||||
@ -105,6 +106,7 @@ walkdir = "2.3.3"
|
||||
yaup = "0.2.1"
|
||||
serde_urlencoded = "0.7.1"
|
||||
termcolor = "1.2.0"
|
||||
zookeeper = "0.8.0"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.8.0"
|
||||
|
@ -312,6 +312,13 @@ impl From<Opt> for Infos {
|
||||
config_file_path,
|
||||
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
||||
no_analytics: _,
|
||||
zk_url: _,
|
||||
s3_url: _,
|
||||
s3_region: _,
|
||||
s3_bucket: _,
|
||||
s3_access_key: _,
|
||||
s3_secret_key: _,
|
||||
s3_security_token: _,
|
||||
} = options;
|
||||
|
||||
let schedule_snapshot = match schedule_snapshot {
|
||||
|
@ -39,6 +39,9 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
|
||||
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
|
||||
pub use option::Opt;
|
||||
use option::ScheduleSnapshot;
|
||||
use s3::creds::Credentials;
|
||||
use s3::{Bucket, Region};
|
||||
use zookeeper::ZooKeeper;
|
||||
|
||||
use crate::error::MeilisearchHttpError;
|
||||
|
||||
@ -136,14 +139,17 @@ enum OnFailure {
|
||||
KeepDb,
|
||||
}
|
||||
|
||||
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
|
||||
pub async fn setup_meilisearch(
|
||||
opt: &Opt,
|
||||
zookeeper: Option<Arc<ZooKeeper>>,
|
||||
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
|
||||
let empty_db = is_empty_db(&opt.db_path);
|
||||
let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot {
|
||||
let snapshot_path_exists = snapshot_path.exists();
|
||||
// the db is empty and the snapshot exists, import it
|
||||
if empty_db && snapshot_path_exists {
|
||||
match compression::from_tar_gz(snapshot_path, &opt.db_path) {
|
||||
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?,
|
||||
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?,
|
||||
Err(e) => {
|
||||
std::fs::remove_dir_all(&opt.db_path)?;
|
||||
return Err(e);
|
||||
@ -160,14 +166,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
bail!("snapshot doesn't exist at {}", snapshot_path.display())
|
||||
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
|
||||
} else {
|
||||
open_or_create_database(opt, empty_db)?
|
||||
open_or_create_database(opt, empty_db, zookeeper)?
|
||||
}
|
||||
} else if let Some(ref path) = opt.import_dump {
|
||||
let src_path_exists = path.exists();
|
||||
// the db is empty and the dump exists, import it
|
||||
if empty_db && src_path_exists {
|
||||
let (mut index_scheduler, mut auth_controller) =
|
||||
open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?;
|
||||
open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?;
|
||||
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
|
||||
Ok(()) => (index_scheduler, auth_controller),
|
||||
Err(e) => {
|
||||
@ -187,10 +193,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
|
||||
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
|
||||
} else {
|
||||
open_or_create_database(opt, empty_db)?
|
||||
open_or_create_database(opt, empty_db, zookeeper)?
|
||||
}
|
||||
} else {
|
||||
open_or_create_database(opt, empty_db)?
|
||||
open_or_create_database(opt, empty_db, zookeeper)?
|
||||
};
|
||||
|
||||
// We create a loop in a thread that registers snapshotCreation tasks
|
||||
@ -199,15 +205,12 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot {
|
||||
let snapshot_delay = Duration::from_secs(snapshot_delay);
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
thread::Builder::new()
|
||||
.name(String::from("register-snapshot-tasks"))
|
||||
.spawn(move || loop {
|
||||
thread::sleep(snapshot_delay);
|
||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
||||
error!("Error while registering snapshot: {}", e);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
thread::spawn(move || loop {
|
||||
thread::sleep(snapshot_delay);
|
||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
||||
error!("Error while registering snapshot: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok((index_scheduler, auth_controller))
|
||||
@ -217,34 +220,52 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
fn open_or_create_database_unchecked(
|
||||
opt: &Opt,
|
||||
on_failure: OnFailure,
|
||||
zookeeper: Option<Arc<ZooKeeper>>,
|
||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
// we don't want to create anything in the data.ms yet, thus we
|
||||
// wrap our two builders in a closure that'll be executed later.
|
||||
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
|
||||
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone());
|
||||
let instance_features = opt.to_instance_features();
|
||||
let index_scheduler_builder = || -> anyhow::Result<_> {
|
||||
Ok(IndexScheduler::new(IndexSchedulerOptions {
|
||||
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
|
||||
auth_path: opt.db_path.join("auth"),
|
||||
tasks_path: opt.db_path.join("tasks"),
|
||||
update_file_path: opt.db_path.join("update_files"),
|
||||
indexes_path: opt.db_path.join("indexes"),
|
||||
snapshots_path: opt.snapshot_dir.clone(),
|
||||
dumps_path: opt.dump_dir.clone(),
|
||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||
indexer_config: (&opt.indexer_options).try_into()?,
|
||||
autobatching_enabled: true,
|
||||
max_number_of_tasks: 1_000_000,
|
||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||
index_count: DEFAULT_INDEX_COUNT,
|
||||
instance_features,
|
||||
})?)
|
||||
};
|
||||
let index_scheduler = IndexScheduler::new(Arc::new(IndexSchedulerOptions {
|
||||
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
|
||||
auth_path: opt.db_path.join("auth"),
|
||||
tasks_path: opt.db_path.join("tasks"),
|
||||
update_file_path: opt.db_path.join("update_files"),
|
||||
indexes_path: opt.db_path.join("indexes"),
|
||||
snapshots_path: opt.snapshot_dir.clone(),
|
||||
dumps_path: opt.dump_dir.clone(),
|
||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||
indexer_config: (&opt.indexer_options).try_into().map(Arc::new)?,
|
||||
autobatching_enabled: true,
|
||||
max_number_of_tasks: 1_000_000,
|
||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||
index_count: DEFAULT_INDEX_COUNT,
|
||||
instance_features,
|
||||
zookeeper: zookeeper.clone(),
|
||||
s3: opt.s3_url.as_ref().map(|url| {
|
||||
Arc::new(
|
||||
Bucket::new(
|
||||
opt.s3_bucket.as_deref().unwrap(),
|
||||
Region::Custom { region: opt.s3_region.clone(), endpoint: url.clone() },
|
||||
Credentials {
|
||||
access_key: opt.s3_access_key.clone(),
|
||||
secret_key: opt.s3_secret_key.clone(),
|
||||
security_token: opt.s3_security_token.clone(),
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.with_path_style(),
|
||||
)
|
||||
}),
|
||||
}))
|
||||
.map_err(anyhow::Error::from);
|
||||
|
||||
match (
|
||||
index_scheduler_builder(),
|
||||
index_scheduler,
|
||||
auth_controller.map_err(anyhow::Error::from),
|
||||
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
|
||||
) {
|
||||
@ -262,12 +283,13 @@ fn open_or_create_database_unchecked(
|
||||
fn open_or_create_database(
|
||||
opt: &Opt,
|
||||
empty_db: bool,
|
||||
zookeeper: Option<Arc<ZooKeeper>>,
|
||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
if !empty_db {
|
||||
check_version_file(&opt.db_path)?;
|
||||
}
|
||||
|
||||
open_or_create_database_unchecked(opt, OnFailure::KeepDb)
|
||||
open_or_create_database_unchecked(opt, OnFailure::KeepDb, zookeeper)
|
||||
}
|
||||
|
||||
fn import_dump(
|
||||
@ -277,6 +299,7 @@ fn import_dump(
|
||||
auth: &mut AuthController,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let reader = File::open(dump_path)?;
|
||||
let index_scheduler = index_scheduler.inner();
|
||||
let mut dump_reader = dump::DumpReader::open(reader)?;
|
||||
|
||||
if let Some(date) = dump_reader.date() {
|
||||
|
@ -2,6 +2,7 @@ use std::env;
|
||||
use std::io::{stderr, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::http::KeepAlive;
|
||||
use actix_web::web::Data;
|
||||
@ -12,6 +13,7 @@ use meilisearch::analytics::Analytics;
|
||||
use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt};
|
||||
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
|
||||
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
||||
use zookeeper::ZooKeeper;
|
||||
|
||||
#[global_allocator]
|
||||
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||
@ -63,7 +65,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
_ => (),
|
||||
}
|
||||
|
||||
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;
|
||||
let timeout = Duration::from_millis(2500);
|
||||
let zookeeper =
|
||||
opt.zk_url.as_ref().map(|url| Arc::new(ZooKeeper::connect(url, timeout, drop).unwrap()));
|
||||
let (index_scheduler, auth_controller) = setup_meilisearch(&opt, zookeeper).await?;
|
||||
|
||||
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
||||
let analytics = if !opt.no_analytics {
|
||||
|
@ -28,6 +28,13 @@ const MEILI_DB_PATH: &str = "MEILI_DB_PATH";
|
||||
const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
|
||||
const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
|
||||
const MEILI_ENV: &str = "MEILI_ENV";
|
||||
const MEILI_ZK_URL: &str = "MEILI_ZK_URL";
|
||||
const MEILI_S3_URL: &str = "MEILI_S3_URL";
|
||||
const MEILI_S3_BUCKET: &str = "MEILI_S3_BUCKET";
|
||||
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
|
||||
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
|
||||
const MEILI_S3_SECURITY_TOKEN: &str = "MEILI_S3_SECURITY_TOKEN";
|
||||
const MEILI_S3_REGION: &str = "MEILI_S3_REGION";
|
||||
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
||||
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
|
||||
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
|
||||
@ -56,6 +63,7 @@ const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
|
||||
const DEFAULT_DB_PATH: &str = "./data.ms";
|
||||
const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
|
||||
const DEFAULT_ENV: &str = "development";
|
||||
const DEFAULT_S3_REGION: &str = "eu-central-1";
|
||||
const DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT: &str = "100 MB";
|
||||
const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
|
||||
const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
|
||||
@ -154,6 +162,36 @@ pub struct Opt {
|
||||
#[serde(default = "default_env")]
|
||||
pub env: String,
|
||||
|
||||
/// Sets the HTTP address and port used to communicate with the zookeeper cluster.
|
||||
/// If run locally, the default URL is `http://localhost:2181/`.
|
||||
#[clap(long, env = MEILI_ZK_URL)]
|
||||
pub zk_url: Option<String>,
|
||||
|
||||
/// Sets the address and port used to communicate with the S3 bucket.
|
||||
#[clap(long, env = MEILI_S3_URL)]
|
||||
pub s3_url: Option<String>,
|
||||
|
||||
/// Sets the region used to communicate with the S3 bucket.
|
||||
#[clap(long, env = MEILI_S3_REGION, default_value_t = default_s3_region())]
|
||||
#[serde(default = "default_s3_region")]
|
||||
pub s3_region: String,
|
||||
|
||||
/// Sets the S3 bucket name to use.
|
||||
#[clap(long, env = MEILI_S3_BUCKET)]
|
||||
pub s3_bucket: Option<String>,
|
||||
|
||||
/// Set the S3 access key. If used you must also set the secret key.
|
||||
#[clap(long, env = MEILI_S3_ACCESS_KEY)]
|
||||
pub s3_access_key: Option<String>,
|
||||
|
||||
/// Set the S3 secret key. If used you must also set the access key.
|
||||
#[clap(long, env = MEILI_S3_SECRET_KEY)]
|
||||
pub s3_secret_key: Option<String>,
|
||||
|
||||
/// Security token, can't be used with access key and secret key.
|
||||
#[clap(long, env = MEILI_S3_SECURITY_TOKEN)]
|
||||
pub s3_security_token: Option<String>,
|
||||
|
||||
/// Deactivates Meilisearch's built-in telemetry when provided.
|
||||
///
|
||||
/// Meilisearch automatically collects data from all instances that do not opt out using this flag.
|
||||
@ -368,6 +406,13 @@ impl Opt {
|
||||
http_addr,
|
||||
master_key,
|
||||
env,
|
||||
zk_url,
|
||||
s3_url,
|
||||
s3_region,
|
||||
s3_bucket,
|
||||
s3_access_key,
|
||||
s3_secret_key,
|
||||
s3_security_token,
|
||||
max_index_size: _,
|
||||
max_task_db_size: _,
|
||||
http_payload_size_limit,
|
||||
@ -401,6 +446,25 @@ impl Opt {
|
||||
export_to_env_if_not_present(MEILI_MASTER_KEY, master_key);
|
||||
}
|
||||
export_to_env_if_not_present(MEILI_ENV, env);
|
||||
if let Some(zk_url) = zk_url {
|
||||
export_to_env_if_not_present(MEILI_ZK_URL, zk_url);
|
||||
}
|
||||
if let Some(s3_url) = s3_url {
|
||||
export_to_env_if_not_present(MEILI_S3_URL, s3_url);
|
||||
}
|
||||
export_to_env_if_not_present(MEILI_S3_REGION, s3_region);
|
||||
if let Some(s3_bucket) = s3_bucket {
|
||||
export_to_env_if_not_present(MEILI_S3_BUCKET, s3_bucket);
|
||||
}
|
||||
if let Some(s3_access_key) = s3_access_key {
|
||||
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
|
||||
}
|
||||
if let Some(s3_secret_key) = s3_secret_key {
|
||||
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
|
||||
}
|
||||
if let Some(s3_security_token) = s3_security_token {
|
||||
export_to_env_if_not_present(MEILI_S3_SECURITY_TOKEN, s3_security_token);
|
||||
}
|
||||
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
||||
{
|
||||
export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());
|
||||
@ -547,7 +611,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
Ok(Self {
|
||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
|
||||
thread_pool: Some(thread_pool),
|
||||
thread_pool: Some(Arc::new(thread_pool)),
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: other.skip_index_budget,
|
||||
..Default::default()
|
||||
@ -715,6 +779,10 @@ fn default_env() -> String {
|
||||
DEFAULT_ENV.to_string()
|
||||
}
|
||||
|
||||
fn default_s3_region() -> String {
|
||||
DEFAULT_S3_REGION.to_string()
|
||||
}
|
||||
|
||||
fn default_max_index_size() -> Byte {
|
||||
Byte::from_bytes(INDEX_SIZE)
|
||||
}
|
||||
|
@ -41,14 +41,10 @@ pub async fn create_api_key(
|
||||
_req: HttpRequest,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let v = body.into_inner();
|
||||
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
|
||||
let key = auth_controller.create_key(v)?;
|
||||
Ok(KeyView::from_key(key, &auth_controller))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
|
||||
let key = auth_controller.create_key(v)?;
|
||||
let key = KeyView::from_key(key, &auth_controller);
|
||||
|
||||
Ok(HttpResponse::Created().json(res))
|
||||
Ok(HttpResponse::Created().json(key))
|
||||
}
|
||||
|
||||
#[derive(Deserr, Debug, Clone, Copy)]
|
||||
@ -110,17 +106,11 @@ pub async fn patch_api_key(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let key = path.into_inner().key;
|
||||
let patch_api_key = body.into_inner();
|
||||
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
|
||||
let uid =
|
||||
Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
|
||||
let key = auth_controller.update_key(uid, patch_api_key)?;
|
||||
let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
|
||||
let key = auth_controller.update_key(uid, patch_api_key)?;
|
||||
let key = KeyView::from_key(key, &auth_controller);
|
||||
|
||||
Ok(KeyView::from_key(key, &auth_controller))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
|
||||
|
||||
Ok(HttpResponse::Ok().json(res))
|
||||
Ok(HttpResponse::Ok().json(key))
|
||||
}
|
||||
|
||||
pub async fn delete_api_key(
|
||||
@ -128,13 +118,8 @@ pub async fn delete_api_key(
|
||||
path: web::Path<AuthParam>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let key = path.into_inner().key;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let uid =
|
||||
Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
|
||||
auth_controller.delete_key(uid)
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
|
||||
let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
|
||||
auth_controller.delete_key(uid)?;
|
||||
|
||||
Ok(HttpResponse::NoContent().finish())
|
||||
}
|
||||
|
@ -29,8 +29,7 @@ pub async fn create_dump(
|
||||
keys: auth_controller.list_keys()?,
|
||||
instance_uid: analytics.instance_uid().cloned(),
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -78,6 +78,6 @@ async fn patch_features(
|
||||
}),
|
||||
Some(&req),
|
||||
);
|
||||
index_scheduler.put_runtime_features(new_features)?;
|
||||
index_scheduler.inner().put_runtime_features(new_features)?;
|
||||
Ok(HttpResponse::Ok().json(new_features))
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::io::ErrorKind;
|
||||
use std::io::{BufReader, ErrorKind, Seek, SeekFrom};
|
||||
|
||||
use actix_web::http::header::CONTENT_TYPE;
|
||||
use actix_web::web::Data;
|
||||
@ -129,8 +129,7 @@ pub async fn delete_document(
|
||||
index_uid: index_uid.to_string(),
|
||||
documents_ids: vec![document_id],
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
@ -396,11 +395,12 @@ async fn document_addition(
|
||||
return Err(MeilisearchHttpError::MissingPayload(format));
|
||||
}
|
||||
|
||||
if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
|
||||
if let Err(e) = buffer.seek(SeekFrom::Start(0)).await {
|
||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
}
|
||||
|
||||
let read_file = buffer.into_inner().into_std().await;
|
||||
let s3 = index_scheduler.s3.clone();
|
||||
let documents_count = tokio::task::spawn_blocking(move || {
|
||||
let documents_count = match format {
|
||||
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
|
||||
@ -409,8 +409,19 @@ async fn document_addition(
|
||||
}
|
||||
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
|
||||
};
|
||||
|
||||
if let Some(s3) = s3 {
|
||||
update_file.seek(SeekFrom::Start(0)).unwrap();
|
||||
let mut reader = BufReader::new(&*update_file);
|
||||
match s3.put_object_stream(&mut reader, format!("/update-files/{}", uuid)) {
|
||||
Ok(_) | Err(s3::error::S3Error::Http(_, _)) => (),
|
||||
Err(e) => panic!("Error {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// we NEED to persist the file here because we moved the `update_file` into another task.
|
||||
update_file.persist()?;
|
||||
|
||||
Ok(documents_count)
|
||||
})
|
||||
.await;
|
||||
@ -445,7 +456,7 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let scheduler = index_scheduler.clone();
|
||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
|
||||
let task = match scheduler.register(task) {
|
||||
Ok(task) => task,
|
||||
Err(e) => {
|
||||
index_scheduler.delete_update_file(uuid)?;
|
||||
@ -476,8 +487,7 @@ pub async fn delete_documents_batch(
|
||||
|
||||
let task =
|
||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -512,8 +522,7 @@ pub async fn delete_documents_by_filter(
|
||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -529,8 +538,7 @@ pub async fn clear_all_documents(
|
||||
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
||||
|
||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -135,8 +135,7 @@ pub async fn create_index(
|
||||
);
|
||||
|
||||
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
} else {
|
||||
@ -203,8 +202,7 @@ pub async fn update_index(
|
||||
primary_key: body.primary_key,
|
||||
};
|
||||
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -216,8 +214,7 @@ pub async fn delete_index(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
|
@ -55,10 +55,7 @@ macro_rules! make_setting_route {
|
||||
is_deletion: true,
|
||||
allow_index_creation,
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
||||
.await??
|
||||
.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -97,10 +94,7 @@ macro_rules! make_setting_route {
|
||||
is_deletion: false,
|
||||
allow_index_creation,
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
||||
.await??
|
||||
.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -586,8 +580,7 @@ pub async fn update_all(
|
||||
is_deletion: false,
|
||||
allow_index_creation,
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -622,8 +615,7 @@ pub async fn delete_all(
|
||||
is_deletion: true,
|
||||
allow_index_creation,
|
||||
};
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -18,7 +18,6 @@ use serde_json::json;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use time::macros::format_description;
|
||||
use time::{Date, Duration, OffsetDateTime, Time};
|
||||
use tokio::task;
|
||||
|
||||
use super::SummarizedTaskView;
|
||||
use crate::analytics::Analytics;
|
||||
@ -325,15 +324,12 @@ async fn cancel_tasks(
|
||||
|
||||
let query = params.into_query();
|
||||
|
||||
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes(
|
||||
&index_scheduler.read_txn()?,
|
||||
&query,
|
||||
index_scheduler.filters(),
|
||||
)?;
|
||||
let (tasks, _) =
|
||||
index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
|
||||
let task_cancelation =
|
||||
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
||||
|
||||
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
|
||||
let task = index_scheduler.register(task_cancelation)?;
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
Ok(HttpResponse::Ok().json(task))
|
||||
@ -370,15 +366,12 @@ async fn delete_tasks(
|
||||
);
|
||||
let query = params.into_query();
|
||||
|
||||
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes(
|
||||
&index_scheduler.read_txn()?,
|
||||
&query,
|
||||
index_scheduler.filters(),
|
||||
)?;
|
||||
let (tasks, _) =
|
||||
index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
|
||||
let task_deletion =
|
||||
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
||||
|
||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
|
||||
let task = index_scheduler.register(task_deletion)?;
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
Ok(HttpResponse::Ok().json(task))
|
||||
|
@ -39,7 +39,7 @@ impl Server {
|
||||
|
||||
let options = default_settings(dir.path());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, None).await.unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Server { service, _dir: Some(dir) }
|
||||
@ -54,7 +54,7 @@ impl Server {
|
||||
|
||||
options.master_key = Some("MASTER_KEY".to_string());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, None).await.unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Server { service, _dir: Some(dir) }
|
||||
@ -67,7 +67,7 @@ impl Server {
|
||||
}
|
||||
|
||||
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options)?;
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, None).await?;
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Ok(Server { service, _dir: None })
|
||||
|
@ -17,7 +17,7 @@ bincode = "1.3.3"
|
||||
bstr = "1.4.0"
|
||||
bytemuck = { version = "1.13.1", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.4.3"
|
||||
charabia = { version = "0.8.2", default-features = false }
|
||||
charabia = { version = "0.8.4", default-features = false }
|
||||
concat-arrays = "0.1.2"
|
||||
crossbeam-channel = "0.5.8"
|
||||
deserr = "0.5.0"
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use grenad::CompressionType;
|
||||
use rayon::ThreadPool;
|
||||
|
||||
@ -9,7 +11,7 @@ pub struct IndexerConfig {
|
||||
pub max_memory: Option<usize>,
|
||||
pub chunk_compression_type: CompressionType,
|
||||
pub chunk_compression_level: Option<u32>,
|
||||
pub thread_pool: Option<ThreadPool>,
|
||||
pub thread_pool: Option<Arc<ThreadPool>>,
|
||||
pub max_positions_per_attributes: Option<u32>,
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
|
Reference in New Issue
Block a user