Advanced Techniques¶
This guide covers advanced techniques and patterns for using HNSW in production environments and specialized use cases.
Hybrid Search Approaches¶
Combining HNSW with Exact Search¶
For applications requiring both speed and precision, you can implement a hybrid search approach:
func hybridSearch(graph *hnsw.Graph[int], query []float32, k int, exactThreshold float32) []hnsw.SearchResult[int] {
    // First, perform approximate search with HNSW
    results := graph.Search(query, k)

    // Check whether the results meet our quality threshold.
    // Guard against an empty result set before inspecting the best match.
    if len(results) == 0 || results[0].Distance > exactThreshold {
        // If quality is insufficient, perform exact search on a subset.
        // This could be the top N% of vectors based on a coarse filter.
        return performExactSearch(query, k)
    }
    return results
}
This approach gives you the best of both worlds:
- Fast approximate search for most queries
- Fallback to exact search when precision is critical
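The snippet above relies on a performExactSearch helper that the library does not provide. A minimal brute-force sketch, assuming an application-level allVectors store and that SearchResult can be constructed from its Key and Distance fields, could look like this:

// Assumed application-level store of the raw vectors (hypothetical).
var allVectors map[int][]float32

// performExactSearch is a hypothetical helper: it brute-forces distances
// over the raw vectors and returns the k nearest results.
func performExactSearch(query []float32, k int) []hnsw.SearchResult[int] {
    results := make([]hnsw.SearchResult[int], 0, len(allVectors))
    for id, vec := range allVectors {
        results = append(results, hnsw.SearchResult[int]{Key: id, Distance: euclidean(query, vec)})
    }
    sort.Slice(results, func(i, j int) bool {
        return results[i].Distance < results[j].Distance
    })
    if len(results) > k {
        results = results[:k]
    }
    return results
}

func euclidean(a, b []float32) float32 {
    var sum float64
    for i := range a {
        d := float64(a[i] - b[i])
        sum += d * d
    }
    return float32(math.Sqrt(sum))
}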
Two-Stage Search¶
For very large datasets, a two-stage search can improve performance:
func twoStageSearch(coarseGraph, fineGraph *hnsw.Graph[int], query []float32, k int) []hnsw.SearchResult[int] {
    // First stage: search a coarse graph (fewer vectors, lower dimensionality)
    // and over-fetch to build a generous candidate pool.
    coarseResults := coarseGraph.Search(query, k*5)

    // Collect candidate IDs in a set for O(1) membership checks.
    candidates := make(map[int]struct{}, len(coarseResults))
    for _, result := range coarseResults {
        candidates[result.Key] = struct{}{}
    }

    // Second stage: refined search in the fine graph, restricted to candidates.
    return fineGraph.SearchFiltered(query, k, func(id int) bool {
        _, ok := candidates[id]
        return ok
    })
}
This technique is particularly useful for:
- Billion-scale vector collections
- High-dimensional data that can be compressed for initial filtering
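One low-effort way to produce the coarse graph, sketched here under the assumption that the leading components carry most of the signal (as after a PCA projection), is to index truncated copies of each vector:

// buildCoarseGraph is a hypothetical helper: it indexes only the first
// coarseDims components of each vector as a cheap low-dimensional proxy.
// Truncation is only a sensible proxy if the leading dimensions dominate
// (e.g. PCA-rotated data); real systems often use quantization instead.
func buildCoarseGraph(vectors [][]float32, coarseDims int) *hnsw.Graph[int] {
    coarse := hnsw.NewGraph[int]()
    for i, vec := range vectors {
        if len(vec) > coarseDims {
            vec = vec[:coarseDims]
        }
        coarse.Add(vec, i)
    }
    return coarse
}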
Concurrent Operations¶
Thread-Safe Updates¶
When updating the graph from multiple goroutines, proper synchronization is essential:
type ThreadSafeGraph[T comparable] struct {
    graph *hnsw.Graph[T]
    mu    sync.RWMutex
}

func (g *ThreadSafeGraph[T]) Add(vector []float32, id T) {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.graph.Add(vector, id)
}

func (g *ThreadSafeGraph[T]) Search(query []float32, k int) []hnsw.SearchResult[T] {
    g.mu.RLock()
    defer g.mu.RUnlock()
    return g.graph.Search(query, k)
}

func (g *ThreadSafeGraph[T]) Delete(id T) {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.graph.Delete(id)
}
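A brief usage sketch, with several goroutines inserting concurrently while the wrapper serializes access:

ts := &ThreadSafeGraph[int]{graph: hnsw.NewGraph[int]()}

var wg sync.WaitGroup
for i := 0; i < 4; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()
        // Safe: the wrapper takes the write lock around each insert
        ts.Add([]float32{float32(id), 0.5, 0.5}, id)
    }(i)
}
wg.Wait()

results := ts.Search([]float32{0, 0.5, 0.5}, 2) // readers share the RLock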
Batch Processing with Worker Pools¶
For processing large batches of operations, a worker pool can spread out any per-vector preprocessing across cores. Note that the insertions themselves must still be synchronized, because the raw graph is not safe for concurrent writes:
func processBatchWithWorkers(graph *hnsw.Graph[int], vectors [][]float32, ids []int, numWorkers int) {
    var (
        wg sync.WaitGroup
        mu sync.Mutex // serializes writes to the shared graph
    )
    jobs := make(chan int, len(vectors))

    // Start workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for idx := range jobs {
                // Run any expensive preprocessing here, outside the lock.
                // The insertion itself is serialized; alternatively, pass
                // in the ThreadSafeGraph wrapper from the previous section.
                mu.Lock()
                graph.Add(vectors[idx], ids[idx])
                mu.Unlock()
            }
        }()
    }

    // Send jobs to the workers
    for i := range vectors {
        jobs <- i
    }
    close(jobs)

    // Wait for all workers to complete
    wg.Wait()
}
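A typical invocation sizes the pool to the machine (vectors and ids are assumed to be prepared by the caller):

graph := hnsw.NewGraph[int]()
processBatchWithWorkers(graph, vectors, ids, runtime.NumCPU())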
Memory Optimization¶
Custom Vector Storage¶
For very large datasets, you can implement custom vector storage to reduce memory usage:
type QuantizedVectorStorage struct {
    vectors     map[int][]byte // 8-bit quantized vectors
    quantScale  float32
    quantOffset float32
}

func (s *QuantizedVectorStorage) Store(id int, vector []float32) {
    // Quantize the vector from float32 to uint8, rounding and clamping
    // to [0, 255] so that out-of-range values don't wrap around.
    quantized := make([]byte, len(vector))
    for i, v := range vector {
        q := math.Round(float64((v - s.quantOffset) / s.quantScale))
        if q < 0 {
            q = 0
        } else if q > 255 {
            q = 255
        }
        quantized[i] = byte(q)
    }
    s.vectors[id] = quantized
}

func (s *QuantizedVectorStorage) Load(id int) []float32 {
    quantized := s.vectors[id]
    // Dequantize the vector from uint8 back to float32
    vector := make([]float32, len(quantized))
    for i, q := range quantized {
        vector[i] = float32(q)*s.quantScale + s.quantOffset
    }
    return vector
}
// Connect this storage to HNSW
func NewGraphWithQuantizedStorage[T comparable](storage *QuantizedVectorStorage) *hnsw.Graph[T] {
    graph := hnsw.NewGraph[T]()
    // Configure the graph to use the custom storage.
    // Implementation depends on the specific HNSW API.
    return graph
}
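Store assumes the map and quantization parameters are already initialized; a hypothetical constructor can derive them from a data sample by mapping the observed value range onto the 256 byte levels:

// NewQuantizedVectorStorage is a sketch: it scans a sample of vectors
// for the min/max values and sizes the quantization grid accordingly.
func NewQuantizedVectorStorage(sample [][]float32) *QuantizedVectorStorage {
    minVal := float32(math.MaxFloat32)
    maxVal := float32(-math.MaxFloat32)
    for _, vec := range sample {
        for _, v := range vec {
            if v < minVal {
                minVal = v
            }
            if v > maxVal {
                maxVal = v
            }
        }
    }
    scale := (maxVal - minVal) / 255.0
    if scale == 0 {
        scale = 1 // degenerate sample: all values identical
    }
    return &QuantizedVectorStorage{
        vectors:     make(map[int][]byte),
        quantScale:  scale,
        quantOffset: minVal,
    }
}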
Memory-Mapped Storage¶
For datasets larger than available RAM, memory-mapped files can be used:
type MMapVectorStorage struct {
    file       *os.File
    mmap       []byte
    dimensions int
    vectorSize int
    index      map[int]int64 // Maps ID to offset in file
}

func NewMMapVectorStorage(filename string, dimensions int) (*MMapVectorStorage, error) {
    file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return nil, err
    }

    // Initialize or load the file (preallocate space, read any existing index)
    // ... implementation details ...

    // Determine the current file size to map
    info, err := file.Stat()
    if err != nil {
        file.Close()
        return nil, err
    }
    fileSize := int(info.Size())

    // Memory-map the file. syscall.Mmap is Unix-only; for Windows support,
    // use a cross-platform package such as github.com/edsrzf/mmap-go.
    mmap, err := syscall.Mmap(int(file.Fd()), 0, fileSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
    if err != nil {
        file.Close()
        return nil, err
    }

    return &MMapVectorStorage{
        file:       file,
        mmap:       mmap,
        dimensions: dimensions,
        vectorSize: dimensions * 4, // 4 bytes per float32
        index:      make(map[int]int64),
    }, nil
}
// Implement vector storage methods
// ... implementation details ...

func (s *MMapVectorStorage) Close() error {
    if err := syscall.Munmap(s.mmap); err != nil {
        return err
    }
    return s.file.Close()
}
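As one example of the elided storage methods, here is a hedged sketch of Load that decodes float32 values directly from the mapping; little-endian byte order and the (id → offset) index layout are assumptions of the sketch:

// Load reads the vector for id straight out of the memory-mapped region.
// It assumes vectors were written as consecutive little-endian float32s
// at the offset recorded in the index; returns false for an unknown id.
func (s *MMapVectorStorage) Load(id int) ([]float32, bool) {
    offset, ok := s.index[id]
    if !ok {
        return nil, false
    }
    vector := make([]float32, s.dimensions)
    for i := 0; i < s.dimensions; i++ {
        start := offset + int64(i*4)
        bits := binary.LittleEndian.Uint32(s.mmap[start : start+4])
        vector[i] = math.Float32frombits(bits)
    }
    return vector, true
}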
Advanced Filtering¶
Dynamic Filtering¶
Implement dynamic filtering based on complex criteria:
func dynamicFilter(graph *hnsw.Graph[int], query []float32, k int, filters map[string]interface{}) []hnsw.SearchResult[int] {
    // Create a filter function based on the provided criteria
    filterFunc := func(id int) bool {
        metadata := getMetadata(id) // Get metadata for this vector

        // Apply all filters; a vector must pass every one to be included
        for key, value := range filters {
            switch v := value.(type) {
            case []interface{}: // Range filter: [min, max]
                if len(v) < 2 {
                    continue // malformed range, skip this filter
                }
                lo, okLo := v[0].(float64)
                hi, okHi := v[1].(float64)
                if okLo && okHi {
                    if val, exists := metadata[key]; exists {
                        if numVal, ok := val.(float64); ok {
                            if numVal < lo || numVal > hi {
                                return false
                            }
                        }
                    }
                }
            case string: // Exact match
                if val, exists := metadata[key]; exists {
                    if strVal, ok := val.(string); ok && strVal != v {
                        return false
                    }
                }
            case bool: // Boolean match
                if val, exists := metadata[key]; exists {
                    if boolVal, ok := val.(bool); ok && boolVal != v {
                        return false
                    }
                }
            }
        }
        return true
    }

    // Search with the dynamic filter
    return graph.SearchFiltered(query, k, filterFunc)
}
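For example, a query restricted to in-stock electronics in a given price range (the metadata keys are illustrative):

results := dynamicFilter(graph, query, 10, map[string]interface{}{
    "price":    []interface{}{10.0, 100.0}, // range filter: [min, max]
    "category": "electronics",              // exact string match
    "in_stock": true,                       // boolean match
})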
Geo-Spatial Filtering¶
For location-based searches, implement geo-spatial filtering:
type GeoPoint struct {
    Lat float64
    Lng float64
}

func geoFilter(center GeoPoint, radiusKm float64) func(int) bool {
    return func(id int) bool {
        // Get the geo coordinates for this vector
        point := getGeoPoint(id)
        // Calculate distance using the haversine formula
        distance := haversineDistance(center, point)
        // Return true if within radius
        return distance <= radiusKm
    }
}

func haversineDistance(p1, p2 GeoPoint) float64 {
    // Earth's mean radius in kilometers
    const R = 6371.0

    // Convert degrees to radians
    lat1 := p1.Lat * math.Pi / 180.0
    lng1 := p1.Lng * math.Pi / 180.0
    lat2 := p2.Lat * math.Pi / 180.0
    lng2 := p2.Lng * math.Pi / 180.0

    // Haversine formula
    dlat := lat2 - lat1
    dlng := lng2 - lng1
    a := math.Sin(dlat/2)*math.Sin(dlat/2) + math.Cos(lat1)*math.Cos(lat2)*math.Sin(dlng/2)*math.Sin(dlng/2)
    c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
    return R * c
}
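Plugged into filtered search, this keeps only semantically similar results within the radius (the center coordinates are illustrative):

center := GeoPoint{Lat: 52.52, Lng: 13.405}
results := graph.SearchFiltered(query, 10, geoFilter(center, 25.0)) // 25 km radius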
Incremental Updates¶
Efficient Reindexing¶
For frequently changing datasets, implement incremental updates:
type IncrementalGraph[T comparable] struct {
    mainGraph    *hnsw.Graph[T]
    pendingGraph *hnsw.Graph[T]
    pendingCount int
    maxPending   int
    mu           sync.RWMutex
}

func (g *IncrementalGraph[T]) Add(vector []float32, id T) {
    g.mu.Lock()
    defer g.mu.Unlock()

    // Add to the pending graph
    g.pendingGraph.Add(vector, id)
    g.pendingCount++

    // If we've reached the threshold, merge into the main graph
    if g.pendingCount >= g.maxPending {
        g.mergeGraphs()
    }
}

func (g *IncrementalGraph[T]) Search(query []float32, k int) []hnsw.SearchResult[T] {
    g.mu.RLock()
    defer g.mu.RUnlock()

    // Search both graphs
    mainResults := g.mainGraph.Search(query, k)
    pendingResults := g.pendingGraph.Search(query, k)

    // Merge and sort results
    allResults := append(mainResults, pendingResults...)
    sort.Slice(allResults, func(i, j int) bool {
        return allResults[i].Distance < allResults[j].Distance
    })

    // Return top k
    if len(allResults) > k {
        allResults = allResults[:k]
    }
    return allResults
}

func (g *IncrementalGraph[T]) mergeGraphs() {
    // Create a new main graph with all vectors
    newMainGraph := hnsw.NewGraph[T]()

    // Copy configuration from the old main graph
    // ... implementation details ...

    // Add all vectors from the main graph
    // ... implementation details ...

    // Add all vectors from the pending graph
    // ... implementation details ...

    // Replace the main graph and reset the pending graph
    g.mainGraph = newMainGraph
    g.pendingGraph = hnsw.NewGraph[T]()
    g.pendingCount = 0
}
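A hypothetical constructor completes the type; maxPending trades the overhead of querying two graphs against how often a full merge runs:

// NewIncrementalGraph is a sketch: both graphs start empty, and a merge
// is triggered once maxPending vectors accumulate in the pending graph.
func NewIncrementalGraph[T comparable](maxPending int) *IncrementalGraph[T] {
    return &IncrementalGraph[T]{
        mainGraph:    hnsw.NewGraph[T](),
        pendingGraph: hnsw.NewGraph[T](),
        maxPending:   maxPending,
    }
}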
Distributed HNSW¶
Sharding Across Multiple Instances¶
For extremely large datasets, implement sharding:
type ShardedHNSW[T comparable] struct {
    shards    []*hnsw.Graph[T]
    numShards int
    shardFunc func(vector []float32) int // Function to determine shard
}

func (s *ShardedHNSW[T]) Add(vector []float32, id T) {
    // Determine which shard this vector belongs to
    shardIdx := s.shardFunc(vector)
    // Add to the appropriate shard
    s.shards[shardIdx].Add(vector, id)
}

func (s *ShardedHNSW[T]) Search(query []float32, k int) []hnsw.SearchResult[T] {
    // Search all shards concurrently
    resultChan := make(chan []hnsw.SearchResult[T], s.numShards)
    var wg sync.WaitGroup

    for i := 0; i < s.numShards; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            resultChan <- s.shards[idx].Search(query, k)
        }(i)
    }

    // Wait for all searches to complete
    wg.Wait()
    close(resultChan)

    // Collect and merge results
    var allResults []hnsw.SearchResult[T]
    for results := range resultChan {
        allResults = append(allResults, results...)
    }

    // Sort by distance
    sort.Slice(allResults, func(i, j int) bool {
        return allResults[i].Distance < allResults[j].Distance
    })

    // Return top k
    if len(allResults) > k {
        allResults = allResults[:k]
    }
    return allResults
}
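One workable shardFunc, sketched here, hashes the vector's bytes with FNV-1a. Because Search fans out to every shard, the function only needs to spread load evenly, not preserve locality:

// hashShardFunc is a hypothetical shard assigner: deterministic,
// roughly uniform, and independent of insertion order.
func hashShardFunc(numShards int) func([]float32) int {
    return func(vector []float32) int {
        h := fnv.New32a()
        buf := make([]byte, 4)
        for _, v := range vector {
            binary.LittleEndian.PutUint32(buf, math.Float32bits(v))
            h.Write(buf)
        }
        return int(h.Sum32() % uint32(numShards))
    }
}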
Performance Monitoring¶
Instrumentation¶
Add instrumentation to monitor HNSW performance:
type InstrumentedGraph[T comparable] struct {
    graph          *hnsw.Graph[T]
    searchLatency  metrics.Histogram
    addLatency     metrics.Histogram
    searchRequests metrics.Counter
    addRequests    metrics.Counter
}

func (g *InstrumentedGraph[T]) Search(query []float32, k int) []hnsw.SearchResult[T] {
    g.searchRequests.Inc()
    start := time.Now()
    results := g.graph.Search(query, k)
    g.searchLatency.Observe(time.Since(start).Seconds())
    return results
}

func (g *InstrumentedGraph[T]) Add(vector []float32, id T) {
    g.addRequests.Inc()
    start := time.Now()
    g.graph.Add(vector, id)
    g.addLatency.Observe(time.Since(start).Seconds())
}
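The Observe and Inc calls above match the histogram and counter types in Prometheus's client_golang, so one possible wiring (the metric names are illustrative) is:

// Sketch assuming github.com/prometheus/client_golang/prometheus.
searchLatency := prometheus.NewHistogram(prometheus.HistogramOpts{
    Name: "hnsw_search_duration_seconds",
    Help: "Latency of HNSW search operations.",
})
searchRequests := prometheus.NewCounter(prometheus.CounterOpts{
    Name: "hnsw_search_requests_total",
    Help: "Total number of HNSW search operations.",
})
prometheus.MustRegister(searchLatency, searchRequests)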
Performance Benchmarking¶
Implement benchmarking to tune parameters. Note that recall tends to rise with larger M, efConstruction, and ef, so a recall-only sweep will usually pick the largest values; in practice, weigh recall against build time, query latency, and memory:
func benchmarkParameters(vectors [][]float32, queries [][]float32, groundTruth [][]int) {
    // Test different M values
    mValues := []int{8, 16, 32, 64}
    // Test different efConstruction values
    efConstructionValues := []int{100, 200, 400, 800}
    // Test different ef values
    efValues := []int{10, 50, 100, 200}

    bestRecall := 0.0
    bestParams := struct{ m, efConstruction, ef int }{}

    for _, m := range mValues {
        for _, efConstruction := range efConstructionValues {
            // Create and build a graph with these parameters
            graph := hnsw.NewGraph[int](
                hnsw.WithM(m),
                hnsw.WithEfConstruction(efConstruction),
            )
            // Add vectors
            for i, vector := range vectors {
                graph.Add(vector, i)
            }

            for _, ef := range efValues {
                // Measure recall
                recall := measureRecall(graph, queries, groundTruth, ef)
                fmt.Printf("M=%d, efConstruction=%d, ef=%d: Recall=%.4f\n",
                    m, efConstruction, ef, recall)

                if recall > bestRecall {
                    bestRecall = recall
                    bestParams.m = m
                    bestParams.efConstruction = efConstruction
                    bestParams.ef = ef
                }
            }
        }
    }

    fmt.Printf("Best parameters: M=%d, efConstruction=%d, ef=%d, Recall=%.4f\n",
        bestParams.m, bestParams.efConstruction, bestParams.ef, bestRecall)
}
func measureRecall(graph *hnsw.Graph[int], queries [][]float32, groundTruth [][]int, ef int) float64 {
    var totalRecall float64
    for i, query := range queries {
        results := graph.SearchWithEF(query, len(groundTruth[i]), ef)

        // Count how many ground-truth neighbors the search found
        foundCount := 0
        for _, result := range results {
            for _, gtID := range groundTruth[i] {
                if result.Key == gtID {
                    foundCount++
                    break
                }
            }
        }
        totalRecall += float64(foundCount) / float64(len(groundTruth[i]))
    }
    return totalRecall / float64(len(queries))
}
Next Steps¶
Now that you've explored advanced techniques for HNSW, you might want to check out:
- Performance Tuning: Learn how to optimize HNSW for your specific use case
- Creating Extensions: Build your own custom extensions
- API Reference: Explore the complete API documentation