Compare commits

...

71 Commits

Author SHA1 Message Date
a6863a7d6e use the zookeeper-client-sync from meilisearch instead of tamo 2023-11-08 16:21:53 +01:00
e4adf7fd6d fix the path of the deps 2023-11-07 16:17:20 +01:00
1585b3ed2f re-implement the snapshot import at startup 2023-11-07 14:15:57 +01:00
c48f72e6b8 fix a few warnings 2023-11-06 16:32:49 +01:00
41178e59fc fix the leader election 2023-11-06 15:00:14 +01:00
d1bc7ec58a fix all the bugs on the snapshots export/import pipeline 2023-11-06 12:34:48 +01:00
c5ec817f52 WIP fix a lot of bugs: The follower do not wake up when a new snapshot is available 2023-11-02 18:21:17 +01:00
d0a3582a56 no deadlock on start 2023-11-02 16:45:33 +01:00
0c18962b13 it compiles, now tokio is complaining that we block the main thread (and is right), I need to patch the zookeeper client to not rely on tokio anymore 2023-11-02 12:55:16 +01:00
366144146b WIT: it compiles but the processing of tasks and loading of snapshots is still not implemented 2023-11-02 10:47:54 +01:00
03b510945b ICE: use a git version of my crate so rust team can pull the repo 2023-10-31 17:59:59 +01:00
c3a8d4b7fb ICE 2023-10-31 17:39:10 +01:00
c573261ac4 WIP: start updating the zookeeper client => leader election is missing 2023-10-31 13:04:32 +01:00
8f04353b7d bump strois to a temporary version while we wait for rusty_s3 to do a new release again 2023-10-17 11:49:18 +02:00
b9983a48c7 No more create the S3 Bucket 2023-10-17 10:26:03 +02:00
b22f1260bf bump strois to a temporary version while we wait for rusty_s3 to do a new release 2023-10-13 11:04:12 +02:00
dfb84f80da bump strois version 2023-10-10 19:25:12 +02:00
98b67f217a move to our new S3 lib 2023-09-28 11:24:18 +02:00
6325cda74f bump charabia 2023-09-21 15:18:44 +02:00
c71ba72f73 fix build in release mode 2023-09-21 11:05:57 +02:00
ecd36b15f0 exposes all the s3 arguments 2023-09-13 18:17:56 +02:00
8a2e8a887f Load the latest snapshot when we start the engine 2023-09-12 18:08:24 +02:00
309c33a418 Fix again the dots 2023-09-12 17:55:01 +02:00
9b01506cee Move the load snapshot step into a function 2023-09-12 16:05:02 +02:00
f37fdceb15 Use slashes instead of dots for the s3 paths separators 2023-09-12 15:46:15 +02:00
f544cfa444 Remove tasks and content file on the s3 2023-09-12 15:19:45 +02:00
c158d03337 Fix internal error 2023-09-12 14:46:13 +02:00
b7109c0fd2 start a script to run everything 2023-09-12 11:34:59 +02:00
a53a0fdb77 Store content files into the S3 2023-09-11 18:17:22 +02:00
719fdd701b Fix and crash when the tasks path is unknown 2023-09-07 11:31:18 +02:00
01c13c98ac Mastering minio 2023-09-06 17:54:21 +02:00
5b89276fcc starts using s3 2023-09-05 19:25:09 +02:00
41697c4d65 Introduce the zk-tasks folder 2023-09-04 18:24:34 +02:00
7d85753573 Make the snapshot download work 2023-09-04 17:38:56 +02:00
76657af1f9 Add the options into the IndexScheduler 2023-09-04 16:38:05 +02:00
966cbdab69 make the tests compile again 2023-09-04 15:39:54 +02:00
0c68b9ed4c WIP making the final snapshot swap 2023-08-31 15:56:42 +02:00
d7233ecdb8 Make things to compile again 2023-08-31 14:55:14 +02:00
95a011af13 Wrap the IndexScheduler fields into an inner struct 2023-08-31 10:36:33 +02:00
e257710961 WIP fix the tests 2023-08-30 18:03:24 +02:00
9dd4423054 Fix the watcher ordering of the auth/ node 2023-08-30 17:51:22 +02:00
8c3ad57ef9 React to changes towards the cluster members 2023-08-30 17:40:12 +02:00
2d1434da81 Keep the ZK flow when enqueuing tasks 2023-08-30 17:15:15 +02:00
c488a4a351 Fixup a lot of small issues on the ZK config 2023-08-30 16:42:55 +02:00
0c7d7c68bc WIP moving to the sync zookeeper API 2023-08-30 15:06:12 +02:00
854745c670 wip: starts working on importing the snapshots 2023-08-16 18:41:05 +02:00
777eebb759 starts creating snapshot, the import is still missing 2023-08-10 15:00:25 +02:00
61ccfaf9bc wake up after registering a task 2023-08-10 09:39:39 +02:00
f0c4d36ff7 implement the deletion of tasks after processing a batch
add a lot of comments and logs
2023-08-10 09:36:43 +02:00
8c20d6e2fe fix the leader election 2023-08-09 17:23:13 +02:00
8e437ed76c Start leader election and task processing (WIP) 2023-08-09 16:52:38 +02:00
1191ec5939 fix the register task watcher 2023-08-08 13:18:55 +02:00
0d20d08daf fix a few warnings 2023-08-08 11:39:48 +02:00
b66bf049b5 Create a task on zookeeper side when a task is created locally 2023-08-07 17:02:51 +02:00
b2f36b9b97 Comment Meilisearch container by default 2023-08-07 17:02:00 +02:00
b311089435 Update zookeeper client 2023-08-07 14:20:01 +02:00
3d46e84d97 update docker compose as an example 2023-08-03 17:15:24 +02:00
ad7f8edff8 fix auto-synchronization with zk 2023-08-03 14:43:29 +02:00
5ce01bcb53 add logs 2023-08-03 13:59:05 +02:00
d5523cc6ac fix the tests 2023-08-03 12:28:08 +02:00
fe7a312ec6 Import the already existing api keys on startup 2023-08-03 12:25:32 +02:00
57dc4b148c implement the watcher for all kind of operations 2023-08-03 10:52:13 +02:00
a325ddfe6a Forward the key deletions to zookeeper 2023-08-03 10:36:49 +02:00
0cd81573b4 Forward the keys update to zookeeper 2023-08-03 10:22:34 +02:00
b0ff595f60 Event Listener: delete local key if deleted on ZK 2023-08-02 18:36:36 +02:00
3eb6f4b56f Create api keys 2023-08-02 16:52:45 +02:00
49f976c8d8 fix analitics compilation 2023-08-02 14:17:03 +02:00
84d56f3320 send the creation of api-key to zookeeper 2023-08-02 13:57:30 +02:00
97e3dfd99d makes zk available inside the auth-controller with config coming from the cli, it compiles 2023-08-02 13:17:40 +02:00
dc38da95c4 WIP 2023-08-02 12:00:02 +02:00
2ce8b42757 REMOVE: add docker compĂ´se for tests 2023-08-02 11:59:54 +02:00
34 changed files with 2024 additions and 869 deletions

411
Cargo.lock generated
View File

@ -121,7 +121,7 @@ dependencies = [
"futures-util",
"mio",
"num_cpus",
"socket2",
"socket2 0.4.9",
"tokio",
"tracing",
]
@ -202,7 +202,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2",
"socket2 0.4.9",
"time",
"url",
]
@ -303,6 +303,12 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "allocator-api2"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "anes"
version = "0.1.6"
@ -664,6 +670,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "castaway"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc"
dependencies = [
"rustversion",
]
[[package]]
name = "cc"
version = "1.0.79"
@ -700,9 +715,9 @@ dependencies = [
[[package]]
name = "charabia"
version = "0.8.2"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57aa1b4a8dda126c03ebf2f7e31d16cfc8781c2fe80dedd1a33459efc3e07578"
checksum = "a96ce20c4634609e2e3f082925c5926b127be35f42b37214654c0bc0974bd0d2"
dependencies = [
"aho-corasick",
"cow-utils",
@ -791,7 +806,7 @@ version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "191d9573962933b4027f932c600cd252ce27a8ad5979418fe78e43c07996f27b"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.28",
@ -809,6 +824,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "compact_str"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e33b5c3ee2b4ffa00ac2b00d1645cd9229ade668139bccf95f15fadcf374127b"
dependencies = [
"castaway",
"itoa",
"ryu",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@ -832,6 +858,26 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "const_format"
version = "0.2.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673"
dependencies = [
"const_format_proc_macros",
]
[[package]]
name = "const_format_proc_macros"
version = "0.2.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -1219,14 +1265,14 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.3.3",
"uuid 1.4.1",
]
[[package]]
name = "either"
version = "1.8.1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
dependencies = [
"serde",
]
@ -1418,7 +1464,7 @@ dependencies = [
"faux",
"tempfile",
"thiserror",
"uuid 1.3.3",
"uuid 1.4.1",
]
[[package]]
@ -1468,9 +1514,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [
"percent-encoding",
]
@ -1483,9 +1529,9 @@ checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a"
[[package]]
name = "futures"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
@ -1498,9 +1544,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [
"futures-core",
"futures-sink",
@ -1508,15 +1554,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [
"futures-core",
"futures-task",
@ -1525,15 +1571,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
@ -1542,21 +1588,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
@ -1715,6 +1761,19 @@ name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
dependencies = [
"ahash 0.8.3",
"allocator-api2",
]
[[package]]
name = "hashlink"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "heapless"
@ -1729,6 +1788,15 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.1"
@ -1856,7 +1924,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.4.9",
"tokio",
"tower-service",
"tracing",
@ -1884,14 +1952,20 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.3.0"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "ignore-result"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "665ff4dce8edd10d490641ccb78949832f1ddbff02c584fb1f85ab888fe0e50c"
[[package]]
name = "index-scheduler"
version = "1.3.0"
@ -1912,15 +1986,19 @@ dependencies = [
"meilisearch-types",
"nelson",
"page_size 0.5.0",
"parking_lot",
"puffin",
"roaring",
"serde",
"serde_json",
"strois",
"synchronoise",
"tempfile",
"thiserror",
"time",
"uuid 1.3.3",
"tokio",
"uuid 1.4.1",
"zookeeper-client-sync",
]
[[package]]
@ -2125,9 +2203,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.147"
version = "0.2.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
[[package]]
name = "libgit2-sys"
@ -2171,9 +2249,9 @@ dependencies = [
[[package]]
name = "lindera-cc-cedict-builder"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2e8f2ca97ddf952fe340642511b9c14b373cb2eef711d526bb8ef2ca0969b8"
checksum = "6f567a47e47b5420908424de2c6c5e424e3cafe588d0146bd128c0f3755758a3"
dependencies = [
"anyhow",
"bincode",
@ -2190,9 +2268,9 @@ dependencies = [
[[package]]
name = "lindera-compress"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f72b460559bcbe8a9cee85ea4a5056133ed3abf373031191589236e656d65b59"
checksum = "49f3e553d55ebe9881fa5e5de588b0a153456e93564d17dfbef498912caf63a2"
dependencies = [
"anyhow",
"flate2",
@ -2201,9 +2279,9 @@ dependencies = [
[[package]]
name = "lindera-core"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f586eb8a9393c32d5525e0e9336a3727bd1329674740097126f3b0bff8a1a1ea"
checksum = "a9a2440cc156a4a911a174ec68203543d1efb10df3a700a59b6bf581e453c726"
dependencies = [
"anyhow",
"bincode",
@ -2218,9 +2296,9 @@ dependencies = [
[[package]]
name = "lindera-decompress"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb1facd8da698072fcc7338bd757730db53d59f313f44dd583fa03681dcc0e1"
checksum = "e077a410e61c962cb526f71b7effd62ffc607488a8f61869c937582d2ccb529b"
dependencies = [
"anyhow",
"flate2",
@ -2229,9 +2307,9 @@ dependencies = [
[[package]]
name = "lindera-dictionary"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec7be7410b1da7017a8948986b87af67082f605e9a716f0989790d795d677f0c"
checksum = "d9f57491adf7b311a3ee87f5e4a36454df16a2ec73de4ef28b2106fac80bd782"
dependencies = [
"anyhow",
"bincode",
@ -2249,9 +2327,9 @@ dependencies = [
[[package]]
name = "lindera-ipadic-builder"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "705d07f8a45d04fd95149f7ad41a26d1f9e56c9c00402be6f9dd05e3d88b99c6"
checksum = "a3476ec7748aebd2eb23d496ddfce5e7e0a5c031cffcd214451043e02d029f11"
dependencies = [
"anyhow",
"bincode",
@ -2270,9 +2348,9 @@ dependencies = [
[[package]]
name = "lindera-ipadic-neologd-builder"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633a93983ba13fba42328311a501091bd4a7aff0c94ae9eaa9d4733dd2b0468a"
checksum = "7b1c7576a02d5e4af2bf62de51790a01bc4b8bc0d0b6a6b86a46b157f5cb306d"
dependencies = [
"anyhow",
"bincode",
@ -2291,9 +2369,9 @@ dependencies = [
[[package]]
name = "lindera-ko-dic"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a428e0d316b6c86f51bd919479692bc41ad840dba266ebc044663970f431ea18"
checksum = "b713ecd5b827d7d448c3c5eb3c6d5899ecaf22cd17087599996349a02c76828d"
dependencies = [
"bincode",
"byteorder",
@ -2308,9 +2386,9 @@ dependencies = [
[[package]]
name = "lindera-ko-dic-builder"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a5288704c6b8a069c0a1705c38758e836497698b50453373ab3d56c6f9a7ef8"
checksum = "3e545752f6487be87b572529ad594cb3b48d2ef20821516f598b2d152d23277b"
dependencies = [
"anyhow",
"bincode",
@ -2328,9 +2406,9 @@ dependencies = [
[[package]]
name = "lindera-tokenizer"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106ba439b2e87529d9bbedbb88d69f635baba1195c26502b308f55a85885fc81"
checksum = "24a2d4606a5a4da62ac4a3680ee884a75da7f0c892dc967fc9cb983ceba39a8f"
dependencies = [
"bincode",
"byteorder",
@ -2343,9 +2421,9 @@ dependencies = [
[[package]]
name = "lindera-unidic"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3399b6dcfe1701333451d184ff3c677f433b320153427b146360c9e4bd8cb816"
checksum = "388b1bdf81794b5d5b8057ce0321c58ff4b90d676b637948ccc7863ae2f43d28"
dependencies = [
"bincode",
"byteorder",
@ -2360,9 +2438,9 @@ dependencies = [
[[package]]
name = "lindera-unidic-builder"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b698227fdaeac32289173ab389b990d4eb00a40cbc9912020f69a0c491dabf55"
checksum = "cdfa3e29a22c047da57fadd960ff674b720de15a1e2fb35b5ed67f3408afb469"
dependencies = [
"anyhow",
"bincode",
@ -2442,9 +2520,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.19"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "logging_timer"
@ -2492,6 +2570,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
@ -2580,6 +2668,7 @@ dependencies = [
"siphasher",
"slice-group-by",
"static-files",
"strois",
"sysinfo",
"tar",
"temp-env",
@ -2591,11 +2680,12 @@ dependencies = [
"tokio-stream",
"toml",
"urlencoding",
"uuid 1.3.3",
"uuid 1.4.1",
"vergen",
"walkdir",
"yaup",
"zip",
"zookeeper-client-sync",
]
[[package]]
@ -2605,6 +2695,7 @@ dependencies = [
"base64 0.21.2",
"enum-iterator",
"hmac",
"log",
"maplit",
"meilisearch-types",
"rand",
@ -2614,7 +2705,8 @@ dependencies = [
"sha2",
"thiserror",
"time",
"uuid 1.3.3",
"uuid 1.4.1",
"zookeeper-client-sync",
]
[[package]]
@ -2644,7 +2736,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"uuid 1.3.3",
"uuid 1.4.1",
]
[[package]]
@ -2724,7 +2816,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.3.3",
"uuid 1.4.1",
]
[[package]]
@ -2864,6 +2956,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "object"
version = "0.30.3"
@ -2998,9 +3111,9 @@ dependencies = [
[[package]]
name = "percent-encoding"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "permissive-json-pointer"
@ -3094,9 +3207,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.9"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
@ -3159,6 +3272,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
dependencies = [
"once_cell",
"toml_edit",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -3257,6 +3380,16 @@ dependencies = [
"puffin",
]
[[package]]
name = "quick-xml"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.30"
@ -3569,6 +3702,24 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
[[package]]
name = "rusty-s3"
version = "0.5.0"
source = "git+http://github.com/Kerollmops/rusty-s3?branch=kero/percent-decode-list-files#1efb1eee126222d52936cb72ce3287b9b6bf5ffa"
dependencies = [
"base64 0.21.2",
"hmac",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"time",
"url",
"zeroize",
]
[[package]]
name = "ryu"
version = "1.0.13"
@ -3622,9 +3773,9 @@ checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
[[package]]
name = "serde"
version = "1.0.180"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed"
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [
"serde_derive",
]
@ -3649,9 +3800,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.180"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036"
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
@ -3809,6 +3960,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "spin"
version = "0.5.2"
@ -3847,12 +4008,49 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "strois"
version = "0.0.4"
source = "git+https://github.com/meilisearch/strois?rev=eeb945c#eeb945cfb6c666935b0f06b1f410fd83182b2b67"
dependencies = [
"http",
"log",
"quick-xml",
"rusty-s3",
"serde",
"thiserror",
"ureq",
"url",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38"
dependencies = [
"heck 0.3.3",
"proc-macro2",
"quote",
"rustversion",
"syn 1.0.109",
]
[[package]]
name = "subtle"
version = "2.5.0"
@ -3961,18 +4159,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.44"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.44"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
@ -4033,11 +4231,11 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.28.2"
version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
"mio",
@ -4045,7 +4243,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.5.5",
"tokio-macros",
"windows-sys 0.48.0",
]
@ -4241,19 +4439,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b11c96ac7ee530603dcdf68ed1557050f374ce55a5a07193ebf8cbc9f8927e9"
dependencies = [
"base64 0.21.2",
"flate2",
"log",
"once_cell",
"rustls 0.21.1",
"rustls-webpki",
"serde",
"serde_json",
"url",
"webpki-roots 0.23.1",
]
[[package]]
name = "url"
version = "2.3.1"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5"
dependencies = [
"form_urlencoded",
"idna",
@ -4289,9 +4490,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.3.3"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
"serde",
@ -4712,6 +4913,12 @@ dependencies = [
"synstructure",
]
[[package]]
name = "zeroize"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zip"
version = "0.6.6"
@ -4732,6 +4939,40 @@ dependencies = [
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]
name = "zookeeper-client"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39d162869ce27d6b4b548f39c93db8de67860def306a818726bae1e87a0c942"
dependencies = [
"bytes",
"compact_str",
"const_format",
"either",
"hashbrown 0.12.3",
"hashlink",
"ignore-result",
"log",
"num_enum",
"static_assertions",
"strum",
"thiserror",
"tokio",
"uuid 1.4.1",
]
[[package]]
name = "zookeeper-client-sync"
version = "0.1.0"
source = "git+https://github.com/meilisearch/zookeeper-client-sync.git#ff10407b7b2ef34b727ce4291729f3b8bd676f30"
dependencies = [
"futures",
"once_cell",
"thiserror",
"tokio",
"zookeeper-client",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@ -37,23 +37,5 @@ opt-level = 3
[profile.dev.package.roaring]
opt-level = 3
[profile.dev.package.lindera-ipadic-builder]
opt-level = 3
[profile.dev.package.encoding]
opt-level = 3
[profile.dev.package.yada]
opt-level = 3
[profile.release.package.lindera-ipadic-builder]
opt-level = 3
[profile.release.package.encoding]
opt-level = 3
[profile.release.package.yada]
opt-level = 3
[profile.bench.package.lindera-ipadic-builder]
opt-level = 3
[profile.bench.package.encoding]
opt-level = 3
[profile.bench.package.yada]
opt-level = 3
[patch.crates-io]
strois = { git = "https://github.com/meilisearch/strois", rev = "eeb945c" }

59
docker-compose.yml Normal file
View File

@ -0,0 +1,59 @@
version: "3.9"
services:
zk1:
container_name: zk1
hostname: zk1
image: bitnami/zookeeper:3.7.1
ports:
- 21811:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=1
- ZOO_SERVERS=0.0.0.0:2888:3888,zk2:2888:3888,zk3:2888:3888
zk2:
container_name: zk2
hostname: zk2
image: bitnami/zookeeper:3.7.1
ports:
- 21812:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=2
- ZOO_SERVERS=zk1:2888:3888,0.0.0.0:2888:3888,zk3:2888:3888
zk3:
container_name: zk3
hostname: zk3
image: bitnami/zookeeper:3.7.1
ports:
- 21813:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=3
- ZOO_SERVERS=zk1:2888:3888,zk2:2888:3888,0.0.0.0:2888:3888
zoonavigator:
container_name: zoonavigator
image: elkozmon/zoonavigator
ports:
- 9000:9000
# Meilisearch instances
# m1:
# container_name: m1
# hostname: m1
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
# ports:
# - 7700:7700
# environment:
# - MEILI_ZK_URL=zk1:2181
# - MEILI_MASTER_KEY=masterkey
# restart: always
# m2:
# container_name: m2
# hostname: m2
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
# ports:
# - 7701:7700
# environment:
# - MEILI_ZK_URL=zk2:2181
# - MEILI_MASTER_KEY=masterkey
# restart: always

View File

@ -22,20 +22,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)]
pub struct FileStore {
path: PathBuf,
@ -146,6 +132,20 @@ impl File {
}
}
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[cfg(test)]
mod test {
use std::io::Write;

61
ha_test/run.sh Normal file
View File

@ -0,0 +1,61 @@
#!/bin/bash
function is_everything_installed {
everything_ok=yes
if hash zkli 2>/dev/null; then
echo "âś… zkli installed"
else
everything_ok=no
echo "🥺 zkli is missing, please run \`cargo install zkli\`"
fi
if hash s3cmd 2>/dev/null; then
echo "âś… s3cmd installed"
else
everything_ok=no
echo "🥺 s3cmd is missing, see how to install it here https://s3tools.org/s3cmd"
fi
if [ $everything_ok = "no" ]; then
echo "Exiting..."
exit 1
fi
}
# param: addr of zookeeper
function connect_to_zookeeper {
if ! zkli -a "$1" ls > /dev/null; then
echo "zkli can't connect"
return 1
fi
}
# param: addr of the s3 bucket
function connect_to_s3 {
# S3_SECRET_KEY
# S3_ACCESS_KEY
# S3_HOST
# S3_BUCKET
s3cmd --host="$S3_HOST" --host-bucket="$S3_BUCKET" --access_key="$ACCESS_KEY" --secret_key="$S3_SECRET_KEY" ls
if $?; then
echo "s3cmd can't connect"
return 1
fi
}
is_everything_installed
ZOOKEEPER_ADDR="localhost:2181"
if ! connect_to_zookeeper $ZOOKEEPER_ADDR; then
ZOOKEEPER_ADDR="localhost:21811"
if ! connect_to_zookeeper $ZOOKEEPER_ADDR; then
echo "Can't connect to zkli"
exit 1
fi
fi
connect_to_s3

View File

@ -31,6 +31,10 @@ tempfile = "3.5.0"
thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] }
tokio = { version = "1.27.0", features = ["full"] }
zookeeper-client-sync = { git = "https://github.com/meilisearch/zookeeper-client-sync.git" }
parking_lot = "0.12.1"
strois = "0.0.4"
[dev-dependencies]
big_s = "1.0.2"

View File

@ -43,7 +43,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Error, IndexSchedulerInner, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
@ -198,6 +198,35 @@ impl Batch {
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
}
}
/// Return the content fields uuids associated with this batch.
pub fn content_uuids(&self) -> Vec<Uuid> {
match self {
Batch::TaskCancelation { .. }
| Batch::TaskDeletion(_)
| Batch::Dump(_)
| Batch::IndexCreation { .. }
| Batch::IndexDocumentDeletionByFilter { .. }
| Batch::IndexUpdate { .. }
| Batch::SnapshotCreation(_)
| Batch::IndexDeletion { .. }
| Batch::IndexSwap { .. } => vec![],
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { operations, .. } => operations
.iter()
.flat_map(|op| match op {
DocumentOperation::Add(uuid) => Some(*uuid),
DocumentOperation::Delete(_) => None,
})
.collect(),
IndexOperation::DocumentDeletion { .. }
| IndexOperation::Settings { .. }
| IndexOperation::DocumentClear { .. }
| IndexOperation::SettingsAndDocumentOperation { .. }
| IndexOperation::DocumentClearAndSetting { .. } => vec![],
},
}
}
}
impl IndexOperation {
@ -213,7 +242,7 @@ impl IndexOperation {
}
}
impl IndexScheduler {
impl IndexSchedulerInner {
/// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
///
/// ## Arguments
@ -480,8 +509,7 @@ impl IndexScheduler {
if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method.
let ProcessingTasks { started_at, processing } =
&*self.processing_tasks.read().unwrap();
let ProcessingTasks { started_at, processing, .. } = &*self.processing_tasks.read();
return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
previous_started_at: *started_at,
@ -1392,7 +1420,7 @@ impl IndexScheduler {
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let processing_tasks = &self.processing_tasks.read().processing.clone();
let all_task_ids = self.all_task_ids(wtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks;

View File

@ -295,6 +295,11 @@ impl IndexMap {
"Attempt to finish deletion of an index that was being closed"
);
}
/// Returns the indexes that were opened by the `IndexMap`.
pub fn clear(&mut self) -> Vec<Index> {
self.available.clear().into_iter().map(|(_, (_, index))| index).collect()
}
}
/// Create or open an index in the specified path.
@ -335,7 +340,8 @@ mod tests {
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
let index_scheduler = index_scheduler.inner();
(index_scheduler.index_mapper.clone(), index_scheduler.env.clone(), handle)
}
}

View File

@ -61,7 +61,7 @@ pub struct IndexMapper {
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
pub(crate) base_path: PathBuf,
/// The map size an index is opened with on the first time.
index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
@ -135,7 +135,7 @@ impl IndexMapper {
index_growth_amount: usize,
index_count: usize,
enable_mdb_writemap: bool,
indexer_config: IndexerConfig,
indexer_config: Arc<IndexerConfig>,
) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let index_mapping = env.create_database(&mut wtxn, Some(INDEX_MAPPING))?;
@ -150,7 +150,7 @@ impl IndexMapper {
index_base_map_size,
index_growth_amount,
enable_mdb_writemap,
indexer_config: Arc::new(indexer_config),
indexer_config,
})
}
@ -428,6 +428,11 @@ impl IndexMapper {
Ok(())
}
/// Returns the indexes that were opened by the `IndexMapper`.
pub fn clear(&mut self) -> Vec<Index> {
self.index_map.write().unwrap().clear()
}
/// The stats of an index.
///
/// If available in the cache, they are directly returned.

View File

@ -1,5 +1,6 @@
use std::collections::BTreeSet;
use std::fmt::Write;
use std::ops::Deref;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
@ -8,12 +9,13 @@ use meilisearch_types::tasks::{Details, Task};
use roaring::RoaringBitmap;
use crate::index_mapper::IndexMapper;
use crate::{IndexScheduler, Kind, Status, BEI128};
use crate::{IndexScheduler, IndexSchedulerInner, Kind, Status, BEI128};
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
scheduler.assert_internally_consistent();
let IndexScheduler {
let inner = scheduler.inner();
let IndexSchedulerInner {
autobatching_enabled,
must_stop_processing: _,
processing_tasks,
@ -38,13 +40,15 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
} = scheduler;
zookeeper: _,
options: _,
} = inner.deref();
let rtxn = env.read_txn().unwrap();
let mut snap = String::new();
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
let processing_tasks = processing_tasks.read().processing.clone();
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
snap.push_str("### Processing Tasks:\n");
snap.push_str(&snapshot_bitmap(&processing_tasks));

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
//! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization.
use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
/// Thread-safe `Vec`-backend LRU cache
@ -190,6 +191,11 @@ where
}
None
}
/// Returns the generation associated to the key and values of the `LruMap`.
pub fn clear(&mut self) -> Vec<(AtomicU64, (K, V))> {
mem::take(&mut self.0.data)
}
}
/// The result of an insertion in a LRU map.

View File

@ -10,9 +10,9 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
use crate::{Error, IndexSchedulerInner, Result, Task, TaskId, BEI128};
impl IndexScheduler {
impl IndexSchedulerInner {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
}
@ -331,11 +331,12 @@ pub fn clamp_to_page_size(size: usize) -> usize {
}
#[cfg(test)]
impl IndexScheduler {
impl crate::IndexScheduler {
/// Asserts that the index scheduler's content is internally consistent.
pub fn assert_internally_consistent(&self) {
let rtxn = self.env.read_txn().unwrap();
for task in self.all_tasks.iter(&rtxn).unwrap() {
let this = self.inner();
let rtxn = this.env.read_txn().unwrap();
for task in this.all_tasks.iter(&rtxn).unwrap() {
let (task_id, task) = task.unwrap();
let task_id = task_id.get();
@ -354,21 +355,21 @@ impl IndexScheduler {
} = task;
assert_eq!(uid, task.uid);
if let Some(task_index_uid) = &task_index_uid {
assert!(self
assert!(this
.index_tasks
.get(&rtxn, task_index_uid.as_str())
.unwrap()
.unwrap()
.contains(task.uid));
}
let db_enqueued_at = self
let db_enqueued_at = this
.enqueued_at
.get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos()))
.unwrap()
.unwrap();
assert!(db_enqueued_at.contains(task_id));
if let Some(started_at) = started_at {
let db_started_at = self
let db_started_at = this
.started_at
.get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos()))
.unwrap()
@ -376,7 +377,7 @@ impl IndexScheduler {
assert!(db_started_at.contains(task_id));
}
if let Some(finished_at) = finished_at {
let db_finished_at = self
let db_finished_at = this
.finished_at
.get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos()))
.unwrap()
@ -384,9 +385,9 @@ impl IndexScheduler {
assert!(db_finished_at.contains(task_id));
}
if let Some(canceled_by) = canceled_by {
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
let db_canceled_tasks = this.get_status(&rtxn, Status::Canceled).unwrap();
assert!(db_canceled_tasks.contains(uid));
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
let db_canceling_task = this.get_task(&rtxn, canceled_by).unwrap().unwrap();
assert_eq!(db_canceling_task.status, Status::Succeeded);
match db_canceling_task.kind {
KindWithContent::TaskCancelation { query: _, tasks } => {
@ -427,7 +428,7 @@ impl IndexScheduler {
Details::IndexInfo { primary_key: pk1 } => match &kind {
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
self.index_tasks
this.index_tasks
.get(&rtxn, index_uid.as_str())
.unwrap()
.unwrap()
@ -535,23 +536,23 @@ impl IndexScheduler {
}
}
assert!(self.get_status(&rtxn, status).unwrap().contains(uid));
assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
assert!(this.get_status(&rtxn, status).unwrap().contains(uid));
assert!(this.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
match status {
Status::Enqueued | Status::Processing => {
assert!(self
assert!(this
.file_store
.all_uuids()
.unwrap()
.any(|uuid| uuid.as_ref().unwrap() == &content_file),
"Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
self.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
this.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
);
}
Status::Succeeded | Status::Failed | Status::Canceled => {
assert!(self
assert!(this
.file_store
.all_uuids()
.unwrap()

View File

@ -14,6 +14,7 @@ license.workspace = true
base64 = "0.21.0"
enum-iterator = "1.4.0"
hmac = "0.12.1"
log = "0.4.19"
maplit = "1.0.2"
meilisearch-types = { path = "../meilisearch-types" }
rand = "0.8.5"
@ -24,3 +25,4 @@ sha2 = "0.10.6"
thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] }
zookeeper-client-sync = { git = "https://github.com/meilisearch/zookeeper-client-sync.git" }

View File

@ -19,6 +19,7 @@ internal_error!(
AuthControllerError: meilisearch_types::milli::heed::Error,
std::io::Error,
serde_json::Error,
zookeeper_client_sync::Error,
std::str::Utf8Error
);

View File

@ -16,22 +16,111 @@ pub use store::open_auth_store_env;
use store::{generate_key_as_hexa, HeedAuthStore};
use time::OffsetDateTime;
use uuid::Uuid;
use zookeeper_client_sync::{
Acls, AddWatchMode, CreateMode, Error as ZkError, EventType, WatchedEvent, Zookeeper,
};
#[derive(Clone)]
pub struct AuthController {
store: Arc<HeedAuthStore>,
master_key: Option<String>,
zookeeper: Option<Arc<Zookeeper>>,
}
impl AuthController {
pub fn new(db_path: impl AsRef<Path>, master_key: &Option<String>) -> Result<Self> {
pub fn new(
db_path: impl AsRef<Path>,
master_key: &Option<String>,
zookeeper: Option<Arc<Zookeeper>>,
) -> Result<Self> {
let store = HeedAuthStore::new(db_path)?;
let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper };
if store.is_empty()? {
generate_default_keys(&store)?;
match controller.zookeeper {
// setup the auth zk environment, the `auth` node
Some(ref zookeeper) => {
// Zookeeper Event listener loop
let controller_clone = controller.clone();
let zkk = zookeeper.clone();
let watcher = zookeeper.watch("/auth", AddWatchMode::PersistentRecursive)?;
watcher.run_on_change(move |event| {
let WatchedEvent { event_type, path, .. } = dbg!(event);
match event_type {
EventType::NodeDeleted => {
// TODO: ugly unwraps
let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(uuid).unwrap();
log::info!("The key {} has been deleted", uuid);
controller_clone.store.delete_api_key(uuid).unwrap();
}
EventType::NodeCreated | EventType::NodeDataChanged => {
if path.strip_prefix("/auth/").map_or(false, |s| !s.is_empty()) {
let (key, _stat) = zkk.get_data(&path).unwrap();
let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been inserted", key.uid);
controller_clone.store.put_api_key(key).unwrap();
}
}
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
}
});
// TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create
// for the moment we consider that `create` only returns Error::NodeExists.
match zookeeper.create(
"/auth",
&[],
&CreateMode::Persistent.with_acls(Acls::anyone_all()),
) {
// If the store is empty, we must generate and push the default api-keys.
Ok(_) => generate_default_keys(&controller)?,
// If the node exist we should clear our DB and download all the existing api-keys
Err(ZkError::NodeExists) => {
log::warn!("Auth directory already exists, we need to clear our keys + import the one in zookeeper");
let store = controller.store.clone();
store.delete_all_keys()?;
let (children, _) = zookeeper
.get_children("/auth")
.expect("Internal, the auth directory was deleted during execution.");
log::info!("Importing {} api-keys", children.len());
for path in children {
log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/auth/{}", &path)) {
Ok((key, _stat)) => {
let key = serde_json::from_slice(&key).unwrap();
let store = controller.store.clone();
store.put_api_key(key)?;
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to setup the watcher
}
}
e @ Err(
ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidAcl,
) => unreachable!("{e:?}"),
Err(e) => panic!("{e}"),
}
// TODO: Race condition above:
// What happens if two node join exactly at the same moment:
// One will create the dir
// The second one will delete its DB, load nothing and install a watcher
// The first one will push its keys and should wake up and update the second one.
// / BUT, if the second one delete its DB and the first one push its files before the second one install the watcher we're fucked
}
None => {
if controller.store.is_empty()? {
generate_default_keys(&controller)?;
}
}
}
Ok(Self { store: Arc::new(store), master_key: master_key.clone() })
Ok(controller)
}
/// Return `Ok(())` if the auth controller is able to access one of its database.
@ -53,7 +142,23 @@ impl AuthController {
pub fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
match self.store.get_api_key(create_key.uid)? {
Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())),
None => self.store.put_api_key(create_key.to_key()),
None => self.put_key(create_key.to_key()),
}
}
pub fn put_key(&self, key: Key) -> Result<Key> {
let store = self.store.clone();
match &self.zookeeper {
Some(zookeeper) => {
zookeeper.create(
&format!("/auth/{}", key.uid),
&serde_json::to_vec_pretty(&key)?,
&CreateMode::Persistent.with_acls(Acls::anyone_all()),
)?;
Ok(key)
}
None => store.put_api_key(key),
}
}
@ -68,7 +173,20 @@ impl AuthController {
name => key.name = name.set(),
};
key.updated_at = OffsetDateTime::now_utc();
self.store.put_api_key(key)
let store = self.store.clone();
// TODO: we may commit only after zk persisted the keys
match &self.zookeeper {
Some(zookeeper) => {
zookeeper.set_data(
&format!("/auth/{}", key.uid),
&serde_json::to_vec_pretty(&key)?,
None,
)?;
Ok(key)
}
None => store.put_api_key(key),
}
}
pub fn get_key(&self, uid: Uuid) -> Result<Key> {
@ -110,7 +228,19 @@ impl AuthController {
}
pub fn delete_key(&self, uid: Uuid) -> Result<()> {
if self.store.delete_api_key(uid)? {
let deleted = match &self.zookeeper {
Some(zookeeper) => match zookeeper.delete(&format!("/auth/{}", uid), None) {
Ok(()) => true,
Err(ZkError::NoNode) => false,
Err(e) => return Err(e.into()),
},
None => {
let store = self.store.clone();
store.delete_api_key(uid)?
}
};
if deleted {
Ok(())
} else {
Err(AuthControllerError::ApiKeyNotFound(uid.to_string()))
@ -159,7 +289,7 @@ impl AuthController {
self.store.delete_all_keys()
}
/// Delete all the keys in the DB.
/// Insert a key in the DB without any check on its validity
pub fn raw_insert_key(&mut self, key: Key) -> Result<()> {
self.store.put_api_key(key)?;
Ok(())
@ -304,10 +434,9 @@ pub struct IndexSearchRules {
pub filter: Option<serde_json::Value>,
}
fn generate_default_keys(store: &HeedAuthStore) -> Result<()> {
store.put_api_key(Key::default_admin())?;
store.put_api_key(Key::default_search())?;
fn generate_default_keys(controller: &AuthController) -> Result<()> {
controller.put_key(Key::default_admin())?;
controller.put_key(Key::default_search())?;
Ok(())
}

View File

@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::fmt::{self, Debug, Display};
use std::fs::File;
use std::io::{self, Seek, Write};
@ -42,7 +41,7 @@ impl Display for DocumentFormatError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(e) => write!(f, "{e}"),
Self::MalformedPayload(me, b) => match me.borrow() {
Self::MalformedPayload(me, b) => match me {
Error::Json(se) => {
let mut message = match se.classify() {
Category::Data => {

View File

@ -175,6 +175,7 @@ macro_rules! make_error_codes {
// An exhaustive list of all the error codes used by meilisearch.
make_error_codes! {
S3Error , System , INTERNAL_SERVER_ERROR;
ApiKeyAlreadyExists , InvalidRequest , CONFLICT ;
ApiKeyNotFound , InvalidRequest , NOT_FOUND ;
BadParameter , InvalidRequest , BAD_REQUEST;

View File

@ -80,6 +80,7 @@ reqwest = { version = "0.11.16", features = [
], default-features = false }
rustls = "0.20.8"
rustls-pemfile = "1.0.2"
strois = "0.0.4"
segment = { version = "0.2.2", optional = true }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }
@ -105,6 +106,7 @@ walkdir = "2.3.3"
yaup = "0.2.1"
serde_urlencoded = "0.7.1"
termcolor = "1.2.0"
zookeeper-client-sync = { git = "https://github.com/meilisearch/zookeeper-client-sync.git" }
[dev-dependencies]
actix-rt = "2.8.0"

View File

@ -312,6 +312,13 @@ impl From<Opt> for Infos {
config_file_path,
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics: _,
zk_url: _,
s3_url: _,
s3_region: _,
s3_bucket: _,
s3_access_key: _,
s3_secret_key: _,
s3_security_token: _,
} = options;
let schedule_snapshot = match schedule_snapshot {

View File

@ -33,6 +33,8 @@ pub enum MeilisearchHttpError {
.0.iter().map(|uid| format!("\"{uid}\"")).collect::<Vec<_>>().join(", "), .0.len()
)]
SwapIndexPayloadWrongLength(Vec<IndexUid>),
#[error("S3 Error: {0}")]
S3Error(#[from] strois::Error),
#[error(transparent)]
IndexUid(#[from] IndexUidFormatError),
#[error(transparent)]
@ -65,6 +67,7 @@ impl ErrorCode for MeilisearchHttpError {
MeilisearchHttpError::InvalidExpression(_, _) => Code::InvalidSearchFilter,
MeilisearchHttpError::PayloadTooLarge(_) => Code::PayloadTooLarge,
MeilisearchHttpError::SwapIndexPayloadWrongLength(_) => Code::InvalidSwapIndexes,
MeilisearchHttpError::S3Error(_) => Code::S3Error,
MeilisearchHttpError::IndexUid(e) => e.error_code(),
MeilisearchHttpError::SerdeJson(_) => Code::Internal,
MeilisearchHttpError::HeedError(_) => Code::Internal,

View File

@ -39,6 +39,8 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
pub use option::Opt;
use option::ScheduleSnapshot;
use strois::Bucket;
use zookeeper_client_sync::Zookeeper;
use crate::error::MeilisearchHttpError;
@ -136,14 +138,17 @@ enum OnFailure {
KeepDb,
}
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
pub fn setup_meilisearch(
opt: &Opt,
zookeeper: Option<Arc<Zookeeper>>,
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
let empty_db = is_empty_db(&opt.db_path);
let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot {
let snapshot_path_exists = snapshot_path.exists();
// the db is empty and the snapshot exists, import it
if empty_db && snapshot_path_exists {
match compression::from_tar_gz(snapshot_path, &opt.db_path) {
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?,
Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
return Err(e);
@ -160,14 +165,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, zookeeper)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
// the db is empty and the dump exists, import it
if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) =
open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?;
open_or_create_database_unchecked(opt, OnFailure::RemoveDb, zookeeper)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
@ -187,10 +192,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, zookeeper)?
}
} else {
open_or_create_database(opt, empty_db)?
open_or_create_database(opt, empty_db, zookeeper)?
};
// We create a loop in a thread that registers snapshotCreation tasks
@ -199,15 +204,12 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot {
let snapshot_delay = Duration::from_secs(snapshot_delay);
let index_scheduler = index_scheduler.clone();
thread::Builder::new()
.name(String::from("register-snapshot-tasks"))
.spawn(move || loop {
thread::sleep(snapshot_delay);
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
error!("Error while registering snapshot: {}", e);
}
})
.unwrap();
thread::spawn(move || loop {
thread::sleep(snapshot_delay);
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
error!("Error while registering snapshot: {}", e);
}
});
}
Ok((index_scheduler, auth_controller))
@ -217,34 +219,48 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
fn open_or_create_database_unchecked(
opt: &Opt,
on_failure: OnFailure,
zookeeper: Option<Arc<Zookeeper>>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone());
let instance_features = opt.to_instance_features();
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
max_number_of_tasks: 1_000_000,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
instance_features,
})?)
};
let index_scheduler = IndexScheduler::new(Arc::new(IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
tasks_path: opt.db_path.join("tasks"),
update_file_path: opt.db_path.join("update_files"),
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into().map(Arc::new)?,
autobatching_enabled: true,
max_number_of_tasks: 1_000_000,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
instance_features,
zookeeper: zookeeper.clone(),
s3: opt.s3_url.as_ref().map(|url| {
Arc::new(
Bucket::builder(url)
.unwrap()
.key(opt.s3_access_key.as_ref().expect("Need s3 key to work").clone())
.secret(opt.s3_secret_key.as_ref().expect("Need s3 secret to work").clone())
.maybe_token(opt.s3_security_token.clone())
.region(&opt.s3_region)
.with_url_path_style(true)
.bucket(opt.s3_bucket.as_ref().expect("Need an s3 bucket to work"))
.unwrap(),
)
}),
}))
.map_err(anyhow::Error::from);
match (
index_scheduler_builder(),
index_scheduler,
auth_controller.map_err(anyhow::Error::from),
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
) {
@ -262,12 +278,13 @@ fn open_or_create_database_unchecked(
fn open_or_create_database(
opt: &Opt,
empty_db: bool,
zookeeper: Option<Arc<Zookeeper>>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db {
check_version_file(&opt.db_path)?;
}
open_or_create_database_unchecked(opt, OnFailure::KeepDb)
open_or_create_database_unchecked(opt, OnFailure::KeepDb, zookeeper)
}
fn import_dump(
@ -277,6 +294,7 @@ fn import_dump(
auth: &mut AuthController,
) -> Result<(), anyhow::Error> {
let reader = File::open(dump_path)?;
let index_scheduler = index_scheduler.inner();
let mut dump_reader = dump::DumpReader::open(reader)?;
if let Some(date) = dump_reader.date() {

View File

@ -12,6 +12,7 @@ use meilisearch::analytics::Analytics;
use meilisearch::{analytics, create_app, prototype_name, setup_meilisearch, Opt};
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use zookeeper_client_sync::Zookeeper;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
@ -63,7 +64,13 @@ async fn main() -> anyhow::Result<()> {
_ => (),
}
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;
let zookeeper = match opt.zk_url {
Some(ref url) => Some(Arc::new(Zookeeper::connect(url).await.unwrap())),
None => None,
};
let optc = opt.clone();
let (index_scheduler, auth_controller) =
tokio::task::spawn_blocking(move || setup_meilisearch(&optc, zookeeper)).await.unwrap()?;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
let analytics = if !opt.no_analytics {

View File

@ -28,6 +28,13 @@ const MEILI_DB_PATH: &str = "MEILI_DB_PATH";
const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
const MEILI_ENV: &str = "MEILI_ENV";
const MEILI_ZK_URL: &str = "MEILI_ZK_URL";
const MEILI_S3_URL: &str = "MEILI_S3_URL";
const MEILI_S3_BUCKET: &str = "MEILI_S3_BUCKET";
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
const MEILI_S3_SECURITY_TOKEN: &str = "MEILI_S3_SECURITY_TOKEN";
const MEILI_S3_REGION: &str = "MEILI_S3_REGION";
#[cfg(all(not(debug_assertions), feature = "analytics"))]
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
@ -56,6 +63,7 @@ const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms";
const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
const DEFAULT_ENV: &str = "development";
const DEFAULT_S3_REGION: &str = "eu-central-1";
const DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT: &str = "100 MB";
const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
@ -154,6 +162,36 @@ pub struct Opt {
#[serde(default = "default_env")]
pub env: String,
/// Sets the HTTP address and port used to communicate with the zookeeper cluster.
/// If ran locally, the default url is `http://localhost:2181/`.
#[clap(long, env = MEILI_ZK_URL)]
pub zk_url: Option<String>,
/// Sets the address and port used to communicate with the S3 bucket.
#[clap(long, env = MEILI_S3_URL)]
pub s3_url: Option<String>,
/// Sets the region used to communicate with the s3 bucket.
#[clap(long, env = MEILI_S3_REGION, default_value_t = default_s3_region())]
#[serde(default = "default_s3_region")]
pub s3_region: String,
/// Sets the S3 bucket name to use.
#[clap(long, env = MEILI_S3_BUCKET)]
pub s3_bucket: Option<String>,
/// Set the S3 access key. If used you must also set the secret key.
#[clap(long, env = MEILI_S3_ACCESS_KEY)]
pub s3_access_key: Option<String>,
/// Set the S3 secret key. If used you must also set the access key.
#[clap(long, env = MEILI_S3_SECRET_KEY)]
pub s3_secret_key: Option<String>,
/// Security token, can't be used with access key and secret key.
#[clap(long, env = MEILI_S3_SECURITY_TOKEN)]
pub s3_security_token: Option<String>,
/// Deactivates Meilisearch's built-in telemetry when provided.
///
/// Meilisearch automatically collects data from all instances that do not opt out using this flag.
@ -368,6 +406,13 @@ impl Opt {
http_addr,
master_key,
env,
zk_url,
s3_url,
s3_region,
s3_bucket,
s3_access_key,
s3_secret_key,
s3_security_token,
max_index_size: _,
max_task_db_size: _,
http_payload_size_limit,
@ -401,6 +446,25 @@ impl Opt {
export_to_env_if_not_present(MEILI_MASTER_KEY, master_key);
}
export_to_env_if_not_present(MEILI_ENV, env);
if let Some(zk_url) = zk_url {
export_to_env_if_not_present(MEILI_ZK_URL, zk_url);
}
if let Some(s3_url) = s3_url {
export_to_env_if_not_present(MEILI_S3_URL, s3_url);
}
export_to_env_if_not_present(MEILI_S3_REGION, s3_region);
if let Some(s3_bucket) = s3_bucket {
export_to_env_if_not_present(MEILI_S3_BUCKET, s3_bucket);
}
if let Some(s3_access_key) = s3_access_key {
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
}
if let Some(s3_secret_key) = s3_secret_key {
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
}
if let Some(s3_security_token) = s3_security_token {
export_to_env_if_not_present(MEILI_S3_SECURITY_TOKEN, s3_security_token);
}
#[cfg(all(not(debug_assertions), feature = "analytics"))]
{
export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());
@ -547,7 +611,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
Ok(Self {
log_every_n: Some(DEFAULT_LOG_EVERY_N),
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
thread_pool: Some(thread_pool),
thread_pool: Some(Arc::new(thread_pool)),
max_positions_per_attributes: None,
skip_index_budget: other.skip_index_budget,
..Default::default()
@ -715,6 +779,10 @@ fn default_env() -> String {
DEFAULT_ENV.to_string()
}
fn default_s3_region() -> String {
DEFAULT_S3_REGION.to_string()
}
fn default_max_index_size() -> Byte {
Byte::from_bytes(INDEX_SIZE)
}

View File

@ -41,14 +41,10 @@ pub async fn create_api_key(
_req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let v = body.into_inner();
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let key = auth_controller.create_key(v)?;
Ok(KeyView::from_key(key, &auth_controller))
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
let key = auth_controller.create_key(v)?;
let key = KeyView::from_key(key, &auth_controller);
Ok(HttpResponse::Created().json(res))
Ok(HttpResponse::Created().json(key))
}
#[derive(Deserr, Debug, Clone, Copy)]
@ -110,17 +106,11 @@ pub async fn patch_api_key(
) -> Result<HttpResponse, ResponseError> {
let key = path.into_inner().key;
let patch_api_key = body.into_inner();
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let uid =
Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
let key = auth_controller.update_key(uid, patch_api_key)?;
let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
let key = auth_controller.update_key(uid, patch_api_key)?;
let key = KeyView::from_key(key, &auth_controller);
Ok(KeyView::from_key(key, &auth_controller))
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::Ok().json(res))
Ok(HttpResponse::Ok().json(key))
}
pub async fn delete_api_key(
@ -128,13 +118,8 @@ pub async fn delete_api_key(
path: web::Path<AuthParam>,
) -> Result<HttpResponse, ResponseError> {
let key = path.into_inner().key;
tokio::task::spawn_blocking(move || {
let uid =
Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
auth_controller.delete_key(uid)
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
let uid = Uuid::parse_str(&key).or_else(|_| auth_controller.get_uid_from_encoded_key(&key))?;
auth_controller.delete_key(uid)?;
Ok(HttpResponse::NoContent().finish())
}

View File

@ -29,8 +29,7 @@ pub async fn create_dump(
keys: auth_controller.list_keys()?,
instance_uid: analytics.instance_uid().cloned(),
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -78,6 +78,6 @@ async fn patch_features(
}),
Some(&req),
);
index_scheduler.put_runtime_features(new_features)?;
index_scheduler.inner().put_runtime_features(new_features)?;
Ok(HttpResponse::Ok().json(new_features))
}

View File

@ -1,4 +1,4 @@
use std::io::ErrorKind;
use std::io::{BufReader, ErrorKind, Seek, SeekFrom};
use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Data;
@ -129,8 +129,7 @@ pub async fn delete_document(
index_uid: index_uid.to_string(),
documents_ids: vec![document_id],
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@ -396,11 +395,12 @@ async fn document_addition(
return Err(MeilisearchHttpError::MissingPayload(format));
}
if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
if let Err(e) = buffer.seek(SeekFrom::Start(0)).await {
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
}
let read_file = buffer.into_inner().into_std().await;
let s3 = index_scheduler.s3.clone();
let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format {
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
@ -409,8 +409,16 @@ async fn document_addition(
}
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
};
if let Some(s3) = s3 {
update_file.seek(SeekFrom::Start(0)).unwrap();
let mut reader = BufReader::new(&*update_file);
s3.put_object_multipart(format!("update-files/{}", uuid), &mut reader)?;
}
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?;
Ok(documents_count)
})
.await;
@ -445,7 +453,7 @@ async fn document_addition(
};
let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
let task = match scheduler.register(task) {
Ok(task) => task,
Err(e) => {
index_scheduler.delete_update_file(uuid)?;
@ -476,8 +484,7 @@ pub async fn delete_documents_batch(
let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -512,8 +519,7 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -529,8 +535,7 @@ pub async fn clear_all_documents(
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -135,8 +135,7 @@ pub async fn create_index(
);
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
Ok(HttpResponse::Accepted().json(task))
} else {
@ -203,8 +202,7 @@ pub async fn update_index(
primary_key: body.primary_key,
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -216,8 +214,7 @@ pub async fn delete_index(
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
Ok(HttpResponse::Accepted().json(task))
}

View File

@ -55,10 +55,7 @@ macro_rules! make_setting_route {
is_deletion: true,
allow_index_creation,
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task))
.await??
.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -97,10 +94,7 @@ macro_rules! make_setting_route {
is_deletion: false,
allow_index_creation,
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task))
.await??
.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -586,8 +580,7 @@ pub async fn update_all(
is_deletion: false,
allow_index_creation,
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -622,8 +615,7 @@ pub async fn delete_all(
is_deletion: true,
allow_index_creation,
};
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
let task: SummarizedTaskView = index_scheduler.register(task)?.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -18,7 +18,6 @@ use serde_json::json;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time};
use tokio::task;
use super::SummarizedTaskView;
use crate::analytics::Analytics;
@ -325,15 +324,12 @@ async fn cancel_tasks(
let query = params.into_query();
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes(
&index_scheduler.read_txn()?,
&query,
index_scheduler.filters(),
)?;
let (tasks, _) =
index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
let task_cancelation =
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
let task = index_scheduler.register(task_cancelation)?;
let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task))
@ -370,15 +366,12 @@ async fn delete_tasks(
);
let query = params.into_query();
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes(
&index_scheduler.read_txn()?,
&query,
index_scheduler.filters(),
)?;
let (tasks, _) =
index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
let task_deletion =
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
let task = index_scheduler.register(task_deletion)?;
let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task))

View File

@ -39,7 +39,7 @@ impl Server {
let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options, None).await.unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir) }
@ -54,7 +54,7 @@ impl Server {
options.master_key = Some("MASTER_KEY".to_string());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options, None).await.unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir) }
@ -67,7 +67,7 @@ impl Server {
}
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let (index_scheduler, auth) = setup_meilisearch(&options)?;
let (index_scheduler, auth) = setup_meilisearch(&options, None).await?;
let service = Service { index_scheduler, auth, options, api_key: None };
Ok(Server { service, _dir: None })

View File

@ -17,7 +17,7 @@ bincode = "1.3.3"
bstr = "1.4.0"
bytemuck = { version = "1.13.1", features = ["extern_crate_alloc"] }
byteorder = "1.4.3"
charabia = { version = "0.8.2", default-features = false }
charabia = { version = "0.8.4", default-features = false }
concat-arrays = "0.1.2"
crossbeam-channel = "0.5.8"
deserr = "0.5.0"

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use grenad::CompressionType;
use rayon::ThreadPool;
@ -9,7 +11,7 @@ pub struct IndexerConfig {
pub max_memory: Option<usize>,
pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>,
pub thread_pool: Option<ThreadPool>,
pub thread_pool: Option<Arc<ThreadPool>>,
pub max_positions_per_attributes: Option<u32>,
pub skip_index_budget: bool,
}