Scalable Objects Persistence
This cookbook provides practical examples and patterns for using the SOP AI library.
The Vector Store is the core component for RAG and semantic search.
Initialize a local, file-backed vector database.
package main
import (
"fmt"
"context"
"github.com/sharedcode/sop/ai"
"github.com/sharedcode/sop/ai/database"
)
func main() {
// 1. Create the Database Manager
// Standalone mode = Local Filesystem + In-Memory Cache
db := database.NewDatabase(sop.DatabaseOptions{
Type: sop.Standalone,
StoresFolders: []string{"./my_vector_db"},
})
// 2. Configure Storage Path
// db.SetStoragePath("./my_vector_db") // Done in NewDatabase
// 3. Open a Domain (Index)
// This creates/opens a specific "table" or "collection" named "documents"
ctx := context.Background()
trans, _ := db.BeginTransaction(ctx, sop.ForWriting)
idx, _ := db.OpenVectorStore(ctx, "documents", trans, vector.Config{})
fmt.Println("Vector Store opened successfully!")
// Don't forget to commit if you made changes (though Open just initializes)
trans.Commit(ctx)
}
Initialize a clustered vector database using Redis for caching.
func main() {
// 1. Create the Database Manager
// Clustered mode = Local Filesystem (Shared) + Redis Cache
// Note: Redis is NOT used for data storage, just for coordination & to offer built-in caching.
// Ensure Redis is running on localhost:6379
db := database.NewDatabase(sop.DatabaseOptions{
Type: sop.Clustered,
StoresFolders: []string{"./my_cluster_db"},
})
// 2. Configure Storage Path
// db.SetStoragePath("./my_cluster_db") // Done in NewDatabase
// 3. Open a Domain (Index)
ctx := context.Background()
trans, _ := db.BeginTransaction(ctx, sop.ForWriting)
idx, _ := db.OpenVectorStore(ctx, "documents", trans, vector.Config{})
fmt.Println("Clustered Vector Store opened successfully!")
trans.Commit(ctx)
}
Add items (vectors + metadata) to the store.
func ingestData(ctx context.Context, idx ai.VectorStore[map[string]any]) {
// Define an item
item := ai.Item[map[string]any]{
ID: "doc-101",
Vector: []float32{0.1, 0.2, 0.3, 0.4}, // Normalized vector
Payload: map[string]any{
"title": "SOP Guide",
"tags": []string{"tech", "database"},
},
}
// Upsert (Insert or Update)
if err := idx.Upsert(ctx, item); err != nil {
panic(err)
}
// Batch Upsert (More efficient for bulk load)
items := []ai.Item[map[string]any]{
{ID: "doc-102", Vector: []float32{0.5, 0.5, 0.5, 0.5}, Payload: map[string]any{"title": "AI Agents"}},
{ID: "doc-103", Vector: []float32{0.9, 0.1, 0.1, 0.9}, Payload: map[string]any{"title": "Vector Search"}},
}
if err := idx.UpsertBatch(ctx, items); err != nil {
panic(err)
}
}
### Performance Tuning: Deduplication
By default, the Vector Store checks if an item ID already exists before inserting (to prevent duplicates). If you know your data is unique (e.g., during initial bulk load), you can disable this check to improve ingestion speed.
```go
func fastIngest(ctx context.Context, idx ai.VectorStore[map[string]any]) {
// Disable deduplication check for raw speed
// WARNING: Only do this if you are certain IDs are unique!
if store, ok := idx.(interface{ SetDeduplication(bool) }); ok {
store.SetDeduplication(false)
}
// ... perform bulk upsert ...
}
Rebalance the vector clusters for optimal search performance. This is especially important after bulk ingestion or significant updates.
Note: The Optimize process puts the store into a Read-Only mode. Any Upsert or Delete calls during this time will fail.
func optimizeIndex(ctx context.Context, idx ai.VectorStore[map[string]any]) {
// Triggers a rebalancing of the centroids.
// This runs in batches and can handle millions of records.
if err := idx.Optimize(ctx); err != nil {
panic(err)
}
fmt.Println("Index optimized successfully.")
}
func handleReadOnlyError(err error) {
// If you try to write during optimization, you'll get an error.
if err != nil && err.Error() == "Vector Store is currently optimizing (Read-Only mode)" {
fmt.Println("Store is busy optimizing. Please try again later.")
}
}
### Searching (Query)
Find the nearest neighbors to a query vector.
```go
func search(idx ai.VectorStore[map[string]any]) {
queryVec := []float32{0.1, 0.2, 0.3, 0.4}
k := 5 // Number of results
// Perform Search
hits, err := idx.Query(context.Background(), queryVec, k, nil)
if err != nil {
panic(err)
}
for _, hit := range hits {
fmt.Printf("ID: %s, Score: %.4f, Title: %v\n",
hit.ID, hit.Score, hit.Payload["title"])
}
}
Apply metadata filters to search results.
func searchWithFilter(idx ai.VectorStore[map[string]any]) {
queryVec := []float32{0.1, 0.2, 0.3, 0.4}
// Define Filter Function
filter := func(payload map[string]any) bool {
// Only return items with "tech" tag
tags, ok := payload["tags"].([]string)
if !ok {
return false
}
for _, t := range tags {
if t == "tech" {
return true
}
}
return false
}
hits, _ := idx.Query(context.Background(), queryVec, 5, filter)
// ... process hits
}
Remove items by ID.
func deleteItem(idx ai.VectorStore[map[string]any]) {
if err := idx.Delete(context.Background(), "doc-101"); err != nil {
panic(err)
}
}
The Model Store allows you to persist AI model artifacts (weights, configs) alongside your data.
Save a struct or any serializable object as a named model.
import (
"context"
"github.com/sharedcode/sop/ai"
"github.com/sharedcode/sop/ai/database"
)
type MyPerceptron struct {
Weights []float64
Bias float64
}
func saveModel() {
// Initialize Database
db := database.NewDatabase(sop.DatabaseOptions{
Type: sop.Standalone,
StoresFolders: []string{"./my_models"},
})
ctx := context.Background()
trans, _ := db.BeginTransaction(ctx, sop.ForWriting)
store, _ := db.OpenModelStore(ctx, "default", trans)
model := MyPerceptron{
Weights: []float64{0.5, -0.2, 1.0},
Bias: 0.1,
}
// Save
if err := store.Save(ctx, "classifiers", "perceptron_v1", model); err != nil {
panic(err)
}
trans.Commit(ctx)
}
Retrieve a model by name.
func loadModel() {
db := database.NewDatabase(sop.DatabaseOptions{
Type: sop.Standalone,
StoresFolders: []string{"./my_models"},
})
ctx := context.Background()
trans, _ := db.BeginTransaction(ctx, sop.ForReading)
store, _ := db.OpenModelStore(ctx, "default", trans)
var loadedModel MyPerceptron
if err := store.Load(ctx, "classifiers", "perceptron_v1", &loadedModel); err != nil {
panic(err)
}
trans.Commit(ctx)
fmt.Printf("Loaded Bias: %f\n", loadedModel.Bias)
}
See what’s in the store.
func listModels() {
db := database.NewDatabase(sop.DatabaseOptions{
Type: sop.Standalone,
StoresFolders: []string{"./my_models"},
})
ctx := context.Background()
trans, _ := db.BeginTransaction(ctx, sop.ForReading)
store, _ := db.OpenModelStore(ctx, "default", trans)
names, _ := store.List(ctx, "classifiers")
trans.Commit(ctx)
for _, name := range names {
fmt.Println("Found model:", name)
}
}
Use BTreeModelStore to update models and vectors atomically. This ensures that your model weights always match the vector index version.
import (
"github.com/sharedcode/sop"
"github.com/sharedcode/sop/ai"
"github.com/sharedcode/sop/database"
"github.com/sharedcode/sop/ai/vector"
)
func atomicUpdate() {
// 1. Start a Transaction
// (Assuming we have a transaction factory or manager set up)
trans, _ := db.BeginTransaction(ctx, sop.ForWriting)
// 2. Open Transactional Stores
// Vector Store (bound to transaction)
vecStore, _ := db.OpenVectorStore(ctx, "documents", trans, vector.Config{})
// Model Store (bound to transaction)
modelStore, _ := db.OpenModelStore(ctx, "default", trans)
// 3. Perform Updates
// Update Vector
vecStore.Upsert(ctx, ai.Item[any]{ID: "doc-1", Vector: newVec, Payload: "updated"})
// Update Model
newWeights := trainOn(newVec)
modelStore.Save(ctx, "classifiers", "embedding_model", newWeights)
// 4. Commit (All or Nothing)
// If this fails, neither the vector nor the model is updated.
trans.Commit(ctx)
}
### Transaction Hooks (OnCommit)
You can register callbacks to be executed only after a successful commit. This is useful for cleaning up temporary resources or triggering side effects (like notifications) that should only happen if the data is safely persisted.
```go
func transactionWithHook(ctx context.Context, trans sop.Transaction) {
trans.Begin(ctx)
// Perform updates...
// ...
// Register a hook
trans.OnCommit(func(ctx context.Context) error {
fmt.Println("Transaction committed successfully! Sending notification...")
// sendNotification()
return nil
})
// If Commit fails, the hook is NOT executed.
if err := trans.Commit(ctx); err != nil {
fmt.Println("Commit failed, hook skipped.")
}
}
```