Parquet Extension

The Parquet Extension provides a persistent implementation of the HNSW graph algorithm using Apache Parquet for storage. This extension is designed to efficiently store and retrieve high-dimensional vectors while maintaining the graph structure on disk.

Features

  • Persistent Storage: Store your HNSW graph on disk using Apache Parquet files
  • Efficient Vector Storage: Optimized for high-dimensional vector data
  • Batch Operations: Support for adding and retrieving vectors in batches
  • Memory-Mapped Access: Improved performance through memory-mapped file access
  • Incremental Updates: Efficient handling of frequent vector additions, updates, and deletions
  • Automatic Compaction: Periodic consolidation of changes for optimal performance
  • Configurable Parameters: Customize graph parameters and storage options

Architecture

The Parquet-based HNSW implementation consists of several key components:

  1. ParquetGraph: The main graph structure that implements the HNSW algorithm, maintaining the in-memory representation of the graph layers and handling search operations.

  2. ParquetStorage: Manages the underlying Parquet files for storing vectors, layers, and neighbor connections.

  3. VectorStore: Handles efficient storage and retrieval of vectors, with caching and batch operations.

  4. IncrementalStore: Manages incremental updates to vectors, tracking additions, updates, and deletions.

Data Storage

The implementation uses four main Parquet files:

  1. vectors.parquet: Stores the actual vector data, mapping keys to their corresponding vectors.
  2. layers.parquet: Stores the layer structure of the graph, indicating which nodes exist in each layer.
  3. neighbors.parquet: Stores the connections between nodes in each layer.
  4. metadata.parquet: Stores metadata about the graph, such as dimensions and configuration.

For incremental updates, additional change-log files are created in the vector_changes directory; these files track changes to vectors over time.
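The key idea behind the change-log files is that a read consults pending changes before falling back to the compacted base data. The following is a minimal, self-contained sketch of that overlay pattern using plain maps; the extension's actual IncrementalStore reads from Parquet and differs in detail.

```go
package main

import "fmt"

// changeKind mirrors the kinds of updates an incremental store tracks.
type changeKind int

const (
	changeUpsert changeKind = iota // addition or update
	changeDelete
)

type change struct {
	kind   changeKind
	vector []float32
}

// lookup consults the change log first and falls back to the base
// (compacted) data, so readers see a consistent view without
// rewriting vectors.parquet on every write.
func lookup(key int, changes map[int]change, base map[int][]float32) ([]float32, bool) {
	if c, ok := changes[key]; ok {
		if c.kind == changeDelete {
			return nil, false
		}
		return c.vector, true
	}
	v, ok := base[key]
	return v, ok
}

func main() {
	base := map[int][]float32{1: {0.1, 0.2}, 2: {0.3, 0.4}}
	changes := map[int]change{
		2: {changeDelete, nil},                // pending deletion of key 2
		3: {changeUpsert, []float32{0.5, 0.6}}, // pending addition of key 3
	}
	for _, k := range []int{1, 2, 3} {
		v, ok := lookup(k, changes, base)
		fmt.Println(k, ok, v)
	}
}
```

Compaction then amounts to folding the change log into the base files and deleting the logs, which is what the MaxChanges/MaxAge thresholds below trigger.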

Usage

Installation

import (
    "github.com/TFMV/hnsw"
    "github.com/TFMV/hnsw/hnsw-extensions/parquet"
)

Creating a Parquet Graph

// Create a configuration
config := parquet.DefaultParquetGraphConfig()
config.Storage.Directory = "/path/to/storage"
config.M = 16
config.EfSearch = 100
config.Distance = hnsw.CosineDistance

// Create a new graph
graph, err := parquet.NewParquetGraph[int](config)
if err != nil {
    // Handle error
}
defer graph.Close()

Adding Vectors

// Add a single vector
err := graph.Add(hnsw.Node[int]{
    Key:   1,
    Value: []float32{0.1, 0.2, 0.3},
})

// Add multiple vectors in a batch
nodes := []hnsw.Node[int]{
    {Key: 2, Value: []float32{0.4, 0.5, 0.6}},
    {Key: 3, Value: []float32{0.7, 0.8, 0.9}},
}
err = graph.Add(nodes...)

Searching

// Search for the 5 nearest neighbors
query := []float32{0.2, 0.3, 0.4}
results, err := graph.Search(query, 5)
if err != nil {
    // Handle error
}

// Process results
for i, result := range results {
    fmt.Printf("%d. ID: %d, Distance: %f\n", i+1, result.Key, result.Distance)
}

Deleting Vectors

// Delete a single vector
deleted := graph.Delete(1)

// Delete multiple vectors
keysToDelete := []int{2, 3, 4}
results := graph.BatchDelete(keysToDelete)

Incremental Updates

The Parquet extension supports incremental updates, allowing frequent vector additions, updates, and deletions to be handled efficiently without rewriting the entire dataset each time.

// Configure incremental updates
config := parquet.DefaultParquetGraphConfig()
config.Storage.Directory = "/path/to/storage"
config.Incremental.MaxChanges = 500 // Compact after 500 changes
config.Incremental.MaxAge = 30 * time.Minute // Compact after 30 minutes

// Create a new graph
graph, err := parquet.NewParquetGraph[int](config)
if err != nil {
    // Handle error
}
defer graph.Close()

// Add vectors - these will be stored incrementally
for i := 0; i < 1000; i++ {
    node := hnsw.Node[int]{
        Key:   i,
        Value: generateVector(128), // Your vector generation function
    }
    err = graph.Add(node)
    if err != nil {
        // Handle error
    }
}

// Delete some vectors - these will be tracked incrementally
for i := 0; i < 100; i += 10 {
    graph.Delete(i)
}

// Close the graph - this will ensure all changes are persisted
graph.Close()

Configuration Options

ParquetGraphConfig

The ParquetGraphConfig struct allows customization of various parameters:

  • M: Maximum number of connections per node (default: 16)
  • Ml: Level generation factor (default: 0.25)
  • EfSearch: Size of dynamic candidate list during search (default: 20)
  • Distance: Distance function (default: CosineDistance)
  • Storage: Storage configuration (directory, compression, etc.)
  • Incremental: Incremental update configuration
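To make the Ml parameter concrete: in standard HNSW, a node's top layer is drawn as level = floor(-ln(u) * Ml) for a uniform u in (0, 1], so a smaller Ml yields fewer, flatter layers. The exact formula this extension uses is an assumption here; the sketch below shows the textbook rule with the documented default of 0.25.

```go
package main

import (
	"fmt"
	"math"
)

// levelFor applies the standard HNSW level-generation rule to a
// uniform draw u in (0, 1]. With Ml = 0.25, the vast majority of
// nodes land on level 0 and each higher layer is ~e^4 times sparser.
func levelFor(u, ml float64) int {
	return int(math.Floor(-math.Log(u) * ml))
}

func main() {
	ml := 0.25
	fmt.Println(levelFor(1.0, ml))    // typical draw: level 0
	fmt.Println(levelFor(0.01, ml))   // rare draw: level 1
	fmt.Println(levelFor(0.0001, ml)) // very rare draw: level 2
}
```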

ParquetStorageConfig

The ParquetStorageConfig struct allows customization of storage parameters:

  • Directory: Directory where Parquet files will be stored
  • Compression: Compression codec to use (default: Snappy)
  • BatchSize: Batch size for reading/writing (default: 64MB)
  • MaxRowGroupLength: Maximum row group length (default: 64MB)
  • DataPageSize: Data page size (default: 1MB)
  • MemoryMap: Whether to memory map files when reading (default: true)
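For a rough sense of what the default sizes mean in practice, the back-of-envelope below mirrors the documented defaults as constants (a hypothetical restatement, not the extension's source) and estimates how many uncompressed 128-dimensional float32 vectors fit in one row group.

```go
package main

import "fmt"

// Documented storage defaults from the list above, expressed as
// constants for illustration only.
const (
	defaultBatchSize         = 64 << 20 // 64 MB
	defaultMaxRowGroupLength = 64 << 20 // 64 MB
	defaultDataPageSize      = 1 << 20  // 1 MB
)

func main() {
	// A 128-dim float32 vector occupies 512 bytes before encoding
	// and compression, so one default-sized row group holds roughly:
	bytesPerVector := 128 * 4
	fmt.Println(defaultMaxRowGroupLength / bytesPerVector)
}
```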

IncrementalConfig

The IncrementalConfig struct allows customization of incremental update behavior:

  • MaxChanges: Maximum number of changes before compaction (default: 1000)
  • MaxAge: Maximum age of changes before compaction (default: 1 hour)

Performance Considerations

For optimal performance, consider the following:

  1. Memory Mapping: Enable memory mapping for faster file access, especially for large datasets.
  2. Batch Operations: Use batch operations (Add with multiple nodes) whenever possible.
  3. EfSearch Parameter: Adjust the EfSearch parameter based on your needs - higher values give more accurate results but slower search times.
  4. Compaction Frequency: Adjust the MaxChanges and MaxAge parameters based on your update patterns.
  5. Cache Size: The vector cache size can be adjusted based on available memory.
  6. Parallel Processing: The implementation uses parallel processing for vector operations, which can be tuned based on your hardware.
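On point 5, the extension's vector cache is internal, but its role is easy to illustrate: a bounded LRU structure trades memory for fewer Parquet reads, and its capacity is what you would size against available RAM. A self-contained sketch of that pattern (not the extension's actual cache):

```go
package main

import (
	"container/list"
	"fmt"
)

// vectorCache is a minimal LRU cache: front of the list is the most
// recently used entry, and inserts past capacity evict from the back.
type vectorCache struct {
	cap   int
	order *list.List
	items map[int]*list.Element
}

type entry struct {
	key int
	vec []float32
}

func newVectorCache(capacity int) *vectorCache {
	return &vectorCache{cap: capacity, order: list.New(), items: map[int]*list.Element{}}
}

func (c *vectorCache) get(key int) ([]float32, bool) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el) // mark as recently used
		return el.Value.(*entry).vec, true
	}
	return nil, false // caller would fall back to a Parquet read
}

func (c *vectorCache) put(key int, vec []float32) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).vec = vec
		c.order.MoveToFront(el)
		return
	}
	if c.order.Len() >= c.cap {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	c.items[key] = c.order.PushFront(&entry{key, vec})
}

func main() {
	c := newVectorCache(2)
	c.put(1, []float32{0.1})
	c.put(2, []float32{0.2})
	c.get(1)                 // touch 1, so 2 becomes the eviction candidate
	c.put(3, []float32{0.3}) // evicts 2
	_, ok := c.get(2)
	fmt.Println(ok)
}
```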

Benchmark Results

The following benchmark results were obtained on an Apple M2 Pro processor:

BenchmarkParquetGraph_Add-10                2616           5239807 ns/op         7422303 B/op      69369 allocs/op
BenchmarkParquetGraph_BatchAdd-10            100         355033085 ns/op           10.00 nodes/op  44609646461 B/op        9545 allocs/op
BenchmarkParquetGraph_Search-10             8397            127766 ns/op          119551 B/op        359 allocs/op
BenchmarkParquetGraph_Delete-10              678           2369084 ns/op         1988021 B/op      19424 allocs/op

Interpretation

  • Add: Adding a single node takes approximately 5.24ms with moderate memory allocation.
  • BatchAdd: Adding a batch of 10 nodes takes approximately 355ms, or about 35.5ms per node. Although slower per node than individual adds in this benchmark, batching reduces the overhead of repeated disk writes and is more efficient for large-scale loads.
  • Search: Searching for nearest neighbors takes approximately 128μs per operation, which is very fast.
  • Delete: Deleting nodes takes approximately 2.37ms per operation with moderate memory allocation.

Reopening Performance

One of the key optimizations in this implementation is the improved performance when reopening a graph from disk. In our example with 1,000 vectors:

  • Initial search on a newly created graph: ~56μs
  • Search on a reopened graph: ~221ms

This is a significant improvement from previous versions where reopening could take several minutes. The optimizations that contributed to this improvement include:

  1. Automatic compaction of incremental changes during reopening
  2. Efficient caching of base vectors and change logs
  3. Parallel processing of vector retrievals
  4. Lazy loading of vectors instead of eager preloading
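Optimization 3 above can be sketched with stdlib primitives: instead of one blocking read per key, a bounded worker pool fetches vectors concurrently. The loadVector function below stands in for a Parquet read; worker count and function names are illustrative, not the extension's API.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchAll retrieves all keys using a fixed pool of workers.
func fetchAll(keys []int, workers int, loadVector func(int) []float32) map[int][]float32 {
	type kv struct {
		key int
		vec []float32
	}
	jobs := make(chan int)
	out := make(chan kv)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for k := range jobs {
				out <- kv{k, loadVector(k)} // each worker does blocking reads independently
			}
		}()
	}
	go func() { // feed keys, then signal no more work
		for _, k := range keys {
			jobs <- k
		}
		close(jobs)
	}()
	go func() { wg.Wait(); close(out) }() // close out once all workers finish

	result := make(map[int][]float32, len(keys))
	for r := range out {
		result[r.key] = r.vec
	}
	return result
}

func main() {
	load := func(k int) []float32 { return []float32{float32(k)} } // stand-in for a Parquet read
	vectors := fetchAll([]int{1, 2, 3, 4}, 2, load)
	fmt.Println(len(vectors))
}
```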

Limitations

  1. Memory Usage: For very large datasets, memory usage can be high due to caching.
  2. Write Performance: While incremental updates improve write performance, compaction operations can still be expensive.
  3. Concurrent Access: The implementation is thread-safe, but concurrent writes may be serialized.

Example

Here's a complete example of using the Parquet extension:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "os"
    "time"

    "github.com/TFMV/hnsw"
    parquet "github.com/TFMV/hnsw/hnsw-extensions/parquet"
)

func main() {
    // Create a temporary directory for the example
    tempDir, err := os.MkdirTemp("", "parquet-hnsw-example")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(tempDir)

    // Create a new Parquet-based HNSW graph with default configuration
    config := parquet.DefaultParquetGraphConfig()
    config.Storage.Directory = tempDir
    config.M = 16        // Maximum number of connections per node
    config.Ml = 0.25     // Level generation factor
    config.EfSearch = 50 // Size of dynamic candidate list during search
    config.Distance = hnsw.CosineDistance

    // Configure incremental updates
    config.Incremental.MaxChanges = 100         // Trigger compaction after 100 changes
    config.Incremental.MaxAge = 5 * time.Minute // Trigger compaction after 5 minutes

    graph, err := parquet.NewParquetGraph[int](config)
    if err != nil {
        panic(err)
    }
    defer graph.Close()

    // Add vectors
    dimensions := 128
    numVectors := 1000

    for i := 0; i < numVectors; i++ {
        err := graph.Add(hnsw.Node[int]{
            Key:   i,
            Value: generateRandomVector(dimensions),
        })
        if err != nil {
            panic(err)
        }
    }

    // Perform a search
    query := generateRandomVector(dimensions)
    results, err := graph.Search(query, 10)
    if err != nil {
        panic(err)
    }

    fmt.Println("Search results:")
    for i, result := range results {
        fmt.Printf("  %d. ID: %d, Distance: %.6f\n", i+1, result.Key, result.Distance)
    }

    // Close the graph
    if err := graph.Close(); err != nil {
        panic(err)
    }

    // Reopen the graph
    reopenedGraph, err := parquet.NewParquetGraph[int](config)
    if err != nil {
        panic(err)
    }
    defer reopenedGraph.Close()

    // Search again
    results, err = reopenedGraph.Search(query, 10)
    if err != nil {
        panic(err)
    }

    fmt.Println("Search results after reopening:")
    for i, result := range results {
        fmt.Printf("  %d. ID: %d, Distance: %.6f\n", i+1, result.Key, result.Distance)
    }
}

// generateRandomVector creates a random normalized vector
func generateRandomVector(dimensions int) []float32 {
    vector := make([]float32, dimensions)
    for i := range vector {
        vector[i] = rand.Float32()
    }

    // Normalize the vector to unit length
    var sum float32
    for _, v := range vector {
        sum += v * v
    }
    norm := float32(1.0 / math.Sqrt(float64(sum)))
    for i := range vector {
        vector[i] *= norm
    }

    return vector
}