Parquet Extension¶
The Parquet Extension provides a persistent implementation of the HNSW graph algorithm using Apache Parquet for storage. This extension is designed to efficiently store and retrieve high-dimensional vectors while maintaining the graph structure on disk.
Features¶
- Persistent Storage: Store your HNSW graph on disk using Apache Parquet files
- Efficient Vector Storage: Optimized for high-dimensional vector data
- Batch Operations: Support for adding and retrieving vectors in batches
- Memory-Mapped Access: Improved performance through memory-mapped file access
- Incremental Updates: Efficient handling of frequent vector additions, updates, and deletions
- Automatic Compaction: Periodic consolidation of changes for optimal performance
- Configurable Parameters: Customize graph parameters and storage options
Architecture¶
The Parquet-based HNSW implementation consists of several key components:
- ParquetGraph: The main graph structure that implements the HNSW algorithm, maintaining the in-memory representation of the graph layers and handling search operations.
- ParquetStorage: Manages the underlying Parquet files for storing vectors, layers, and neighbor connections.
- VectorStore: Handles efficient storage and retrieval of vectors, with caching and batch operations.
- IncrementalStore: Manages incremental updates to vectors, tracking additions, updates, and deletions.
Data Storage¶
The implementation uses four main Parquet files:
- vectors.parquet: Stores the actual vector data, mapping keys to their corresponding vectors.
- layers.parquet: Stores the layer structure of the graph, indicating which nodes exist in each layer.
- neighbors.parquet: Stores the connections between nodes in each layer.
- metadata.parquet: Stores metadata about the graph, such as dimensions and configuration.
For incremental updates, additional files are created in the vector_changes
directory, which track changes to vectors over time.
Usage¶
Installation¶
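Assuming a standard Go module setup, the extension can be fetched with `go get` (the module path below is taken from the import used in the example later in this document):

```shell
go get github.com/TFMV/hnsw/hnsw-extensions/parquet
```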
Creating a Parquet Graph¶
```go
// Create a configuration
config := parquet.DefaultParquetGraphConfig()
config.Storage.Directory = "/path/to/storage"
config.M = 16
config.EfSearch = 100
config.Distance = hnsw.CosineDistance

// Create a new graph
graph, err := parquet.NewParquetGraph[int](config)
if err != nil {
    // Handle error
}
defer graph.Close()
```
Adding Vectors¶
```go
// Add a single vector
err := graph.Add(hnsw.Node[int]{
    Key:   1,
    Value: []float32{0.1, 0.2, 0.3},
})

// Add multiple vectors in a batch
nodes := []hnsw.Node[int]{
    {Key: 2, Value: []float32{0.4, 0.5, 0.6}},
    {Key: 3, Value: []float32{0.7, 0.8, 0.9}},
}
err = graph.Add(nodes...)
```
Searching¶
```go
// Search for the 5 nearest neighbors
query := []float32{0.2, 0.3, 0.4}
results, err := graph.Search(query, 5)
if err != nil {
    // Handle error
}

// Process results
for i, result := range results {
    fmt.Printf("%d. ID: %d, Distance: %f\n", i+1, result.Key, result.Distance)
}
```
Deleting Vectors¶
```go
// Delete a single vector
deleted := graph.Delete(1)

// Delete multiple vectors
keysToDelete := []int{2, 3, 4}
results := graph.BatchDelete(keysToDelete)
```
Incremental Updates¶
The Parquet extension supports incremental updates, which allows for efficient handling of frequent vector additions, updates, and deletions without the need to rewrite the entire dataset each time.
```go
// Configure incremental updates
config := parquet.DefaultParquetGraphConfig()
config.Storage.Directory = "/path/to/storage"
config.Incremental.MaxChanges = 500          // Compact after 500 changes
config.Incremental.MaxAge = 30 * time.Minute // Compact after 30 minutes

// Create a new graph
graph, err := parquet.NewParquetGraph[int](config)
if err != nil {
    // Handle error
}
defer graph.Close()

// Add vectors - these will be stored incrementally
for i := 0; i < 1000; i++ {
    node := hnsw.Node[int]{
        Key:   i,
        Value: generateVector(128), // Your vector generation function
    }
    err = graph.Add(node)
    if err != nil {
        // Handle error
    }
}

// Delete some vectors - these will be tracked incrementally
for i := 0; i < 100; i += 10 {
    graph.Delete(i)
}

// Close the graph - this ensures all changes are persisted
graph.Close()
```
Configuration Options¶
ParquetGraphConfig¶
The ParquetGraphConfig struct allows customization of various parameters:
- M: Maximum number of connections per node (default: 16)
- Ml: Level generation factor (default: 0.25)
- EfSearch: Size of the dynamic candidate list during search (default: 20)
- Distance: Distance function (default: CosineDistance)
- Storage: Storage configuration (directory, compression, etc.)
- Incremental: Incremental update configuration
ParquetStorageConfig¶
The ParquetStorageConfig struct allows customization of storage parameters:
- Directory: Directory where Parquet files will be stored
- Compression: Compression codec to use (default: Snappy)
- BatchSize: Batch size for reading/writing (default: 64MB)
- MaxRowGroupLength: Maximum row group length (default: 64MB)
- DataPageSize: Data page size (default: 1MB)
- MemoryMap: Whether to memory-map files when reading (default: true)
IncrementalConfig¶
The IncrementalConfig struct allows customization of incremental update behavior:
- MaxChanges: Maximum number of changes before compaction (default: 1000)
- MaxAge: Maximum age of changes before compaction (default: 1 hour)
Performance Considerations¶
For optimal performance, consider the following:
- Memory Mapping: Enable memory mapping for faster file access, especially for large datasets.
- Batch Operations: Use batch operations (Add with multiple nodes) whenever possible.
- EfSearch Parameter: Adjust the EfSearch parameter based on your needs - higher values give more accurate results but slower search times.
- Compaction Frequency: Adjust the MaxChanges and MaxAge parameters based on your update patterns.
- Cache Size: The vector cache size can be adjusted based on available memory.
- Parallel Processing: The implementation uses parallel processing for vector operations, which can be tuned based on your hardware.
Benchmark Results¶
The following benchmark results were obtained on an Apple M2 Pro processor:
```
BenchmarkParquetGraph_Add-10        2616    5239807 ns/op                    7422303 B/op      69369 allocs/op
BenchmarkParquetGraph_BatchAdd-10    100  355033085 ns/op  10.00 nodes/op  44609646461 B/op     9545 allocs/op
BenchmarkParquetGraph_Search-10     8397     127766 ns/op                     119551 B/op        359 allocs/op
BenchmarkParquetGraph_Delete-10      678    2369084 ns/op                    1988021 B/op      19424 allocs/op
```
Interpretation¶
- Add: Adding a single node takes approximately 5.24ms with moderate memory allocation.
- BatchAdd: Adding a batch of 10 nodes takes approximately 355ms, about 35.5ms per node. Although this is slower per node than individual adds in this benchmark, batching reduces the overhead of repeated disk writes and is more efficient for large-scale operations.
- Search: Searching for nearest neighbors takes approximately 128μs per operation, which is very fast.
- Delete: Deleting nodes takes approximately 2.37ms per operation with moderate memory allocation.
Reopening Performance¶
One of the key optimizations in this implementation is the improved performance when reopening a graph from disk. In our example with 1,000 vectors:
- Initial search on a newly created graph: ~56μs
- Search on a reopened graph: ~221ms
This is a significant improvement over previous versions, where reopening could take several minutes. The optimizations that contributed to this improvement include:
- Automatic compaction of incremental changes during reopening
- Efficient caching of base vectors and change logs
- Parallel processing of vector retrievals
- Lazy loading of vectors instead of eager preloading
Limitations¶
- Memory Usage: For very large datasets, memory usage can be high due to caching.
- Write Performance: While incremental updates improve write performance, compaction operations can still be expensive.
- Concurrent Access: The implementation is thread-safe, but concurrent writes may be serialized.
Example¶
Here's a complete example of using the Parquet extension:
```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"os"
	"time"

	"github.com/TFMV/hnsw"
	parquet "github.com/TFMV/hnsw/hnsw-extensions/parquet"
)

func main() {
	// Create a temporary directory for the example
	tempDir, err := os.MkdirTemp("", "parquet-hnsw-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(tempDir)

	// Create a new Parquet-based HNSW graph with default configuration
	config := parquet.DefaultParquetGraphConfig()
	config.Storage.Directory = tempDir
	config.M = 16        // Maximum number of connections per node
	config.Ml = 0.25     // Level generation factor
	config.EfSearch = 50 // Size of dynamic candidate list during search
	config.Distance = hnsw.CosineDistance

	// Configure incremental updates
	config.Incremental.MaxChanges = 100         // Trigger compaction after 100 changes
	config.Incremental.MaxAge = 5 * time.Minute // Trigger compaction after 5 minutes

	graph, err := parquet.NewParquetGraph[int](config)
	if err != nil {
		panic(err)
	}
	defer graph.Close()

	// Add vectors
	dimensions := 128
	numVectors := 1000
	for i := 0; i < numVectors; i++ {
		err := graph.Add(hnsw.Node[int]{
			Key:   i,
			Value: generateRandomVector(dimensions),
		})
		if err != nil {
			panic(err)
		}
	}

	// Perform a search
	query := generateRandomVector(dimensions)
	results, err := graph.Search(query, 10)
	if err != nil {
		panic(err)
	}

	fmt.Println("Search results:")
	for i, result := range results {
		fmt.Printf("  %d. ID: %d, Distance: %.6f\n", i+1, result.Key, result.Distance)
	}

	// Close the graph
	if err := graph.Close(); err != nil {
		panic(err)
	}

	// Reopen the graph
	reopenedGraph, err := parquet.NewParquetGraph[int](config)
	if err != nil {
		panic(err)
	}
	defer reopenedGraph.Close()

	// Search again
	results, err = reopenedGraph.Search(query, 10)
	if err != nil {
		panic(err)
	}

	fmt.Println("Search results after reopening:")
	for i, result := range results {
		fmt.Printf("  %d. ID: %d, Distance: %.6f\n", i+1, result.Key, result.Distance)
	}
}

// generateRandomVector creates a random, L2-normalized vector
func generateRandomVector(dimensions int) []float32 {
	vector := make([]float32, dimensions)
	for i := range vector {
		vector[i] = rand.Float32()
	}

	// Normalize the vector by its Euclidean (L2) norm
	var sum float32
	for _, v := range vector {
		sum += v * v
	}
	norm := float32(1.0 / math.Sqrt(float64(sum)))
	for i := range vector {
		vector[i] *= norm
	}
	return vector
}
```