avoid openning each index twice and remove clones

This commit is contained in:
ManyTheFish
2025-10-09 12:07:44 +02:00
parent a0ef8008d6
commit e47bf75e52

View File

@@ -62,8 +62,13 @@ pub async fn perform_federated_search(
// 1. partition queries by host and index
let mut partitioned_queries = PartitionedQueries::new();
// Store the original queries order for later metadata building
let original_queries = queries.clone();
// Preconstruct metadata keeping the original queries order for later metadata building
let precomputed_query_metadata: Vec<_> = queries
.iter()
.map(|q| {
(q.index_uid.to_string(), q.federation_options.as_ref().and_then(|o| o.remote.clone()))
})
.collect();
for (query_index, federated_query) in queries.into_iter().enumerate() {
partitioned_queries.partition(federated_query, query_index, &network, features)?
@@ -72,7 +77,7 @@ pub async fn perform_federated_search(
// 2. perform queries, merge and make hits index by index
// 2.1. start remote queries
let remote_search = RemoteSearch::start(
partitioned_queries.remote_queries_by_host.clone(),
partitioned_queries.remote_queries_by_host,
&federation,
deadline,
include_metadata,
@@ -118,69 +123,57 @@ pub async fn perform_federated_search(
let after_waiting_remote_results = std::time::Instant::now();
// 3. merge hits and metadata across indexes and hosts
// 3.1. merge federation metadata
let (estimated_total_hits, degraded, used_negative_operator, facets, max_remote_duration) =
merge_metadata(&mut results_by_index, &remote_results);
// 3.2. Build metadata in the same order as the original queries
// 3.1. Build metadata in the same order as the original queries
let query_metadata = if include_metadata {
let mut query_metadata = Vec::new();
// Create a map of (remote, index_uid) -> primary_key for quick lookup
// 3.1.1. Create a map of (remote, index_uid) -> primary_key for quick lookup
// This prevents collisions when multiple remotes have the same index_uid but different primary keys
let mut remote_primary_keys = std::collections::HashMap::new();
let mut primary_key_per_index = std::collections::HashMap::new();
// 3.1.1.1 Build metadata for remote results
for remote_result in &remote_results {
if let Some(remote_metadata) = &remote_result.metadata {
for remote_meta in remote_metadata {
if let SearchMetadata {
remote: Some(remote_name),
primary_key: Some(primary_key),
index_uid,
primary_key: Some(primary_key),
..
} = &remote_meta
{
let key = (remote_name, index_uid);
remote_primary_keys.insert(key, primary_key);
let key = (Some(remote_name), index_uid);
primary_key_per_index.insert(key, primary_key);
}
}
}
}
// Build metadata in the same order as the original queries
for original_query in original_queries {
// 3.1.1.2 Build metadata for local results
for local_meta in &results_by_index {
if let SearchResultByIndex { index, primary_key: Some(primary_key), .. } = &local_meta {
let key = (None, index);
primary_key_per_index.insert(key, primary_key);
}
}
// 3.1.2 Build metadata in the same order as the original queries
let mut query_metadata = Vec::new();
for (index_uid, remote) in precomputed_query_metadata {
let primary_key =
primary_key_per_index.get(&(remote.as_ref(), &index_uid)).map(|pk| pk.to_string());
let query_uid = Uuid::now_v7();
let index_uid = original_query.index_uid.to_string();
// Determine if this is a remote query
let (_, _, federation_options) = original_query.into_index_query_federation();
let remote = federation_options.and_then(|options| options.remote);
// Get primary key for this index
let primary_key = match &remote {
Some(remote_name) => {
// For remote queries, try to get primary key from remote results
// Use composite key (remote, index_uid) to avoid collisions
let lookup_key = (remote_name, &index_uid);
remote_primary_keys.get(&lookup_key).map(|pk| pk.to_string())
}
None => {
// For local queries, get primary key from local index
index_scheduler.index(&index_uid).ok().and_then(|index| {
index.read_txn().ok().and_then(|rtxn| {
index.primary_key(&rtxn).ok().flatten().map(|pk| pk.to_string())
})
})
}
};
query_metadata.push(SearchMetadata { query_uid, index_uid, primary_key, remote });
query_metadata.push(SearchMetadata { query_uid, primary_key, index_uid, remote });
}
Some(query_metadata)
} else {
None
};
// 3.2. merge hits
// 3.2. merge federation metadata
let (estimated_total_hits, degraded, used_negative_operator, facets, max_remote_duration) =
merge_metadata(&mut results_by_index, &remote_results);
// 3.3. merge hits
let merged_hits: Vec<_> = merge_index_global_results(results_by_index, &mut remote_results)
.skip(federation.offset)
.take(federation.limit)
@@ -195,7 +188,7 @@ pub async fn perform_federated_search(
.map(|hit| hit.hit())
.collect();
// 3.3. merge query vectors
// 3.4. merge query vectors
let query_vectors = if retrieve_vectors {
for remote_results in remote_results.iter_mut() {
if let Some(remote_vectors) = remote_results.query_vectors.take() {
@@ -214,7 +207,7 @@ pub async fn perform_federated_search(
None
};
// 3.4. merge facets
// 3.5. merge facets
let (facet_distribution, facet_stats, facets_by_index) =
facet_order.merge(federation.merge_facets, remote_results, facets);
@@ -465,6 +458,7 @@ struct SearchHitByIndex {
struct SearchResultByIndex {
index: String,
primary_key: Option<String>,
hits: Vec<SearchHitByIndex>,
estimated_total_hits: usize,
degraded: bool,
@@ -483,6 +477,7 @@ fn merge_metadata(
let mut max_remote_duration = Duration::ZERO;
for SearchResultByIndex {
index,
primary_key: _,
hits: _,
estimated_total_hits: estimated_total_hits_by_index,
facets: facets_by_index,
@@ -814,6 +809,7 @@ impl SearchByIndex {
}
};
let rtxn = index.read_txn()?;
let primary_key = index.primary_key(&rtxn)?.map(|pk| pk.to_string());
let criteria = index.criteria(&rtxn)?;
let dictionary = index.dictionary(&rtxn)?;
let dictionary: Option<Vec<_>> =
@@ -1066,6 +1062,7 @@ impl SearchByIndex {
})?;
self.results_by_index.push(SearchResultByIndex {
index: index_uid,
primary_key,
hits: merged_result,
estimated_total_hits,
degraded,