Store the word positions under the documents

This commit is contained in:
Clément Renault
2020-09-05 18:03:06 +02:00
parent 580ed1119a
commit dc88a86259
7 changed files with 72 additions and 563 deletions

View File

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::fs::File;
use std::io::{self, Read, Write};
@ -13,6 +14,7 @@ use cow_utils::CowUtils;
use csv::StringRecord;
use flate2::read::GzDecoder;
use fst::IntoStreamer;
use heed::BytesDecode;
use heed::BytesEncode;
use heed::EnvOpenOptions;
use heed::types::*;
@ -25,7 +27,7 @@ use structopt::StructOpt;
use milli::heed_codec::CsvStringRecordCodec;
use milli::tokenizer::{simple_tokenizer, only_words};
use milli::{SmallVec32, Index, DocumentId, Position, Attribute, BEU32};
use milli::{SmallVec32, Index, DocumentId, BEU32, StrBEU32Codec};
const LMDB_MAX_KEY_LENGTH: usize = 511;
const ONE_MILLION: usize = 1_000_000;
@ -37,10 +39,8 @@ const HEADERS_KEY: &[u8] = b"\0headers";
const DOCUMENTS_IDS_KEY: &[u8] = b"\x04documents-ids";
const WORDS_FST_KEY: &[u8] = b"\x06words-fst";
const DOCUMENTS_IDS_BYTE: u8 = 4;
const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
const WORD_FOUR_POSITIONS_DOCIDS_BYTE: u8 = 5;
const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
const WORD_POSITIONS_BYTE: u8 = 1;
const WORD_DOCIDS_BYTE: u8 = 2;
const WORD_DOCID_POSITIONS_BYTE: u8 = 1;
#[cfg(target_os = "linux")]
#[global_allocator]
@ -125,10 +125,7 @@ fn lmdb_key_valid_size(key: &[u8]) -> bool {
type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
struct Store {
word_positions: ArcCache<SmallVec32<u8>, RoaringBitmap>,
word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>,
word_four_positions_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>,
word_attribute_docids: ArcCache<(SmallVec32<u8>, Attribute), RoaringBitmap>,
word_docids: ArcCache<SmallVec32<u8>, RoaringBitmap>,
documents_ids: RoaringBitmap,
sorter: Sorter<MergeFn>,
documents_sorter: Sorter<MergeFn>,
@ -162,10 +159,7 @@ impl Store {
}
Store {
word_positions: ArcCache::new(arc_cache_size),
word_position_docids: ArcCache::new(arc_cache_size),
word_four_positions_docids: ArcCache::new(arc_cache_size),
word_attribute_docids: ArcCache::new(arc_cache_size),
word_docids: ArcCache::new(arc_cache_size),
documents_ids: RoaringBitmap::new(),
sorter: builder.build(),
documents_sorter: documents_builder.build(),
@ -173,65 +167,48 @@ impl Store {
}
// Save the documents ids under the position and word we have seen it.
pub fn insert_word_position_docid(&mut self, word: &str, position: Position, id: DocumentId) -> anyhow::Result<()> {
pub fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> {
let word_vec = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_position_docids.insert((word_vec, position), ids, |old, new| old.union_with(&new));
Self::write_word_position_docids(&mut self.sorter, lrus)?;
self.insert_word_position(word, position)?;
self.insert_word_four_positions_docid(word, position, id)?;
self.insert_word_attribute_docid(word, position / MAX_POSITION as u32, id)?;
let (_, lrus) = self.word_docids.insert(word_vec, ids, |old, new| old.union_with(&new));
Self::write_word_docids(&mut self.sorter, lrus)?;
Ok(())
}
pub fn insert_word_four_positions_docid(&mut self, word: &str, position: Position, id: DocumentId) -> anyhow::Result<()> {
let position = position - position % 4;
let word_vec = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_four_positions_docids.insert((word_vec, position), ids, |old, new| old.union_with(&new));
Self::write_word_four_positions_docids(&mut self.sorter, lrus)
}
// Save the positions where this word has been seen.
pub fn insert_word_position(&mut self, word: &str, position: Position) -> anyhow::Result<()> {
let word = SmallVec32::from(word.as_bytes());
let position = RoaringBitmap::from_iter(Some(position));
let (_, lrus) = self.word_positions.insert(word, position, |old, new| old.union_with(&new));
Self::write_word_positions(&mut self.sorter, lrus)
}
// Save the documents ids under the attribute and word we have seen it.
fn insert_word_attribute_docid(&mut self, word: &str, attribute: Attribute, id: DocumentId) -> anyhow::Result<()> {
let word = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_attribute_docids.insert((word, attribute), ids, |old, new| old.union_with(&new));
Self::write_word_attribute_docids(&mut self.sorter, lrus)
}
pub fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
let headers = CsvStringRecordCodec::bytes_encode(headers)
.with_context(|| format!("could not encode csv record"))?;
Ok(self.sorter.insert(HEADERS_KEY, headers)?)
}
pub fn write_document(&mut self, id: DocumentId, record: &StringRecord) -> anyhow::Result<()> {
pub fn write_document(
&mut self,
id: DocumentId,
iter: impl IntoIterator<Item=(String, RoaringBitmap)>,
record: &StringRecord,
) -> anyhow::Result<()>
{
let record = CsvStringRecordCodec::bytes_encode(record)
.with_context(|| format!("could not encode csv record"))?;
self.documents_ids.insert(id);
Ok(self.documents_sorter.insert(id.to_be_bytes(), record)?)
self.documents_sorter.insert(id.to_be_bytes(), record)?;
Self::write_docid_word_positions(&mut self.sorter, id, iter)?;
Ok(())
}
fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)>
fn write_docid_word_positions<I>(sorter: &mut Sorter<MergeFn>, id: DocumentId, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=(String, RoaringBitmap)>
{
// postings ids keys are all prefixed
let mut key = vec![WORD_POSITIONS_BYTE];
// postings positions ids keys are all prefixed
let mut key = vec![WORD_DOCID_POSITIONS_BYTE];
let mut buffer = Vec::new();
for (word, positions) in iter {
key.truncate(1);
key.extend_from_slice(&word);
// We serialize the positions into a buffer
key.extend_from_slice(word.as_bytes());
// We prefix the words by the document id.
key.extend_from_slice(&id.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(positions.serialized_size());
positions.serialize_into(&mut buffer)?;
@ -244,68 +221,16 @@ impl Store {
Ok(())
}
fn write_word_position_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=((SmallVec32<u8>, Position), RoaringBitmap)>
fn write_word_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)>
{
// postings positions ids keys are all prefixed
let mut key = vec![WORD_POSITION_DOCIDS_BYTE];
let mut key = vec![WORD_DOCIDS_BYTE];
let mut buffer = Vec::new();
for ((word, pos), ids) in iter {
for (word, ids) in iter {
key.truncate(1);
key.extend_from_slice(&word);
// we postfix the word by the positions it appears in
key.extend_from_slice(&pos.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(ids.serialized_size());
ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
sorter.insert(&key, &buffer)?;
}
}
Ok(())
}
fn write_word_four_positions_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=((SmallVec32<u8>, Position), RoaringBitmap)>
{
// postings positions ids keys are all prefixed
let mut key = vec![WORD_FOUR_POSITIONS_DOCIDS_BYTE];
let mut buffer = Vec::new();
for ((word, pos), ids) in iter {
key.truncate(1);
key.extend_from_slice(&word);
// we postfix the word by the positions it appears in
key.extend_from_slice(&pos.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(ids.serialized_size());
ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
sorter.insert(&key, &buffer)?;
}
}
Ok(())
}
fn write_word_attribute_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=((SmallVec32<u8>, Attribute), RoaringBitmap)>
{
// postings attributes keys are all prefixed
let mut key = vec![WORD_ATTRIBUTE_DOCIDS_BYTE];
let mut buffer = Vec::new();
for ((word, attr), ids) in iter {
key.truncate(1);
key.extend_from_slice(&word);
// we postfix the word by the positions it appears in
key.extend_from_slice(&attr.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
buffer.reserve(ids.serialized_size());
@ -327,10 +252,7 @@ impl Store {
}
pub fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> {
Self::write_word_positions(&mut self.sorter, self.word_positions)?;
Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
Self::write_word_four_positions_docids(&mut self.sorter, self.word_four_positions_docids)?;
Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
Self::write_word_docids(&mut self.sorter, self.word_docids)?;
Self::write_documents_ids(&mut self.sorter, self.documents_ids)?;
let mut wtr = tempfile::tempfile().map(Writer::new)?;
@ -339,7 +261,8 @@ impl Store {
let mut iter = self.sorter.into_iter()?;
while let Some(result) = iter.next() {
let (key, val) = result?;
if let Some((&1, word)) = key.split_first() {
if let Some((&1, bytes)) = key.split_first() {
let (word, _docid) = StrBEU32Codec::bytes_decode(bytes).unwrap();
// This is a lexicographically ordered word position
// we use the key to construct the words fst.
builder.insert(word)?;
@ -389,12 +312,7 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
Ok(values[0].to_vec())
},
key => match key[0] {
DOCUMENTS_IDS_BYTE
| WORD_POSITIONS_BYTE
| WORD_POSITION_DOCIDS_BYTE
| WORD_FOUR_POSITIONS_DOCIDS_BYTE
| WORD_ATTRIBUTE_DOCIDS_BYTE =>
{
DOCUMENTS_IDS_BYTE | WORD_DOCIDS_BYTE | WORD_DOCID_POSITIONS_BYTE => {
let (head, tail) = values.split_first().unwrap();
let mut head = RoaringBitmap::deserialize_from(head.as_slice()).unwrap();
@ -427,24 +345,14 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) ->
// Write the documents ids list
index.main.put::<_, Str, ByteSlice>(wtxn, "documents-ids", val)?;
}
else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
else if key.starts_with(&[WORD_DOCIDS_BYTE]) {
// Write the postings lists
index.word_positions.as_polymorph()
index.word_docids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[WORD_POSITION_DOCIDS_BYTE]) {
else if key.starts_with(&[WORD_DOCID_POSITIONS_BYTE]) {
// Write the postings lists
index.word_position_docids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[WORD_FOUR_POSITIONS_DOCIDS_BYTE]) {
// Write the postings lists
index.word_four_positions_docids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[WORD_ATTRIBUTE_DOCIDS_BYTE]) {
// Write the attribute postings lists
index.word_attribute_docids.as_polymorph()
index.word_docid_positions.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
@ -499,6 +407,7 @@ fn index_csv(
let mut before = Instant::now();
let mut document_id: usize = 0;
let mut document = csv::StringRecord::new();
let mut word_positions = HashMap::new();
while rdr.read_record(&mut document)? {
// We skip documents that must not be indexed by this thread.
@ -512,14 +421,15 @@ fn index_csv(
let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
for (pos, (_, token)) in simple_tokenizer(&content).filter(only_words).enumerate().take(MAX_POSITION) {
let word = token.cow_to_lowercase();
let word = token.to_lowercase();
let position = (attr * MAX_POSITION + pos) as u32;
store.insert_word_position_docid(&word, position, document_id)?;
store.insert_word_docid(&word, document_id)?;
word_positions.entry(word).or_insert_with(RoaringBitmap::new).insert(position);
}
}
// We write the document in the database.
store.write_document(document_id, &document)?;
store.write_document(document_id, word_positions.drain(), &document)?;
}
// Compute the document id of the the next document.