Compare commits


17 Commits

288 changed files with 2119 additions and 4811 deletions

View File

@ -23,8 +23,7 @@ A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Meilisearch version:**
[e.g. v0.20.0]
**Meilisearch version:** [e.g. v0.20.0]
**Additional context**
Additional information that may be relevant to the issue.

View File

@ -1,34 +0,0 @@
---
name: New sprint issue
about: ⚠️ Should only be used by the engine team ⚠️
title: ''
labels: ''
assignees: ''
---
Related product team resources: [roadmap card]() (_internal only_) and [PRD]() (_internal only_)
Related product discussion:
Related spec: WIP
## Motivation
<!---Copy/paste the information in the roadmap resources or briefly detail the product motivation. Ask product team if any hesitation.-->
## Usage
<!---Write a quick description of the usage if the usage has already been defined-->
Refer to the final spec to know the details and the final decisions about the usage.
## TODO
<!---Feel free to adapt this list with more technical/product steps-->
- [ ] Release a prototype
- [ ] If prototype validated, merge changes into `main`
- [ ] Update the spec
## Impacted teams
<!---Ping the related teams. Ask for the engine manager if any hesitation-->

View File

@ -96,12 +96,14 @@ jobs:
publish-macos-apple-silicon:
name: Publish binary for macOS silicon
runs-on: macos-12
runs-on: ${{ matrix.os }}
needs: check-version
strategy:
fail-fast: false
matrix:
include:
- target: aarch64-apple-darwin
- os: macos-12
target: aarch64-apple-darwin
asset_name: meilisearch-macos-apple-silicon
steps:
- name: Checkout repository
@ -130,29 +132,21 @@ jobs:
publish-aarch64:
name: Publish binary for aarch64
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
needs: check-version
container:
# Use ubuntu-18.04 to compile with glibc 2.27
image: ubuntu:18.04
strategy:
fail-fast: false
matrix:
include:
- target: aarch64-unknown-linux-gnu
- build: aarch64
os: ubuntu-18.04
target: aarch64-unknown-linux-gnu
linker: gcc-aarch64-linux-gnu
use-cross: true
asset_name: meilisearch-linux-aarch64
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install needed dependencies
run: |
apt-get update -y && apt upgrade -y
apt-get install -y curl build-essential gcc-aarch64-linux-gnu
- name: Set up Docker for cross compilation
run: |
apt-get install -y curl apt-transport-https ca-certificates software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=$(dpkg --print-architecture)] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get update -y && apt-get install -y docker-ce
- name: Installing Rust toolchain
uses: actions-rs/toolchain@v1
with:
@ -160,7 +154,15 @@ jobs:
profile: minimal
target: ${{ matrix.target }}
override: true
- name: APT update
run: |
sudo apt update
- name: Install target specific tools
if: matrix.use-cross
run: |
sudo apt-get install -y ${{ matrix.linker }}
- name: Configure target aarch64 GNU
if: matrix.target == 'aarch64-unknown-linux-gnu'
## Environment variable is not passed using env:
## LD gold won't work with MUSL
# env:
@ -174,10 +176,8 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: build
use-cross: true
use-cross: ${{ matrix.use-cross }}
args: --release --target ${{ matrix.target }}
env:
CROSS_DOCKER_IN_DOCKER: true
- name: List target output files
run: ls -lR ./target
- name: Upload the binary to release

View File

@ -43,7 +43,7 @@ jobs:
toolchain: nightly
override: true
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
@ -65,7 +65,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
@ -123,7 +123,7 @@ jobs:
toolchain: stable
override: true
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run tests in debug
uses: actions-rs/cargo@v1
with:
@ -142,7 +142,7 @@ jobs:
override: true
components: clippy
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
@ -162,7 +162,7 @@ jobs:
override: true
components: rustfmt
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo fmt
# Since we never ran the `build.rs` script in the benchmark directory we are missing one auto-generated import file.
# Since we want to trigger (and fail) this action as fast as possible, instead of building the benchmark crate

Cargo.lock (generated)
View File

@ -252,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if",
"cipher",
"cipher 0.3.0",
"cpufeatures",
"opaque-debug",
]
@ -523,6 +523,17 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "bus"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80cb4625f5b60155ff1018c9d4ce2e38bf5ae3e5780dfab9fa68bb44a6b751e2"
dependencies = [
"crossbeam-channel",
"num_cpus",
"parking_lot_core",
]
[[package]]
name = "byte-unit"
version = "4.0.18"
@ -641,6 +652,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7fc89c7c5b9e7a02dfe45cd2367bae382f9ed31c61ca8debe5f827c420a2f08"
dependencies = [
"cfg-if",
"cipher 0.4.4",
"cpufeatures",
]
[[package]]
name = "change-detection"
version = "1.2.0"
@ -712,6 +734,16 @@ dependencies = [
"generic-array",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "3.2.23"
@ -770,6 +802,24 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "cluster"
version = "1.1.0"
dependencies = [
"bus",
"crossbeam",
"ductile",
"log",
"meilisearch-types",
"roaring",
"serde",
"serde_json",
"synchronoise",
"thiserror",
"time",
"uuid 1.3.0",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@ -1148,6 +1198,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "ductile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cde25956886749c891a27249630ae99471f1ba05c4a924aad1a6ffe6932812"
dependencies = [
"anyhow",
"bincode",
"chacha20",
"crossbeam-channel",
"log",
"rand",
"serde",
]
[[package]]
name = "dump"
version = "1.1.0"
@ -1170,14 +1235,14 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
name = "either"
version = "1.8.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
dependencies = [
"serde",
]
@ -1376,7 +1441,7 @@ dependencies = [
"faux",
"tempfile",
"thiserror",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -1895,6 +1960,7 @@ dependencies = [
"anyhow",
"big_s",
"bincode",
"cluster",
"crossbeam",
"csv",
"derive_builder",
@ -1915,7 +1981,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -1929,6 +1995,15 @@ dependencies = [
"serde",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "insta"
version = "1.26.0"
@ -2473,6 +2548,7 @@ dependencies = [
"bytes",
"cargo_toml",
"clap 4.0.32",
"cluster",
"crossbeam-channel",
"deserr",
"dump",
@ -2533,7 +2609,7 @@ dependencies = [
"tokio-stream",
"toml",
"urlencoding",
"uuid 1.2.2",
"uuid 1.3.0",
"vergen",
"walkdir",
"yaup",
@ -2545,6 +2621,7 @@ name = "meilisearch-auth"
version = "1.1.0"
dependencies = [
"base64 0.13.1",
"cluster",
"enum-iterator",
"hmac",
"maplit",
@ -2556,7 +2633,7 @@ dependencies = [
"sha2",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -2586,7 +2663,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -2661,7 +2738,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.2.2",
"uuid 1.3.0",
]
[[package]]
@ -3499,9 +3576,9 @@ checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
[[package]]
name = "serde"
version = "1.0.152"
version = "1.0.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8"
dependencies = [
"serde_derive",
]
@ -3517,9 +3594,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.152"
version = "1.0.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
checksum = "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630"
dependencies = [
"proc-macro2",
"quote",
@ -3528,9 +3605,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
dependencies = [
"indexmap",
"itoa 1.0.5",
@ -3816,18 +3893,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e"
dependencies = [
"proc-macro2",
"quote",
@ -3836,9 +3913,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.17"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890"
dependencies = [
"itoa 1.0.5",
"serde",
@ -3854,9 +3931,9 @@ checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd"
[[package]]
name = "time-macros"
version = "0.2.6"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2"
checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36"
dependencies = [
"time-core",
]
@ -4101,9 +4178,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.2.2"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79"
dependencies = [
"getrandom",
"serde",

View File

@ -9,6 +9,7 @@ members = [
"dump",
"file-store",
"permissive-json-pointer",
"cluster",
"milli",
"filter-parser",
"flatten-serde-json",

cluster/Cargo.toml (new file)
View File

@ -0,0 +1,25 @@
[package]
name = "cluster"
publish = false
version.workspace = true
authors.workspace = true
description.workspace = true
homepage.workspace = true
readme.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
ductile = "0.3.0"
serde = { version = "1.0.155", features = ["derive"] }
serde_json = "1.0.94"
thiserror = "1.0.39"
meilisearch-types = { path = "../meilisearch-types" }
roaring = { version = "0.10.1", features = ["serde"] }
log = "0.4.17"
crossbeam = "0.8.2"
bus = "2.3.0"
time = "0.3.20"
uuid = { version = "1.3.0", features = ["v4"] }
synchronoise = "1.0.1"

cluster/src/batch.rs (new file)
View File

@ -0,0 +1,148 @@
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::TaskId;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how it
/// should be processed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Batch {
TaskCancelation {
/// The task cancelation itself.
task: TaskId,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
},
TaskDeletion(TaskId),
SnapshotCreation(Vec<TaskId>),
Dump(TaskId),
IndexOperation {
op: IndexOperation,
must_create_index: bool,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexDeletion {
index_uid: String,
tasks: Vec<TaskId>,
index_has_been_created: bool,
},
IndexSwap {
task: TaskId,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
DocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
tasks: Vec<TaskId>,
},
DocumentDeletion {
index_uid: String,
// The vec of document ids associated with each document deletion task.
documents: Vec<Vec<String>>,
tasks: Vec<TaskId>,
},
DocumentClear {
index_uid: String,
tasks: Vec<TaskId>,
},
Settings {
index_uid: String,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
tasks: Vec<TaskId>,
},
DocumentClearAndSetting {
index_uid: String,
cleared_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
SettingsAndDocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
document_import_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
}
impl Batch {
pub fn ids(&self) -> impl Iterator<Item = TaskId> {
type Ret = Box<dyn Iterator<Item = TaskId>>;
match self {
Batch::TaskCancelation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::TaskDeletion(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::SnapshotCreation(tasks) => Box::new(tasks.clone().into_iter()) as Ret,
Batch::Dump(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentDeletion { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClear { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::Settings { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClearAndSetting {
cleared_tasks, settings_tasks, ..
} => {
Box::new(cleared_tasks.clone().into_iter().chain(settings_tasks.clone())) as Ret
}
IndexOperation::SettingsAndDocumentOperation {
document_import_tasks,
settings_tasks,
..
} => Box::new(
document_import_tasks.clone().into_iter().chain(settings_tasks.clone()),
) as Ret,
},
Batch::IndexCreation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexUpdate { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexDeletion { tasks, .. } => Box::new(tasks.clone().into_iter()) as Ret,
Batch::IndexSwap { task } => Box::new(std::iter::once(*task)) as Ret,
}
}
}
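
The `ids` accessor above hides each variant's iterator behind a `Box` so callers get a uniform `impl Iterator<Item = TaskId>`. Below is a minimal sketch (not part of the diff) of consuming it, assuming the `cluster` crate above is available as a workspace dependency:

use cluster::batch::Batch;
use meilisearch_types::tasks::TaskId;

// Collect every TaskId covered by a batch, whatever its variant.
fn batch_task_ids(batch: &Batch) -> Vec<TaskId> {
    batch.ids().collect()
}

fn main() {
    // A Dump batch carries exactly one task id.
    let batch = Batch::Dump(42);
    assert_eq!(batch_task_ids(&batch), vec![42]);
}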

cluster/src/leader.rs (new file)
View File

@ -0,0 +1,276 @@
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{atomic, Arc, Mutex, RwLock};
use std::time::Duration;
use bus::{Bus, BusReader};
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::Task;
use synchronoise::SignalEvent;
use uuid::Uuid;
use crate::batch::Batch;
use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg};
#[derive(Clone)]
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
needs_key_sender: Sender<Sender<Vec<Key>>>,
needs_key_receiver: Receiver<Sender<Vec<Key>>>,
pub wake_up: Arc<SignalEvent>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
batch_id: Arc<RwLock<u32>>,
}
impl Leader {
pub fn new(
listen_on: impl ToSocketAddrs + Send + 'static,
master_key: Option<String>,
) -> Leader {
let new_followers = Arc::new(AtomicUsize::new(0));
let active_followers = Arc::new(AtomicUsize::new(1));
let wake_up = Arc::new(SignalEvent::auto(true));
let (broadcast_to_follower, process_batch_receiver) = unbounded();
let (task_finished_sender, task_finished_receiver) = unbounded();
let (needs_key_sender, needs_key_receiver) = unbounded();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
std::thread::spawn(move || {
Self::listener(
listen_on,
master_key,
nf,
af,
wu,
process_batch_receiver,
task_finished_sender,
)
});
Leader {
task_ready_to_commit: task_finished_receiver,
broadcast_to_follower,
needs_key_sender,
needs_key_receiver,
wake_up,
new_followers,
active_followers,
batch_id: Arc::default(),
}
}
pub fn has_new_followers(&self) -> bool {
self.new_followers.load(Ordering::Relaxed) != 0
}
/// Takes all the necessary channels to chat with the scheduler and gives them
/// to each new follower
fn listener(
listen_on: impl ToSocketAddrs,
master_key: Option<String>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
broadcast_to_follower: Receiver<LeaderMsg>,
task_finished: Sender<u32>,
) {
let listener: ChannelServer<LeaderMsg, FollowerMsg> = if let Some(ref master_key) =
master_key
{
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Listening with encryption enabled");
ChannelServer::bind_with_enc(listen_on, enc).unwrap()
} else {
ChannelServer::bind(listen_on).unwrap()
};
info!("Ready to the receive connections");
// We're going to broadcast all the batches to all our followers
let bus: Bus<LeaderMsg> = Bus::new(10);
let bus = Arc::new(Mutex::new(bus));
let b = bus.clone();
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread is dead");
b.lock().unwrap().broadcast(msg);
});
for (sender, receiver, _addr) in listener {
let task_finished = task_finished.clone();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
let process_batch = bus.lock().unwrap().add_rx();
std::thread::spawn(move || {
Self::follower(sender, receiver, nf, af, wu, process_batch, task_finished)
});
}
}
/// Allow a follower to chat with the scheduler
fn follower(
sender: ChannelSender<LeaderMsg>,
receiver: ChannelReceiver<FollowerMsg>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
mut broadcast_to_follower: BusReader<LeaderMsg>,
task_finished: Sender<u32>,
) {
let size = new_followers.fetch_add(1, Ordering::Relaxed) + 1;
wake_up.signal();
info!("A new follower joined the cluster. {} members.", size);
loop {
if let msg @ LeaderMsg::JoinFromDump(_) =
broadcast_to_follower.recv().expect("Main thread died")
{
// we exit the new_follower state and become an active follower even though
// the dump will take some time to index
new_followers.fetch_sub(1, Ordering::Relaxed);
let size = active_followers.fetch_add(1, Ordering::Relaxed) + 1;
info!("A new follower became active. {} active members.", size);
sender.send(msg).unwrap();
break;
}
}
// send messages to the follower
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread died");
match msg {
LeaderMsg::JoinFromDump(_) => (),
msg => {
if sender.send(msg).is_err() {
// the follower died, the logging and cluster size update should be done
// in the other thread
break;
}
}
}
});
// receive messages from the follower
loop {
match receiver.recv() {
Err(_) => break,
Ok(msg) => match msg {
FollowerMsg::ReadyToCommit(id) => {
task_finished.send(id).expect("Can't reach the main thread")
}
FollowerMsg::RegisterNewTask(_) => todo!(),
},
}
}
// if we exited from the previous loop it means the follower is down and should
// be removed from the cluster
let size = active_followers.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
info!("A follower left the cluster. {} members.", size);
}
// ============= Everything related to the setup of the cluster
pub fn join_me(&self, dump: Vec<u8>) {
self.broadcast_to_follower
.send(LeaderMsg::JoinFromDump(dump))
.expect("Lost the link with the followers");
}
// ============= Everything related to the scheduler
pub fn starts_batch(&self, batch: Batch) {
let mut batch_id = self.batch_id.write().unwrap();
info!("Send the batch to process to the followers");
*batch_id += 1;
self.broadcast_to_follower
.send(LeaderMsg::StartBatch { id: *batch_id, batch })
.expect("Can't reach the cluster");
}
pub fn commit(&self, consistency_level: Consistency) {
info!("Wait until enough followers are ready to commit a batch");
let batch_id = self.batch_id.write().unwrap();
let mut nodes_ready_to_commit = 1;
loop {
let size = self.active_followers.load(atomic::Ordering::Relaxed);
info!("{nodes_ready_to_commit} nodes are ready to commit for a cluster size of {size}");
let all = nodes_ready_to_commit == size;
match consistency_level {
Consistency::One if nodes_ready_to_commit >= 1 || all => break,
Consistency::Two if nodes_ready_to_commit >= 2 || all => break,
Consistency::Quorum if nodes_ready_to_commit >= (size / 2) || all => break,
Consistency::All if all => break,
_ => (),
}
// we can't wait forever here because if a node dies the cluster size might get updated while we're stuck
match self.task_ready_to_commit.recv_timeout(Duration::new(1, 0)) {
Ok(id) if id == *batch_id => nodes_ready_to_commit += 1,
_ => continue,
};
}
info!("Tells all the follower to commit");
self.broadcast_to_follower.send(LeaderMsg::Commit(*batch_id)).unwrap();
}
pub fn register_new_task(&self, task: Task, update_file: Option<Vec<u8>>) {
info!("Tells all the follower to register a new task");
self.broadcast_to_follower
.send(LeaderMsg::RegisterNewTask { task, update_file })
.expect("Main thread is dead");
}
// ============= Everything related to the api-keys
pub fn insert_key(&self, key: Key) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Insert(key)))
.unwrap()
}
pub fn delete_key(&self, uuid: Uuid) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid)))
.unwrap()
}
pub fn needs_keys(&self) -> Sender<Vec<Key>> {
self.needs_key_receiver.recv().expect("The cluster is dead")
}
pub fn get_keys(&self) -> Vec<Key> {
let (send, rcv) = crossbeam::channel::bounded(1);
self.needs_key_sender.send(send).expect("The cluster is dead");
rcv.recv().expect("The auth controller is dead")
}
}
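
Both `Leader::listener` above and `Follower::join` (in lib.rs below) derive the 32-byte channel-encryption key the same way: the master key's bytes are copied into a zeroed array, so short keys are zero-padded (hence the warning) and long keys are truncated. A standalone sketch of that derivation, with a hypothetical helper name not present in the diff:

// Hypothetical helper mirroring the inline derivation above.
fn derive_channel_key(master_key: &str) -> [u8; 32] {
    let mut enc = [0u8; 32];
    // Copy at most 32 bytes of the master key; remaining slots stay zero.
    enc.iter_mut().zip(master_key.as_bytes()).for_each(|(slot, byte)| *slot = *byte);
    enc
}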

cluster/src/lib.rs (new file)
View File

@ -0,0 +1,231 @@
use std::net::ToSocketAddrs;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use batch::Batch;
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{connect_channel, connect_channel_with_enc, ChannelReceiver, ChannelSender};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::{KindWithContent, Task};
use serde::{Deserialize, Serialize};
pub mod batch;
mod leader;
pub use leader::Leader;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Network issue occured")]
NetworkIssue,
#[error("Internal error: {0}")]
SerdeJson(#[from] serde_json::Error),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LeaderMsg {
/// A dump to join the cluster
JoinFromDump(Vec<u8>),
/// Starts a new batch
StartBatch { id: u32, batch: Batch },
/// Tell the follower to commit the update asap
Commit(u32),
/// Tell the follower to register a new task
RegisterNewTask { task: Task, update_file: Option<Vec<u8>> },
/// Tell the follower to apply an API key operation
ApiKeyOperation(ApiKeyOperation),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FollowerMsg {
// Let the leader know you're ready to commit
ReadyToCommit(u32),
RegisterNewTask(KindWithContent),
}
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Consistency {
One,
Two,
Quorum,
#[default]
All,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ApiKeyOperation {
Insert(Key),
Delete(Uuid),
}
impl std::fmt::Display for Consistency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Consistency::One => write!(f, "one"),
Consistency::Two => write!(f, "two"),
Consistency::Quorum => write!(f, "quorum"),
Consistency::All => write!(f, "all"),
}
}
}
impl FromStr for Consistency {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"one" => Ok(Consistency::One),
"two" => Ok(Consistency::Two),
"quorum" => Ok(Consistency::Quorum),
"all" => Ok(Consistency::All),
s => Err(format!(
"Unexpected value `{s}`, expected one of `one`, `two`, `quorum`, `all`"
)),
}
}
}
#[derive(Clone)]
pub enum Cluster {
Leader(Leader),
Follower(Follower),
}
#[derive(Clone)]
pub struct Follower {
sender: ChannelSender<FollowerMsg>,
get_batch: Receiver<(u32, Batch)>,
must_commit: Receiver<u32>,
register_new_task: Receiver<(Task, Option<Vec<u8>>)>,
api_key_op: Receiver<ApiKeyOperation>,
batch_id: Arc<RwLock<u32>>,
}
impl Follower {
pub fn join(leader: impl ToSocketAddrs, master_key: Option<String>) -> (Follower, Vec<u8>) {
let (sender, receiver) = if let Some(master_key) = master_key {
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Connecting with encryption enabled");
connect_channel_with_enc(leader, &enc).unwrap()
} else {
connect_channel(leader).unwrap()
};
info!("Connection to the leader established");
info!("Waiting for the leader to contact us");
let state = receiver.recv().unwrap();
let dump = match state {
LeaderMsg::JoinFromDump(dump) => dump,
msg => panic!("Received unexpected message {msg:?}"),
};
let (get_batch_sender, get_batch_receiver) = unbounded();
let (must_commit_sender, must_commit_receiver) = unbounded();
let (register_task_sender, register_task_receiver) = unbounded();
let (create_api_key_sender, create_api_key_receiver) = unbounded();
std::thread::spawn(move || {
Self::router(
receiver,
get_batch_sender,
must_commit_sender,
register_task_sender,
create_api_key_sender,
);
});
(
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
api_key_op: create_api_key_receiver,
batch_id: Arc::default(),
},
dump,
)
}
fn router(
receiver: ChannelReceiver<LeaderMsg>,
get_batch: Sender<(u32, Batch)>,
must_commit: Sender<u32>,
register_new_task: Sender<(Task, Option<Vec<u8>>)>,
api_key_op: Sender<ApiKeyOperation>,
) {
loop {
match receiver.recv().expect("Lost connection to the leader") {
LeaderMsg::JoinFromDump(_) => {
warn!("Received a join from dump msg but I’m already running : ignoring the message")
}
LeaderMsg::StartBatch { id, batch } => {
info!("Starting to process a new batch");
get_batch.send((id, batch)).expect("Lost connection to the main thread")
}
LeaderMsg::Commit(id) => {
info!("Must commit");
must_commit.send(id).expect("Lost connection to the main thread")
}
LeaderMsg::RegisterNewTask { task, update_file } => {
info!("Registered a new task");
register_new_task
.send((task, update_file))
.expect("Lost connection to the main thread")
}
LeaderMsg::ApiKeyOperation(key) => {
api_key_op.send(key).expect("Lost connection to the main thread")
}
}
}
}
pub fn get_new_batch(&self) -> Batch {
info!("Get new batch called");
let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader");
info!("Got a new batch");
*self.batch_id.write().unwrap() = id;
batch
}
pub fn ready_to_commit(&self) {
info!("I'm ready to commit");
let batch_id = self.batch_id.read().unwrap();
self.sender.send(FollowerMsg::ReadyToCommit(*batch_id)).unwrap();
loop {
let id = self.must_commit.recv().expect("Lost connection to the leader");
#[allow(clippy::comparison_chain)]
if id == *batch_id {
break;
} else if id > *batch_id {
panic!("We missed a batch");
}
}
info!("I got the right to commit");
}
pub fn get_new_task(&self) -> (Task, Option<Vec<u8>>) {
self.register_new_task.recv().expect("Lost connection to the leader")
}
pub fn api_key_operation(&self) -> ApiKeyOperation {
info!("Creating a new api key");
self.api_key_op.recv().expect("Lost connection to the leader")
}
}
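
Because `Consistency` implements both `FromStr` and `Display` over the same lowercase strings, a command-line option can round-trip through them. A small sketch; the flag mentioned in the comment is hypothetical, only the types come from the code above:

use std::str::FromStr;
use cluster::Consistency;

fn main() {
    // e.g. the value of a hypothetical --cluster-consistency flag
    let level = Consistency::from_str("quorum").unwrap();
    assert_eq!(level, Consistency::Quorum);
    assert_eq!(level.to_string(), "quorum");
    // Unknown values yield a descriptive error rather than a default.
    assert!(Consistency::from_str("three").is_err());
}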

View File

@ -159,7 +159,7 @@ impl<'a> Display for Error<'a> {
writeln!(f, "The `_geoBoundingBox` filter expects two pairs of arguments: `_geoBoundingBox([latitude, longitude], [latitude, longitude])`.")?
}
ErrorKind::ReservedGeo(name) => {
writeln!(f, "`{}` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.", name.escape_debug())?
writeln!(f, "`{}` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance), or _geoBoundingBox([latitude, longitude], [latitude, longitude]) built-in rules to filter on `_geo` coordinates.", name.escape_debug())?
}
ErrorKind::MisusedGeoRadius => {
writeln!(f, "The `_geoRadius` filter is an operation and can't be used as a value.")?

View File

@ -141,7 +141,7 @@ pub enum FilterCondition<'a> {
Or(Vec<Self>),
And(Vec<Self>),
GeoLowerThan { point: [Token<'a>; 2], radius: Token<'a> },
GeoBoundingBox { top_right_point: [Token<'a>; 2], bottom_left_point: [Token<'a>; 2] },
GeoBoundingBox { top_left_point: [Token<'a>; 2], bottom_right_point: [Token<'a>; 2] },
}
impl<'a> FilterCondition<'a> {
@ -362,8 +362,8 @@ fn parse_geo_bounding_box(input: Span) -> IResult<FilterCondition> {
}
let res = FilterCondition::GeoBoundingBox {
top_right_point: [args[0][0].into(), args[0][1].into()],
bottom_left_point: [args[1][0].into(), args[1][1].into()],
top_left_point: [args[0][0].into(), args[0][1].into()],
bottom_right_point: [args[1][0].into(), args[1][1].into()],
};
Ok((input, res))
}
@ -382,34 +382,6 @@ fn parse_geo_point(input: Span) -> IResult<FilterCondition> {
Err(nom::Err::Failure(Error::new_from_kind(input, ErrorKind::ReservedGeo("_geoPoint"))))
}
/// geoPoint = WS* "_geoDistance(float WS* "," WS* float WS* "," WS* float)
fn parse_geo_distance(input: Span) -> IResult<FilterCondition> {
// we want to forbid space BEFORE the _geoDistance but not after
tuple((
multispace0,
tag("_geoDistance"),
// if we were able to parse `_geoDistance` we are going to return a Failure whatever happens next.
cut(delimited(char('('), separated_list1(tag(","), ws(recognize_float)), char(')'))),
))(input)
.map_err(|e| e.map(|_| Error::new_from_kind(input, ErrorKind::ReservedGeo("_geoDistance"))))?;
// if we succeeded we still return a `Failure` because `geoDistance` filters are not allowed
Err(nom::Err::Failure(Error::new_from_kind(input, ErrorKind::ReservedGeo("_geoDistance"))))
}
/// geo = WS* "_geo(float WS* "," WS* float WS* "," WS* float)
fn parse_geo(input: Span) -> IResult<FilterCondition> {
// we want to forbid space BEFORE the _geo but not after
tuple((
multispace0,
word_exact("_geo"),
// if we were able to parse `_geo` we are going to return a Failure whatever happens next.
cut(delimited(char('('), separated_list1(tag(","), ws(recognize_float)), char(')'))),
))(input)
.map_err(|e| e.map(|_| Error::new_from_kind(input, ErrorKind::ReservedGeo("_geo"))))?;
// if we succeeded we still return a `Failure` because `_geo` filter is not allowed
Err(nom::Err::Failure(Error::new_from_kind(input, ErrorKind::ReservedGeo("_geo"))))
}
fn parse_error_reserved_keyword(input: Span) -> IResult<FilterCondition> {
match parse_condition(input) {
Ok(result) => Ok(result),
@ -446,8 +418,6 @@ fn parse_primary(input: Span, depth: usize) -> IResult<FilterCondition> {
parse_not_exists,
parse_to,
// the next lines are only for error handling and are written at the end to have the least possible performance impact
parse_geo,
parse_geo_distance,
parse_geo_point,
parse_error_reserved_keyword,
))(input)
@ -651,35 +621,15 @@ pub mod tests {
"###);
insta::assert_display_snapshot!(p("_geoPoint(12, 13, 14)"), @r###"
`_geoPoint` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
`_geoPoint` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance), or _geoBoundingBox([latitude, longitude], [latitude, longitude]) built-in rules to filter on `_geo` coordinates.
1:22 _geoPoint(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("position <= _geoPoint(12, 13, 14)"), @r###"
`_geoPoint` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
`_geoPoint` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance), or _geoBoundingBox([latitude, longitude], [latitude, longitude]) built-in rules to filter on `_geo` coordinates.
13:34 position <= _geoPoint(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("_geoDistance(12, 13, 14)"), @r###"
`_geoDistance` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
1:25 _geoDistance(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("position <= _geoDistance(12, 13, 14)"), @r###"
`_geoDistance` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
13:37 position <= _geoDistance(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("_geo(12, 13, 14)"), @r###"
`_geo` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
1:17 _geo(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("position <= _geo(12, 13, 14)"), @r###"
`_geo` is a reserved keyword and thus can't be used as a filter expression. Use the `_geoRadius(latitude, longitude, distance)` or `_geoBoundingBox([latitude, longitude], [latitude, longitude])` built-in rules to filter on `_geo` coordinates.
13:29 position <= _geo(12, 13, 14)
"###);
insta::assert_display_snapshot!(p("position <= _geoRadius(12, 13, 14)"), @r###"
The `_geoRadius` filter is an operation and can't be used as a value.
13:35 position <= _geoRadius(12, 13, 14)
@ -830,10 +780,7 @@ impl<'a> std::fmt::Display for FilterCondition<'a> {
FilterCondition::GeoLowerThan { point, radius } => {
write!(f, "_geoRadius({}, {}, {})", point[0], point[1], radius)
}
FilterCondition::GeoBoundingBox {
top_right_point: top_left_point,
bottom_left_point: bottom_right_point,
} => {
FilterCondition::GeoBoundingBox { top_left_point, bottom_right_point } => {
write!(
f,
"_geoBoundingBox([{}, {}], [{}, {}])",

View File

@ -7,8 +7,8 @@ use nom::{InputIter, InputLength, InputTake, Slice};
use crate::error::{ExpectedValueKind, NomErrorExt};
use crate::{
parse_geo, parse_geo_bounding_box, parse_geo_distance, parse_geo_point, parse_geo_radius,
Error, ErrorKind, IResult, Span, Token,
parse_geo_bounding_box, parse_geo_point, parse_geo_radius, Error, ErrorKind, IResult, Span,
Token,
};
/// This function goes through all characters in the [Span] if it finds any escaped character (`\`).
@ -88,16 +88,11 @@ pub fn parse_value(input: Span) -> IResult<Token> {
// then, we want to check if the user is misusing a geo expression
// This expression can’t finish without error.
// We want to return an error in case of failure.
let geo_reserved_parse_functions = [parse_geo_point, parse_geo_distance, parse_geo];
for parser in geo_reserved_parse_functions {
if let Err(err) = parser(input) {
if err.is_failure() {
return Err(err);
}
if let Err(err) = parse_geo_point(input) {
if err.is_failure() {
return Err(err);
}
}
match parse_geo_radius(input) {
Ok(_) => {
return Err(nom::Err::Failure(Error::new_from_kind(input, ErrorKind::MisusedGeoRadius)))

View File

@ -13,6 +13,8 @@ license.workspace = true
[dependencies]
anyhow = "1.0.64"
bincode = "1.3.3"
cluster = { path = "../cluster" }
crossbeam = "0.8.2"
csv = "1.1.6"
derive_builder = "0.11.2"
dump = { path = "../dump" }

View File

@ -22,7 +22,8 @@ use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
use crossbeam::utils::Backoff;
use dump::{DumpWriter, IndexMetadata};
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -41,14 +42,14 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Cluster, Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum Batch {
TaskCancelation {
/// The task cancelation itself.
@ -85,14 +86,14 @@ pub(crate) enum Batch {
},
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum IndexOperation {
DocumentOperation {
index_uid: String,
@ -586,6 +587,12 @@ impl IndexScheduler {
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
// We must only remove the content files if the transaction is successfully committed
// and if errors occurs when we are deleting files we must do our best to delete
// everything. We do not return the encountered errors when deleting the content
@ -629,6 +636,13 @@ impl IndexScheduler {
}
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
Ok(vec![task])
}
@ -675,6 +689,9 @@ impl IndexScheduler {
}
// 3. Snapshot every index
// TODO: we are opening all of the indexes; it can be too much, so we should unload all
// of the indexes we are trying to open. It would be even better to only unload
// the ones that were opened by us, or maybe use an LRU in the index mapper.
for result in self.index_mapper.index_mapping.iter(&rtxn)? {
let (name, uuid) = result?;
let index = self.index_mapper.index(&rtxn, name)?;
@ -711,14 +728,6 @@ impl IndexScheduler {
// 5.3 Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
#[allow(clippy::non_octal_unix_permissions)]
// rwxrwxrwx
permissions.set_mode(0b100100100);
}
file.set_permissions(permissions)?;
for task in &mut tasks {
@ -728,96 +737,9 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::Dump(mut task) => {
// TODO: It would be better to use the started_at from the task instead of generating a new one
let started_at = OffsetDateTime::now_utc();
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) =
cursor.next_document().map_err(milli::Error::from)?
{
dump_content_file.push_document(&obkv_to_object(
&doc,
&documents_batch_index,
)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
let dump = self.create_dump(&task, &started_at)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
@ -833,38 +755,27 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string();
let index_uid = op.index_uid();
let index = if must_create_index {
// create the index if it doesn't already exist
let wtxn = self.env.write_txn()?;
self.index_mapper.create_index(wtxn, &index_uid, None)?
self.index_mapper.create_index(wtxn, index_uid, None)?
} else {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, &index_uid)?
self.index_mapper.index(&rtxn, index_uid)?
};
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
index_wtxn.commit()?;
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<()> {
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?;
Ok(())
}();
match res {
Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e),
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
index_wtxn.commit()?;
Ok(tasks)
}
Batch::IndexCreation { index_uid, primary_key, task } => {
@ -895,31 +806,9 @@ impl IndexScheduler {
)?;
index_wtxn.commit()?;
}
// drop rtxn before starting a new wtxn on the same db
rtxn.commit()?;
task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key });
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<()> {
let mut wtxn = self.env.write_txn()?;
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?;
Ok(())
}();
match res {
Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e),
}
Ok(vec![task])
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
@ -983,6 +872,13 @@ impl IndexScheduler {
for swap in swaps {
self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?;
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
@ -990,6 +886,99 @@ impl IndexScheduler {
}
}
pub(crate) fn create_dump(
&self,
task: &Task,
started_at: &OffsetDateTime,
) -> Result<DumpWriter> {
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves, we want to be marked as finished
// so we don't loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fails, everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(*started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
Ok(dump)
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes
@ -1420,4 +1409,274 @@ impl IndexScheduler {
Ok(content_files_to_delete)
}
pub(crate) fn get_batch_from_cluster_batch(
&self,
batch: cluster::batch::Batch,
) -> Result<Batch> {
use cluster::batch::Batch as CBatch;
let mut rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
for id in batch.ids() {
let backoff = Backoff::new();
let id = BEU32::new(id);
loop {
if self.all_tasks.get(&rtxn, &id)?.is_some() {
info!("Found the task_id");
break;
}
info!("The task is not present in the task queue, we wait");
// we need to drop the txn to make a write visible
drop(rtxn);
backoff.spin();
rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
}
}
Ok(match batch {
CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
Batch::TaskCancelation {
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
previous_started_at,
previous_processing_tasks,
}
}
CBatch::TaskDeletion(task) => {
Batch::TaskDeletion(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::SnapshotCreation(tasks) => {
Batch::SnapshotCreation(self.get_existing_tasks(&rtxn, tasks)?)
}
CBatch::Dump(task) => {
Batch::Dump(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
op: self.get_index_op_from_cluster_index_op(&rtxn, op)?,
must_create_index,
},
CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
Batch::IndexDeletion {
index_uid,
tasks: self.get_existing_tasks(&rtxn, tasks)?,
index_has_been_created,
}
}
CBatch::IndexSwap { task } => {
Batch::IndexSwap { task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone() }
}
})
}
pub(crate) fn get_index_op_from_cluster_index_op(
&self,
rtxn: &RoTxn,
op: cluster::batch::IndexOperation,
) -> Result<IndexOperation> {
use cluster::batch::IndexOperation as COp;
Ok(match op {
COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentDeletion { index_uid, documents, tasks } => {
IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks: self.get_existing_tasks(rtxn, tasks)?,
}
}
COp::DocumentClear { index_uid, tasks } => IndexOperation::DocumentClear {
index_uid,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::Settings { index_uid, settings, tasks } => IndexOperation::Settings {
index_uid,
settings,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentClearAndSetting { index_uid, cleared_tasks, settings, settings_tasks } => {
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks: self.get_existing_tasks(rtxn, cleared_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
}
}
COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: self.get_existing_tasks(rtxn, document_import_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
},
})
}
}
impl From<Batch> for cluster::batch::Batch {
fn from(batch: Batch) -> Self {
use cluster::batch::Batch as CBatch;
match batch {
Batch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
CBatch::TaskCancelation {
task: task.uid,
previous_started_at,
previous_processing_tasks,
}
}
Batch::TaskDeletion(task) => CBatch::TaskDeletion(task.uid),
Batch::SnapshotCreation(task) => {
CBatch::SnapshotCreation(task.into_iter().map(|task| task.uid).collect())
}
Batch::Dump(task) => CBatch::Dump(task.uid),
Batch::IndexOperation { op, must_create_index } => {
CBatch::IndexOperation { op: op.into(), must_create_index }
}
Batch::IndexCreation { index_uid, primary_key, task } => {
CBatch::IndexCreation { index_uid, primary_key, task: task.uid }
}
Batch::IndexUpdate { index_uid, primary_key, task } => {
CBatch::IndexUpdate { index_uid, primary_key, task: task.uid }
}
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
CBatch::IndexDeletion {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
index_has_been_created,
}
}
Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid },
}
}
}
impl From<IndexOperation> for cluster::batch::IndexOperation {
fn from(op: IndexOperation) -> Self {
use cluster::batch::IndexOperation as COp;
match op {
IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentDeletion { index_uid, documents, tasks } => {
COp::DocumentDeletion {
index_uid,
documents,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
}
}
IndexOperation::DocumentClear { index_uid, tasks } => COp::DocumentClear {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::Settings { index_uid, settings, tasks } => COp::Settings {
index_uid,
settings,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => COp::DocumentClearAndSetting {
index_uid,
cleared_tasks: cleared_tasks.into_iter().map(|task| task.uid).collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: document_import_tasks
.into_iter()
.map(|task| task.uid)
.collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
}
}
}
impl From<DocumentOperation> for cluster::batch::DocumentOperation {
fn from(op: DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
DocumentOperation::Add(uuid) => COp::Add(uuid),
DocumentOperation::Delete(docs) => COp::Delete(docs),
}
}
}
impl From<cluster::batch::DocumentOperation> for DocumentOperation {
fn from(op: cluster::batch::DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
COp::Add(uuid) => DocumentOperation::Add(uuid),
COp::Delete(docs) => DocumentOperation::Delete(docs),
}
}
}
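// A minimal round-trip sketch (test-style) for the two `DocumentOperation`
// conversions above. The `Clone`, `PartialEq`, and `Debug` derives on both
// enums are assumptions of this sketch, not confirmed by this diff.
#[cfg(test)]
mod document_operation_conversion_sketch {
    use super::*;
    use uuid::Uuid;

    #[test]
    fn round_trips_across_the_wire() {
        let op = DocumentOperation::Add(Uuid::new_v4());
        // Convert to the wire type exchanged between cluster nodes, then back.
        let wire: cluster::batch::DocumentOperation = op.clone().into();
        let back: DocumentOperation = wire.into();
        assert_eq!(op, back);
    }
}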

View File

@ -61,8 +61,6 @@ pub enum Error {
SwapDuplicateIndexesFound(Vec<String>),
#[error("Index `{0}` not found.")]
SwapIndexNotFound(String),
#[error("No space left in database. Free some space by deleting tasks.")]
NoSpaceLeftInTaskQueue,
#[error(
"Indexes {} not found.",
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
@ -154,8 +152,6 @@ impl ErrorCode for Error {
Error::TaskNotFound(_) => Code::TaskNotFound,
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
// TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(),
Error::ProcessBatchPanicked => Code::Internal,

View File

@ -4,11 +4,10 @@ use std::time::Duration;
use std::{fs, thread};
use log::error;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{FieldDistribution, Index};
use serde::{Deserialize, Serialize};
use meilisearch_types::milli::Index;
use time::OffsetDateTime;
use uuid::Uuid;
@ -20,7 +19,6 @@ use crate::{Error, Result};
mod index_map;
const INDEX_MAPPING: &str = "index-mapping";
const INDEX_STATS: &str = "index-stats";
/// Structure managing meilisearch's indexes.
///
@ -54,11 +52,6 @@ pub struct IndexMapper {
/// Map an index name with an index uuid currently available on disk.
pub(crate) index_mapping: Database<Str, UuidCodec>,
/// Map an index UUID with the cached stats associated to the index.
///
/// Using a UUID forces callers to go through the index_mapping table to recover the index behind a name, ensuring
/// consistency with regard to index swapping.
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
@ -83,39 +76,6 @@ pub enum IndexStatus {
Available(Index),
}
/// The statistics that can be computed from an `Index` object.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64,
/// Size of the index' DB, in bytes.
pub database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution,
/// Creation date of the index.
pub created_at: OffsetDateTime,
/// Date of the last update of the index.
pub updated_at: OffsetDateTime,
}
impl IndexStats {
/// Compute the stats of an index
///
/// # Parameters
///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> Result<Self> {
let database_size = index.on_disk_size()?;
Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?,
database_size,
field_distribution: index.field_distribution(rtxn)?,
created_at: index.created_at(rtxn)?,
updated_at: index.updated_at(rtxn)?,
})
}
}
impl IndexMapper {
pub fn new(
env: &Env,
@ -128,7 +88,6 @@ impl IndexMapper {
Ok(Self {
index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
index_stats: env.create_database(Some(INDEX_STATS))?,
base_path,
index_base_map_size,
index_growth_amount,
@ -181,9 +140,6 @@ impl IndexMapper {
.get(&wtxn, name)?
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// Not an error if the index had no stats in cache.
self.index_stats.delete(&mut wtxn, &uuid)?;
// Once we retrieved the UUID of the index we remove it from the mapping table.
assert!(self.index_mapping.delete(&mut wtxn, name)?);
@ -404,45 +360,6 @@ impl IndexMapper {
Ok(())
}
/// The stats of an index.
///
/// If available in the cache, they are directly returned.
/// Otherwise, the `Index` is opened to compute the stats on the fly (the result is not cached).
/// The stats for an index are cached after each `Index` update.
pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> {
let uuid = self
.index_mapping
.get(rtxn, index_uid)?
.ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
match self.index_stats.get(rtxn, &uuid)? {
Some(stats) => Ok(stats),
None => {
let index = self.index(rtxn, index_uid)?;
let index_rtxn = index.read_txn()?;
IndexStats::new(&index, &index_rtxn)
}
}
}
/// Stores the new stats for an index.
///
/// Expected usage is to compute the stats of the index using `IndexStats::new`, then pass them to this function.
pub fn store_stats_of(
&self,
wtxn: &mut RwTxn,
index_uid: &str,
stats: &IndexStats,
) -> Result<()> {
let uuid = self
.index_mapping
.get(wtxn, index_uid)?
.ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
self.index_stats.put(wtxn, &uuid, stats)?;
Ok(())
}
pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result<bool> {
Ok(self.index_mapping.get(rtxn, name)?.is_some())
}

View File

@ -33,6 +33,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _,
auth_path: _,
version_file_path: _,
cluster: _,
consistency_level: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
@ -254,16 +256,6 @@ pub fn snapshot_canceled_by(
snap
}
pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
let mut s = String::new();
let names = mapper.index_names(rtxn).unwrap();
for name in names {
let stats = mapper.stats_of(rtxn, &name).unwrap();
s.push_str(&format!(
"{name}: {{ number_of_documents: {}, field_distribution: {:?} }}\n",
stats.number_of_documents, stats.field_distribution
));
}
s
format!("{names:?}")
}
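// Illustration only: the simplified `snapshot_index_mapper` above now renders
// the index names with their `Debug` representation, which is exactly what the
// updated snapshot files below expect.
#[test]
fn snapshot_index_mapper_format_sketch() {
    let names = vec!["beavero".to_string(), "catto".to_string()];
    assert_eq!(format!("{names:?}"), r#"["beavero", "catto"]"#);
}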

View File

@ -31,7 +31,7 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use std::collections::HashMap;
use std::io::Write;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -39,17 +39,22 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use batch::Batch;
use cluster::{Cluster, Consistency};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore;
use log::info;
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{self, Database, Env, RoTxn};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use serde::Deserialize;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@ -302,6 +307,11 @@ pub struct IndexScheduler {
/// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf,
/// The role in the cluster
pub(crate) cluster: Option<Cluster>,
/// The `Consistency` level used by the leader. Ignored if the node is not a leader in cluster mode.
pub(crate) consistency_level: Consistency,
// ================= test
// The next entry is dedicated to the tests.
/// Provide a way to set a breakpoint in multiple part of the scheduler.
@ -321,6 +331,24 @@ pub struct IndexScheduler {
run_loop_iteration: Arc<RwLock<usize>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum ClusterMode {
Leader,
Follower,
}
impl std::str::FromStr for ClusterMode {
type Err = ();
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"leader" => Ok(ClusterMode::Leader),
"follower" => Ok(ClusterMode::Follower),
_ => Err(()),
}
}
}
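// A quick usage sketch for the `FromStr` impl above; where the string comes
// from (a CLI flag, an environment variable, ...) is an assumption of this
// sketch, not something this diff shows.
#[cfg(test)]
mod cluster_mode_sketch {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn parses_roles() {
        assert_eq!(ClusterMode::from_str("leader"), Ok(ClusterMode::Leader));
        assert_eq!(ClusterMode::from_str("follower"), Ok(ClusterMode::Follower));
        // Anything else is rejected.
        assert!(ClusterMode::from_str("primary").is_err());
    }
}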
impl IndexScheduler {
fn private_clone(&self) -> IndexScheduler {
IndexScheduler {
@ -343,6 +371,8 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(),
cluster: self.cluster.clone(),
consistency_level: self.consistency_level,
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)]
@ -357,6 +387,8 @@ impl IndexScheduler {
/// Create an index scheduler and start its run loop.
pub fn new(
options: IndexSchedulerOptions,
cluster: Option<Cluster>,
consistency_level: Consistency,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> {
@ -416,6 +448,8 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
cluster,
consistency_level,
#[cfg(test)]
test_breakpoint_sdr,
@ -508,6 +542,26 @@ impl IndexScheduler {
/// only once per index scheduler.
fn run(&self) {
let run = self.private_clone();
// if we're a follower, we start a thread to register the tasks coming from the leader
if let Some(Cluster::Follower(ref follower)) = self.cluster {
let this = self.private_clone();
let follower = follower.clone();
std::thread::spawn(move || loop {
let (task, content) = follower.get_new_task();
this.register_raw_task(task, content);
});
} else if let Some(Cluster::Leader(ref leader)) = self.cluster {
// we need a way to let the leader come out of its loop if a new follower joins the cluster
let cluster = leader.wake_up.clone();
let scheduler = self.wake_up.clone();
std::thread::spawn(move || loop {
cluster.wait();
scheduler.signal();
});
}
std::thread::Builder::new()
.name(String::from("scheduler"))
.spawn(move || {
@ -566,7 +620,7 @@ impl IndexScheduler {
}
/// Return the name of all indexes without opening them.
pub fn index_names(&self) -> Result<Vec<String>> {
pub fn index_names(&self) -> Result<Vec<String>> {
let rtxn = self.env.read_txn()?;
self.index_mapper.index_names(&rtxn)
}
@ -821,13 +875,6 @@ impl IndexScheduler {
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?;
// if the task doesn't delete anything and the task queue is more than 90% full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
&& (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 90
{
return Err(Error::NoSpaceLeftInTaskQueue);
}
let mut task = Task {
uid: self.next_task_id(&wtxn)?,
enqueued_at: OffsetDateTime::now_utc(),
@ -872,6 +919,16 @@ impl IndexScheduler {
return Err(e.into());
}
if let Some(Cluster::Leader(leader)) = &self.cluster {
let update_file = if let Some(uuid) = task.content_uuid() {
let path = self.file_store.get_update_path(uuid);
Some(std::fs::read(path).unwrap())
} else {
None
};
leader.register_new_task(task.clone(), update_file);
}
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
@ -890,8 +947,153 @@ impl IndexScheduler {
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(&mut self) -> Result<Dump> {
Dump::new(self)
pub fn register_dumped_task(
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
// Currently we don't need to access the task queue while loading a dump, thus I can block everything.
let mut wtxn = self.env.write_txn()?;
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
}
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
_ => None,
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
canceled_by: task.canceled_by,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentAdditionOrUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
KindWithContent::SettingsUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
}
}
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
KindDump::TasksDeletion { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks }
}
KindDump::DumpCreation { keys, instance_uid } => {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
},
};
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
bitmap.insert(task.uid);
})?;
wtxn.commit()?;
self.wake_up.signal();
Ok(task)
}
/// /!\ should only be used when you're a follower in cluster mode
pub fn register_raw_task(&self, task: Task, content_file: Option<Vec<u8>>) {
if let Some(content) = content_file {
let uuid = task.content_uuid().expect("bad task");
let (_, mut file) = self.file_store.new_update_with_uuid(uuid.as_u128()).unwrap();
file.write_all(&content).unwrap();
file.persist().unwrap();
}
let mut wtxn = self.env.write_txn().unwrap();
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task).unwrap();
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)
.unwrap();
wtxn.commit().unwrap();
self.wake_up.signal();
}
/// Create a new index without any associated task.
@ -950,14 +1152,15 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
info!("before getting a new batch");
let batch = match self.get_or_create_next_batch()? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
info!("after getting a new batch");
let index_uid = batch.index_uid().map(ToOwned::to_owned);
drop(rtxn);
// TODO cluster: Should we send the starting date as well so everyone is in sync?
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@ -1086,12 +1289,61 @@ impl IndexScheduler {
Ok(TickOutcome::TickAgain(processed_tasks))
}
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
let is_indexing = self.is_index_processing(index_uid)?;
let rtxn = self.read_txn()?;
let index_stats = self.index_mapper.stats_of(&rtxn, index_uid)?;
/// If there is no cluster, or if this node is the leader, create a new batch.
/// If this node is a follower, wait until the leader gives us a batch to process.
fn get_or_create_next_batch(&self) -> Result<Option<Batch>> {
info!("inside get or create next batch");
Ok(IndexStats { is_indexing, inner_stats: index_stats })
let batch = match &self.cluster {
None | Some(Cluster::Leader(_)) => {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))?
}
Some(Cluster::Follower(follower)) => {
let batch = follower.get_new_batch();
Some(self.get_batch_from_cluster_batch(batch)?)
}
};
if let Some(Cluster::Leader(leader)) = &self.cluster {
// first, onboard the new followers
if leader.has_new_followers() {
info!("New followers are trying to join the cluster");
let started_at = OffsetDateTime::now_utc();
let dump = self
.create_dump(
&Task {
uid: TaskId::MAX,
enqueued_at: started_at,
started_at: Some(started_at),
finished_at: Some(started_at),
error: None,
canceled_by: None,
details: None,
status: Status::Enqueued,
kind: KindWithContent::DumpCreation {
keys: leader.get_keys(),
// TODO cluster: should we unify the instance_uid across all instances?
instance_uid: None,
},
},
&started_at,
)
.unwrap();
let mut buffer = Vec::new();
// TODO cluster: stop writing everything in RAM
dump.persist_to(&mut buffer).unwrap();
leader.join_me(buffer);
}
// second, start processing the batch
if let Some(ref batch) = batch {
leader.starts_batch(batch.clone().into());
}
}
Ok(batch)
}
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
@ -1126,184 +1378,6 @@ impl IndexScheduler {
}
}
pub struct Dump<'a> {
index_scheduler: &'a IndexScheduler,
wtxn: RwTxn<'a, 'a>,
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>,
}
impl<'a> Dump<'a> {
pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result<Self> {
// While loading a dump, no one should be able to access the scheduler, thus I can block everything.
let wtxn = index_scheduler.env.write_txn()?;
Ok(Dump {
index_scheduler,
wtxn,
indexes: HashMap::new(),
statuses: HashMap::new(),
kinds: HashMap::new(),
})
}
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.index_scheduler.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
}
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
_ => None,
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
canceled_by: task.canceled_by,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentAdditionOrUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
KindWithContent::SettingsUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
}
}
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
KindDump::TasksDeletion { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks }
}
KindDump::DumpCreation { keys, instance_uid } => {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
},
};
self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() {
match self.indexes.get_mut(index) {
Some(bitmap) => {
bitmap.insert(task.uid);
}
None => {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(task.uid);
self.indexes.insert(index.to_string(), bitmap);
}
};
}
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.enqueued_at,
task.enqueued_at,
task.uid,
)?;
// we can't override the started_at & finished_at, so we must only set them if the task is finished and won't change
if matches!(task.status, Status::Succeeded | Status::Failed | Status::Canceled) {
if let Some(started_at) = task.started_at {
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.started_at,
started_at,
task.uid,
)?;
}
if let Some(finished_at) = task.finished_at {
utils::insert_task_datetime(
&mut self.wtxn,
self.index_scheduler.finished_at,
finished_at,
task.uid,
)?;
}
}
self.statuses.entry(task.status).or_insert(RoaringBitmap::new()).insert(task.uid);
self.kinds.entry(task.kind.as_kind()).or_insert(RoaringBitmap::new()).insert(task.uid);
Ok(task)
}
/// Commit all the changes and exit the importing dump state
pub fn finish(mut self) -> Result<()> {
for (index, bitmap) in self.indexes {
self.index_scheduler.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
}
for (status, bitmap) in self.statuses {
self.index_scheduler.put_status(&mut self.wtxn, status, &bitmap)?;
}
for (kind, bitmap) in self.kinds {
self.index_scheduler.put_kind(&mut self.wtxn, kind, &bitmap)?;
}
self.wtxn.commit()?;
self.index_scheduler.wake_up.signal();
Ok(())
}
}
/// The outcome of calling the [`IndexScheduler::tick`] function.
pub enum TickOutcome {
/// The scheduler should immediately attempt another `tick`.
@ -1324,17 +1398,6 @@ struct IndexBudget {
task_db_size: usize,
}
/// The statistics that can be computed from an `Index` object and the scheduler.
///
/// Compared with `index_mapper::IndexStats`, it adds the scheduling status.
#[derive(Debug)]
pub struct IndexStats {
/// Whether this index is currently performing indexation, according to the scheduler.
pub is_indexing: bool,
/// Internal stats computed from the index.
pub inner_stats: index_mapper::IndexStats,
}
#[cfg(test)]
mod tests {
use std::io::{BufWriter, Seek, Write};
@ -1398,7 +1461,8 @@ mod tests {
autobatching_enabled,
};
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
let index_scheduler =
Self::new(options, None, Consistency::default(), sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@ -1,5 +1,6 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1755
---
### Autobatching Enabled = true
### Processing Tasks:
@ -22,7 +23,7 @@ canceled [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
1 [0,]

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,9 +25,7 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
beavero: { number_of_documents: 0, field_distribution: {} }
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["beavero", "catto"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -1,5 +1,6 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1859
---
### Autobatching Enabled = true
### Processing Tasks:
@ -26,9 +27,7 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
beavero: { number_of_documents: 0, field_distribution: {} }
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["beavero", "catto"]
----------------------------------------------------------------------
### Canceled By:
3 [1,2,]

View File

@ -23,8 +23,7 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["catto"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,8 +25,7 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["catto"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,8 +20,7 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
["catto"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -1,5 +1,6 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1818
---
### Autobatching Enabled = true
### Processing Tasks:
@ -22,8 +23,7 @@ canceled [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
["catto"]
----------------------------------------------------------------------
### Canceled By:
1 [0,]

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -21,8 +21,7 @@ succeeded [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["catto"]
----------------------------------------------------------------------
### Canceled By:
1 []

View File

@ -19,8 +19,7 @@ succeeded [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
["catto"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -27,10 +27,7 @@ doggos [0,3,]
girafos [2,5,]
----------------------------------------------------------------------
### Index Mapper:
cattos: { number_of_documents: 0, field_distribution: {} }
doggos: { number_of_documents: 0, field_distribution: {} }
girafos: { number_of_documents: 0, field_distribution: {} }
["cattos", "doggos", "girafos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -19,8 +19,7 @@ succeeded [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -21,8 +21,7 @@ succeeded [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ succeeded [0,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,7 +23,7 @@ succeeded [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ enqueued [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -21,7 +21,7 @@ succeeded [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -21,7 +21,7 @@ failed [0,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,8 +22,7 @@ failed [0,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -19,7 +19,7 @@ failed [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -19,7 +19,7 @@ failed [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,8 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,8 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -19,8 +19,7 @@ succeeded [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
index_a [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
index_a [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ index_a [0,]
index_b [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ index_a [0,2,]
index_b [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -19,7 +19,7 @@ failed [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,9 +23,7 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
cattos: { number_of_documents: 0, field_distribution: {} }
doggos: { number_of_documents: 0, field_distribution: {} }
["cattos", "doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
cattos: { number_of_documents: 0, field_distribution: {} }
["cattos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ cattos [1,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ succeeded [0,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ succeeded [0,1,2,3,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ enqueued [0,1,2,3,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -21,7 +21,7 @@ enqueued [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ succeeded [0,1,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,8 +23,7 @@ succeeded [0,1,2,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
["doggos"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -26,8 +26,7 @@ catto [0,2,]
doggo [1,2,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
["catto"]
----------------------------------------------------------------------
### Canceled By:
3 [1,2,]

View File

@ -23,10 +23,7 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
whalo: { number_of_documents: 0, field_distribution: {} }
["catto", "doggo", "whalo"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -18,7 +18,7 @@ enqueued [0,]
doggo [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -20,7 +20,7 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -24,9 +24,7 @@ doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
["catto", "doggo"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -22,7 +22,7 @@ doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -24,7 +24,7 @@ doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -23,7 +23,7 @@ catto [0,1,2,]
doggo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,8 +25,7 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
["a"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,9 +25,7 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
["a", "b"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,10 +25,7 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,11 +25,7 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -28,11 +28,7 @@ c [3,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -27,11 +27,7 @@ c [2,4,]
d [3,4,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -28,11 +28,7 @@ c [1,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -29,11 +29,7 @@ c [1,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -28,11 +28,7 @@ c [2,4,5,]
d [3,4,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

View File

@ -25,11 +25,7 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Canceled By:

Some files were not shown because too many files have changed in this diff.