Reintroduce a simple HTTP server

This commit is contained in:
Kerollmops
2020-05-31 17:48:13 +02:00
parent 2a10b2275e
commit a26553c90a
9 changed files with 1458 additions and 83 deletions

View File

@ -2,16 +2,9 @@ use std::io::{self, Write};
use std::path::PathBuf;
use std::time::Instant;
use cow_utils::CowUtils;
use fst::{Streamer, IntoStreamer};
use heed::types::*;
use heed::{EnvOpenOptions, Database};
use levenshtein_automata::LevenshteinAutomatonBuilder;
use roaring::RoaringBitmap;
use heed::EnvOpenOptions;
use structopt::StructOpt;
use mega_mini_indexer::alphanumeric_tokens;
use mega_mini_indexer::BEU32;
use mega_mini_indexer::{Index, BEU32};
#[derive(Debug, StructOpt)]
#[structopt(name = "mm-indexer", about = "The server side of the daugt project.")]
// NOTE(review): this span is a unified-diff hunk of `fn main` from the search
// binary as rendered by a commit viewer; the leading +/- markers were lost
// during extraction, so removed (old) and added (new) lines appear interleaved.
// The comments below flag which lines *look like* each side of the diff —
// verify against the real commit before relying on them.
@ -35,78 +28,27 @@ fn main() -> anyhow::Result<()> {
.max_dbs(5)
.open(opt.database)?;
// old (presumably removed): databases were opened by hand here
let main = env.create_poly_database(None)?;
let postings_ids: Database<Str, ByteSlice> = env.create_database(Some("postings-ids"))?;
let documents: Database<OwnedType<BEU32>, ByteSlice> = env.create_database(Some("documents"))?;
// new: the databases are now encapsulated behind the `Index` type
let index = Index::new(&env)?;
let before = Instant::now();
let rtxn = env.read_txn()?;
// old: headers were fetched from the poly database by raw key
let headers = match main.get::<_, Str, ByteSlice>(&rtxn, "headers")? {
// new: query resolution is delegated to `Index::search`, and headers
// come from the `Index::headers` helper
let documents_ids = index.search(&rtxn, &opt.query)?;
let headers = match index.headers(&rtxn)? {
Some(headers) => headers,
None => return Ok(()),
};
// old (removed by this commit): the inline search implementation — load
// the words FST, build one Levenshtein DFA per query word (typo budget
// scaled by word length, prefix DFA for the last word), union the posting
// ids of every word matching each DFA, then intersect the unions.
let fst = match main.get::<_, Str, ByteSlice>(&rtxn, "words-fst")? {
Some(bytes) => fst::Set::new(bytes)?,
None => return Ok(()),
};
// Building these factories is not free.
let lev0 = LevenshteinAutomatonBuilder::new(0, true);
let lev1 = LevenshteinAutomatonBuilder::new(1, true);
let lev2 = LevenshteinAutomatonBuilder::new(2, true);
let words: Vec<_> = alphanumeric_tokens(&opt.query).collect();
let number_of_words = words.len();
let dfas = words.into_iter().enumerate().map(|(i, word)| {
let word = word.cow_to_lowercase();
let is_last = i + 1 == number_of_words;
let dfa = match word.len() {
0..=4 => if is_last { lev0.build_prefix_dfa(&word) } else { lev0.build_dfa(&word) },
5..=8 => if is_last { lev1.build_prefix_dfa(&word) } else { lev1.build_dfa(&word) },
_ => if is_last { lev2.build_prefix_dfa(&word) } else { lev2.build_dfa(&word) },
};
(word, dfa)
});
let before = Instant::now();
let mut intersect_result: Option<RoaringBitmap> = None;
for (word, dfa) in dfas {
let before = Instant::now();
let mut union_result = RoaringBitmap::default();
let mut stream = fst.search(dfa).into_stream();
while let Some(word) = stream.next() {
let word = std::str::from_utf8(word)?;
if let Some(ids) = postings_ids.get(&rtxn, word)? {
let right = RoaringBitmap::deserialize_from(ids)?;
union_result.union_with(&right);
}
}
eprintln!("union for {:?} took {:.02?}", word, before.elapsed());
intersect_result = match intersect_result.take() {
Some(mut left) => {
let before = Instant::now();
let left_len = left.len();
left.intersect_with(&union_result);
eprintln!("intersect between {:?} and {:?} took {:.02?}",
left_len, union_result.len(), before.elapsed());
Some(left)
},
None => Some(union_result),
};
}
let mut stdout = io::stdout();
stdout.write_all(&headers)?;
// old: print at most 20 documents from the intersection result
let total_length = intersect_result.as_ref().map_or(0, |x| x.len());
for id in intersect_result.unwrap_or_default().iter().take(20) {
if let Some(content) = documents.get(&rtxn, &BEU32::new(id))? {
// new: print the documents whose ids were returned by `Index::search`
for id in &documents_ids {
if let Some(content) = index.documents.get(&rtxn, &BEU32::new(*id))? {
stdout.write_all(&content)?;
}
}
// old vs new final timing line (the new one reports `documents_ids.len()`)
eprintln!("Took {:.02?} to find {} documents", before.elapsed(), total_length);
eprintln!("Took {:.02?} to find {} documents", before.elapsed(), documents_ids.len());
Ok(())
}

src/bin/serve.rs — new file, 115 lines added
View File

@ -0,0 +1,115 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Instant;
use heed::EnvOpenOptions;
use serde::Deserialize;
use structopt::StructOpt;
use warp::{Filter, http::Response};
use mega_mini_indexer::{BEU32, Index};
// Command-line options for the `serve` binary, parsed by structopt.
// NOTE(review): the `///` doc comments below double as the generated `--help`
// text at runtime, so they are deliberately left untouched here.
#[derive(Debug, StructOpt)]
#[structopt(name = "mmi", about = "The server side of the mmi project.")]
struct Opt {
/// The database path where the LMDB database is located.
/// It is created if it doesn't already exist.
#[structopt(long = "db", parse(from_os_str))]
database: PathBuf,
/// The maximum size the database can take on disk. It is recommended to specify
/// the whole disk space (value must be a multiple of a page size).
#[structopt(long = "db-size", default_value = "107374182400")] // 100 GB
database_size: usize,
/// The ip and port on which the database will listen for HTTP requests.
#[structopt(short = "l", long, default_value = "127.0.0.1:9700")]
http_listen_addr: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
std::fs::create_dir_all(&opt.database)?;
let env = EnvOpenOptions::new()
.map_size(opt.database_size)
.max_dbs(10)
.open(&opt.database)?;
let index = Index::new(&env)?;
// We run and wait on the HTTP server
// Expose an HTML page to debug the search in a browser
let dash_html_route = warp::filters::method::get()
.and(warp::filters::path::end())
.map(|| warp::reply::html(include_str!("../../public/index.html")));
let dash_bulma_route = warp::filters::method::get()
.and(warp::path!("bulma.min.css"))
.map(|| Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../../public/bulma.min.css"))
);
let dash_jquery_route = warp::filters::method::get()
.and(warp::path!("jquery-3.4.1.min.js"))
.map(|| Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../../public/jquery-3.4.1.min.js"))
);
let dash_papaparse_route = warp::filters::method::get()
.and(warp::path!("papaparse.min.js"))
.map(|| Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../../public/papaparse.min.js"))
);
#[derive(Deserialize)]
struct QueryBody {
query: String,
}
let env_cloned = env.clone();
let query_route = warp::filters::method::post()
.and(warp::path!("query"))
.and(warp::body::json())
.map(move |query: QueryBody| {
let before_search = Instant::now();
let rtxn = env_cloned.read_txn().unwrap();
let documents_ids = index.search(&rtxn, &query.query).unwrap();
let mut body = Vec::new();
if let Some(headers) = index.headers(&rtxn).unwrap() {
// We write the headers
body.extend_from_slice(headers);
for id in documents_ids {
if let Some(content) = index.documents.get(&rtxn, &BEU32::new(id)).unwrap() {
body.extend_from_slice(&content);
}
}
}
Response::builder()
.header("Content-Type", "text/csv")
.header("Time-Ms", before_search.elapsed().as_millis().to_string())
.body(String::from_utf8(body).unwrap())
});
let routes = dash_html_route
.or(dash_bulma_route)
.or(dash_jquery_route)
.or(dash_papaparse_route)
.or(query_route);
let addr = SocketAddr::from_str(&opt.http_listen_addr).unwrap();
eprintln!("listening on http://{}", addr);
warp::serve(routes).run(addr).await;
Ok(())
}