mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-03 17:36:29 +00:00 
			
		
		
		
	Introduce a new semi ordered merge function
This commit is contained in:
		@@ -566,6 +566,116 @@ where
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Merges the caches that must be all associated to the same bucket.
 | 
			
		||||
///
 | 
			
		||||
/// It merges entries like the `merge_caches` function
 | 
			
		||||
pub fn merge_caches_alt<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
 | 
			
		||||
where
 | 
			
		||||
    F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
 | 
			
		||||
{
 | 
			
		||||
    let mut maps = Vec::new();
 | 
			
		||||
    let mut readers = Vec::new();
 | 
			
		||||
    let mut current_bucket = None;
 | 
			
		||||
    for FrozenCache { bucket, cache, ref mut spilled } in frozen {
 | 
			
		||||
        assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
 | 
			
		||||
        maps.push(cache);
 | 
			
		||||
        readers.append(spilled);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // First manage the spilled entries by looking into the HashMaps,
 | 
			
		||||
    // merge them and mark them as dummy.
 | 
			
		||||
    let mut heap = BinaryHeap::new();
 | 
			
		||||
    for (source_index, source) in readers.into_iter().enumerate() {
 | 
			
		||||
        let mut cursor = source.into_cursor()?;
 | 
			
		||||
        if cursor.move_on_next()?.is_some() {
 | 
			
		||||
            heap.push(Entry { cursor, source_index });
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    loop {
 | 
			
		||||
        let mut first_entry = match heap.pop() {
 | 
			
		||||
            Some(entry) => entry,
 | 
			
		||||
            None => break,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let (first_key, first_value) = match first_entry.cursor.current() {
 | 
			
		||||
            Some((key, value)) => (key, value),
 | 
			
		||||
            None => break,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
 | 
			
		||||
        while let Some(mut entry) = heap.peek_mut() {
 | 
			
		||||
            if let Some((key, _value)) = entry.cursor.current() {
 | 
			
		||||
                if first_key == key {
 | 
			
		||||
                    let new = DelAddRoaringBitmap::from_bytes(first_value)?;
 | 
			
		||||
                    output = output.merge(new);
 | 
			
		||||
                    // When we are done we the current value of this entry move make
 | 
			
		||||
                    // it move forward and let the heap reorganize itself (on drop)
 | 
			
		||||
                    if entry.cursor.move_on_next()?.is_none() {
 | 
			
		||||
                        PeekMut::pop(entry);
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Once we merged all of the spilled bitmaps we must also
 | 
			
		||||
        // fetch the entries from the non-spilled entries (the HashMaps).
 | 
			
		||||
        for (map_index, map) in maps.iter_mut().enumerate() {
 | 
			
		||||
            if first_entry.source_index != map_index {
 | 
			
		||||
                if let Some(new) = map.get_mut(first_key) {
 | 
			
		||||
                    output.union_and_clear_bbbul(new);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // We send the merged entry outside.
 | 
			
		||||
        (f)(first_key, output)?;
 | 
			
		||||
 | 
			
		||||
        // Don't forget to put the first entry back into the heap.
 | 
			
		||||
        if first_entry.cursor.move_on_next()?.is_some() {
 | 
			
		||||
            heap.push(first_entry)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Then manage the content on the HashMap entries that weren't taken (mem::take).
 | 
			
		||||
    let order_count = 1000;
 | 
			
		||||
    while let Some(mut map) = maps.pop() {
 | 
			
		||||
        let mut iter = map.iter_mut();
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            let mut ordered_buffer: Vec<_> = iter.by_ref().take(order_count).collect();
 | 
			
		||||
            ordered_buffer.sort_unstable_by_key(|(key, _)| *key);
 | 
			
		||||
 | 
			
		||||
            if ordered_buffer.is_empty() {
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            for (key, bbbul) in ordered_buffer.drain(..) {
 | 
			
		||||
                // Make sure we don't try to work with entries already managed by the spilled
 | 
			
		||||
                if bbbul.is_empty() {
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                let mut output = DelAddRoaringBitmap::empty();
 | 
			
		||||
                output.union_and_clear_bbbul(bbbul);
 | 
			
		||||
 | 
			
		||||
                for rhs in maps.iter_mut() {
 | 
			
		||||
                    if let Some(new) = rhs.get_mut(key) {
 | 
			
		||||
                        output.union_and_clear_bbbul(new);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                // We send the merged entry outside.
 | 
			
		||||
                (f)(key, output)?;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct Entry<R> {
 | 
			
		||||
    cursor: ReaderCursor<R>,
 | 
			
		||||
    source_index: usize,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user