Move the documents back into the LMDB database

Clément Renault
2020-08-29 15:14:04 +02:00
parent 816db7a0aa
commit 3db517548d
4 changed files with 53 additions and 77 deletions

@@ -21,7 +21,7 @@ use rayon::prelude::*;
use roaring::RoaringBitmap;
use structopt::StructOpt;
use milli::{lexer, SmallVec32, Index, DocumentId, Position, Attribute};
use milli::{lexer, SmallVec32, Index, DocumentId, Position, Attribute, BEU32};
const LMDB_MAX_KEY_LENGTH: usize = 511;
const ONE_MILLION: usize = 1_000_000;
@@ -30,10 +30,12 @@ const MAX_POSITION: usize = 1000;
const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
const HEADERS_KEY: &[u8] = b"\0headers";
const DOCUMENTS_IDS_KEY: &[u8] = b"\x04documents-ids";
const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
const WORD_POSITIONS_BYTE: u8 = 1;
const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
const DOCUMENTS_IDS_BYTE: u8 = 4;
#[cfg(target_os = "linux")]
#[global_allocator]
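
For context, a minimal sketch (pure std; the helper name is hypothetical, not part of this patch) of how the prefix-byte constants in the hunk above namespace the different kinds of entries written into one shared sorter, so the merge step can dispatch on the first byte of a key:

    // Illustrative only: every key written to the shared sorter starts with one
    // tag byte identifying the kind of entry it holds.
    const WORD_POSITIONS_BYTE: u8 = 1;
    const DOCUMENTS_IDS_BYTE: u8 = 4;

    fn word_positions_key(word: &str) -> Vec<u8> {
        let mut key = Vec::with_capacity(1 + word.len());
        key.push(WORD_POSITIONS_BYTE); // the tag byte always comes first
        key.extend_from_slice(word.as_bytes());
        key
    }

    fn main() {
        assert_eq!(word_positions_key("hello")[0], WORD_POSITIONS_BYTE);
        // Fixed keys like DOCUMENTS_IDS_KEY (b"\x04documents-ids") embed their tag byte too.
        assert_eq!(b"\x04documents-ids"[0], DOCUMENTS_IDS_BYTE);
    }
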
@@ -60,14 +62,6 @@ struct Opt {
#[structopt(flatten)]
indexer: IndexerOpt,
/// The name of the compression algorithm to use when compressing the final documents database.
#[structopt(long, default_value = "zlib", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
documents_compression_type: String,
/// The level of compression of the chosen algorithm.
#[structopt(long, default_value = "9")]
documents_compression_level: u32,
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
verbose: usize,
@@ -129,6 +123,7 @@ struct Store {
word_positions: ArcCache<SmallVec32<u8>, RoaringBitmap>,
word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>,
word_attribute_docids: ArcCache<(SmallVec32<u8>, Attribute), RoaringBitmap>,
documents_ids: RoaringBitmap,
sorter: Sorter<MergeFn>,
documents_sorter: Sorter<MergeFn>,
}
@@ -166,6 +161,7 @@ impl Store {
word_positions: ArcCache::new(arc_cache_size),
word_position_docids: ArcCache::new(arc_cache_size),
word_attribute_docids: ArcCache::new(arc_cache_size),
documents_ids: RoaringBitmap::new(),
sorter: builder.build(),
documents_sorter: documents_builder.build(),
}
@@ -200,6 +196,10 @@ impl Store {
Ok(self.sorter.insert(HEADERS_KEY, headers)?)
}
pub fn write_document_id(&mut self, id: DocumentId) {
self.documents_ids.insert(id);
}
pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> {
Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?)
}
@@ -216,6 +216,7 @@ impl Store {
key.extend_from_slice(&word);
// We serialize the positions into a buffer
buffer.clear();
buffer.reserve(positions.serialized_size());
positions.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
@@ -240,6 +241,7 @@ impl Store {
key.extend_from_slice(&pos.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(ids.serialized_size());
ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
@@ -264,6 +266,7 @@ impl Store {
key.extend_from_slice(&attr.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(ids.serialized_size());
ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
@@ -274,10 +277,18 @@ impl Store {
Ok(())
}
fn write_documents_ids(sorter: &mut Sorter<MergeFn>, ids: RoaringBitmap) -> anyhow::Result<()> {
let mut buffer = Vec::with_capacity(ids.serialized_size());
ids.serialize_into(&mut buffer)?;
sorter.insert(DOCUMENTS_IDS_KEY, &buffer)?;
Ok(())
}
pub fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> {
Self::write_word_positions(&mut self.sorter, self.word_positions)?;
Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
Self::write_documents_ids(&mut self.sorter, self.documents_ids)?;
let mut wtr = tempfile::tempfile().map(Writer::new)?;
let mut builder = fst::SetBuilder::memory();
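
A minimal sketch, assuming only the roaring crate, of the serialize round-trip that write_documents_ids and the buffer.reserve(serialized_size()) calls above rely on:

    use roaring::RoaringBitmap;

    fn main() -> std::io::Result<()> {
        let mut ids = RoaringBitmap::new();
        ids.insert(1);
        ids.insert(42);

        // Reserve exactly the number of bytes the bitmap will occupy,
        // then serialize it into the buffer, as write_documents_ids does above.
        let mut buffer = Vec::with_capacity(ids.serialized_size());
        ids.serialize_into(&mut buffer)?;

        // The reader side recovers the same bitmap from the raw bytes.
        let decoded = RoaringBitmap::deserialize_from(buffer.as_slice())?;
        assert_eq!(ids, decoded);
        Ok(())
    }
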
@@ -335,7 +346,7 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
Ok(values[0].to_vec())
},
key => match key[0] {
WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
DOCUMENTS_IDS_BYTE | WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
for value in &values[1..] {
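
And a sketch of the union step the merge function performs on these bitmap-valued entries; the loop body is elided in the hunk above, so the helper below is illustrative, not the patch's exact code:

    use roaring::RoaringBitmap;

    // Illustrative helper mirroring merge(): union every serialized bitmap
    // into the first one and re-serialize the result.
    fn merge_bitmaps(values: &[Vec<u8>]) -> std::io::Result<Vec<u8>> {
        let mut first = RoaringBitmap::deserialize_from(values[0].as_slice())?;
        for value in &values[1..] {
            first |= RoaringBitmap::deserialize_from(value.as_slice())?;
        }
        let mut out = Vec::with_capacity(first.serialized_size());
        first.serialize_into(&mut out)?;
        Ok(out)
    }
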
@@ -363,6 +374,10 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) ->
// Write the headers
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
}
else if key == DOCUMENTS_IDS_KEY {
// Write the documents ids list
index.main.put::<_, Str, ByteSlice>(wtxn, "documents-ids", val)?;
}
else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
// Write the postings lists
index.word_positions.as_polymorph()
@@ -458,6 +473,7 @@ fn index_csv(
writer.write_byte_record(document.as_byte_record())?;
let document = writer.into_inner()?;
store.write_document(document_id, &document)?;
store.write_document_id(document_id);
}
// Compute the document id of the next document.
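
For illustration, a small sketch using the csv crate (record contents hypothetical) of how a record can be rendered into an owned byte buffer before being handed to write_document:

    use csv::Writer;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Render one record into an in-memory buffer, roughly the way
        // index_csv turns a document back into raw bytes above.
        let mut writer = Writer::from_writer(Vec::new());
        writer.write_record(&["1", "some title", "some description"])?;
        let document: Vec<u8> = writer.into_inner()?;
        assert!(!document.is_empty());
        Ok(())
    }
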
@@ -497,8 +513,6 @@ fn main() -> anyhow::Result<()> {
let max_memory = opt.indexer.max_memory;
let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type);
let chunk_compression_level = opt.indexer.chunk_compression_level;
let documents_compression_type = compression_type_from_str(&opt.documents_compression_type);
let documents_compression_level = opt.documents_compression_level;
let csv_readers = match opt.csv_file {
Some(file_path) => {
@@ -568,28 +582,22 @@ fn main() -> anyhow::Result<()> {
docs_stores.push(d);
});
debug!("We are writing the documents into MTBL on disk...");
// We also merge the documents into its own MTBL store.
let file = tempfile::tempfile()?;
let mut writer = Writer::builder()
.compression_type(documents_compression_type)
.compression_level(documents_compression_level)
.build(file);
let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores);
builder.build().write_into(&mut writer)?;
let file = writer.into_inner()?;
// Read back the documents MTBL database from the file.
let documents_mmap = unsafe { memmap::Mmap::map(&file)? };
let documents = Reader::new(documents_mmap)?;
debug!("We are writing the postings lists and documents into LMDB on disk...");
// We merge the postings lists into LMDB.
let mut wtxn = env.write_txn()?;
// We merge the postings lists into LMDB.
debug!("We are writing the postings lists into LMDB on disk...");
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
index.put_documents(&mut wtxn, &documents)?;
// We merge the documents into LMDB.
debug!("We are writing the documents into LMDB on disk...");
merge_into_lmdb(docs_stores, |k, v| {
let id = k.try_into().map(u32::from_be_bytes)?;
Ok(index.documents.put(&mut wtxn, &BEU32::new(id), v)?)
})?;
// Retrieve the number of documents.
let count = index.number_of_documents(&wtxn)?;
wtxn.commit()?;
info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed());
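
Finally, a pure-std sketch of why the documents database is keyed with BEU32 values: big-endian encoding makes LMDB's byte-wise key ordering coincide with the numeric ordering of the document ids, and from_be_bytes reverses it on the read side just like the closure above does:

    use std::convert::TryInto;

    fn main() {
        // Encoding ids as big-endian bytes keeps the lexicographic key order
        // identical to the numeric order of the document ids.
        let ids: [u32; 3] = [1, 256, 70_000];
        let keys: Vec<[u8; 4]> = ids.iter().map(|id| id.to_be_bytes()).collect();
        assert!(keys.windows(2).all(|w| w[0] < w[1]));

        // Reading back goes through the same fixed-size conversion the writer uses.
        let key: &[u8] = &keys[1][..];
        let id: u32 = key.try_into().map(u32::from_be_bytes).unwrap();
        assert_eq!(id, 256);
    }
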