Scalable Objects Persistence
Simple, copy-pasteable examples for common SOP scenarios in Python.
This example demonstrates how to store structured data (dictionaries) in a B-Tree using the unified Database API.
import sop
from sop import btree
import json

# 1. Initialize Database
ctx = sop.Context()
db = sop.Database(sop.DatabaseOptions(stores_folders=["/tmp/sop_data"]))

# 2. Run Transaction
with db.begin_transaction(ctx) as t:
    # Open/Create Store "users".
    # BtreeOptions is optional if you just want default settings;
    # you can pass the name directly to new_btree.
    store = db.new_btree(ctx, "users", t)

    # Add Data
    for i in range(1000):
        user_id = f"user_{i}"
        profile = {
            "name": "John Doe",
            "email": f"john{i}@example.com",
            "active": True,
        }
        # SOP stores values as strings (JSON recommended for complex objects).
        # Or use native types if supported by your wrapper version.
        store.add(ctx, sop.btree.Item(key=user_id, value=json.dumps(profile)))

# Commit happens automatically on exit of the 'with' block.
print("Added 1000 users.")
Atomically update a “Bank Account” and a “Transaction Log” in the same transaction. If any part fails, the entire operation rolls back.
import sop
from sop import btree
import json
import uuid


def transfer_funds(ctx, db, from_id, to_id, amount):
    """Atomically move `amount` from one account to another and log it.

    All three steps (deduct, credit, log) run inside a single transaction:
    if any part fails, the entire operation rolls back on exit of the
    `with` block.
    """
    with db.begin_transaction(ctx) as t:
        # Open both stores in the SAME transaction.
        # BtreeOptions is optional; the name can be passed directly.
        accounts = db.new_btree(ctx, "accounts", t)
        logs = db.new_btree(ctx, "logs", t)

        # 1. Deduct from Sender
        if accounts.find(ctx, from_id):
            sender_item = accounts.get_current_item(ctx)
            sender = json.loads(sender_item.value)
            sender["balance"] -= amount
            accounts.update_current_value(ctx, json.dumps(sender))

        # 2. Add to Receiver
        if accounts.find(ctx, to_id):
            receiver_item = accounts.get_current_item(ctx)
            receiver = json.loads(receiver_item.value)
            receiver["balance"] += amount
            accounts.update_current_value(ctx, json.dumps(receiver))

        # 3. Log the Transfer
        log_entry = {
            "action": "transfer",
            "from": from_id,
            "to": to_id,
            "amount": amount,
        }
        logs.add(ctx, sop.btree.Item(key=str(uuid.uuid4()), value=json.dumps(log_entry)))
        # Commit happens automatically on exit of 'with' block.

    print("Transfer complete.")


# Usage
ctx = sop.Context()
db = sop.Database(sop.DatabaseOptions(stores_folders=["/tmp/sop_data"]))
transfer_funds(ctx, db, "user_1", "user_2", 100)
SOP supports complex keys (structs/dataclasses) and allows you to define how fields are indexed and sorted. This enables efficient multi-column sorting and prefix scanning.
from dataclasses import dataclass

from sop import Context, BtreeOptions, Item, ValueDataSize, PagingInfo
from sop.btree import IndexSpecification, IndexFieldSpecification
from sop.ai import Database, DatabaseType
from sop.database import DatabaseOptions


# 1. Define Key Structure (Region -> Dept -> ID)
@dataclass
class UserKey:
    region: str
    department: str
    employee_id: int


# 2. Define Value Structure
@dataclass
class UserProfile:
    name: str
    role: str


ctx = Context()
db = Database(DatabaseOptions(stores_folders=["/tmp/sop_complex_db"], type=DatabaseType.Standalone))

with db.begin_transaction(ctx) as t:
    # Configure B-Tree for Complex Keys
    bo = BtreeOptions("employees", is_unique=True)
    bo.is_primitive_key = False
    bo.set_value_data_size(ValueDataSize.Small)

    # Define Index Hierarchy: Region -> Department -> EmployeeID
    idx_spec = IndexSpecification(
        index_fields=(
            IndexFieldSpecification("region", ascending_sort_order=True),
            IndexFieldSpecification("department", ascending_sort_order=True),
            IndexFieldSpecification("employee_id", ascending_sort_order=True),
        )
    )
    store = db.new_btree(ctx, "employees", t, options=bo, index_spec=idx_spec)

    # Add Data
    k1 = UserKey(region="US", department="Eng", employee_id=101)
    store.add(ctx, Item(key=k1, value=UserProfile(name="Alice", role="Dev")))
    k2 = UserKey(region="US", department="Sales", employee_id=202)
    store.add(ctx, Item(key=k2, value=UserProfile(name="Bob", role="Manager")))
    print("Added users.")

# Querying
with db.begin_transaction(ctx) as t:
    store = db.open_btree(ctx, "employees", t)

    # Range Scan: Find all "US" employees.
    # Since 'region' is the first index field, we can scan efficiently.
    if store.first(ctx):
        items = store.get_items(ctx, PagingInfo(page_size=100))
        for item in items:
            if item.key["region"] == "US":
                print(f"Found: {item.value['name']} in {item.key['department']}")
Store metadata (like is_deleted flags or timestamps) directly in the Key. This allows you to filter records efficiently using get_keys without fetching the heavy Value payload from disk.
from dataclasses import dataclass
import time

from sop import Context, BtreeOptions, Item, ValueDataSize, PagingInfo
from sop.btree import IndexSpecification, IndexFieldSpecification
from sop.ai import Database, DatabaseType
from sop.database import DatabaseOptions


# 1. Define Key with "Ride-On" Metadata.
# Only 'doc_id' is indexed; 'is_deleted' and 'timestamp' ride along.
@dataclass
class DocKey:
    doc_id: int
    is_deleted: bool = False
    timestamp: int = 0


ctx = Context()
db = Database(DatabaseOptions(stores_folders=["/tmp/sop_meta_db"], type=DatabaseType.Standalone))

with db.begin_transaction(ctx) as t:
    bo = BtreeOptions("documents", is_unique=True)
    bo.is_primitive_key = False
    # Use Big values (implies we want to avoid fetching them unless necessary).
    bo.set_value_data_size(ValueDataSize.Big)

    # Index ONLY on doc_id
    idx_spec = IndexSpecification(
        index_fields=(
            IndexFieldSpecification("doc_id", ascending_sort_order=True),
        )
    )
    store = db.new_btree(ctx, "documents", t, options=bo, index_spec=idx_spec)

    # Add Data
    k1 = DocKey(doc_id=100, is_deleted=False, timestamp=int(time.time()))
    store.add(ctx, Item(key=k1, value="Large Content A..."))
    k2 = DocKey(doc_id=101, is_deleted=True, timestamp=int(time.time()))
    store.add(ctx, Item(key=k2, value="Large Content B..."))
## 5. Simplified Lookup (Dictionary Keys)
While the Producer application (which creates the store) should use strict Dataclasses and IndexSpecifications to ensure data integrity, Consumer applications can be loosely coupled. You can search for items using plain Python dictionaries as long as the keys match the structure defined in the B-Tree.
```python
import sop
from sop import Context, Item
from sop.ai import Database, DatabaseType
from sop.database import DatabaseOptions
# Initialize Context
ctx = Context()
db = Database(DatabaseOptions(stores_folders=["/tmp/sop_complex_db"], type=DatabaseType.Standalone))
with db.begin_transaction(ctx) as t:
# Open existing B-Tree
# Note: No IndexSpecification needed here; it's loaded from the store's metadata.
store = db.open_btree(ctx, "employees", t)
# Search using a plain dictionary
# The dictionary keys must match the field names defined in the IndexSpecification.
search_key = {
"region": "US",
"department": "Sales",
"employee_id": 202
}
if store.find(ctx, search_key):
# Fetch the value
# We pass the same dict as the key to get_values
items = store.get_values(ctx, Item(key=search_key))
if items:
# The value is returned as a dict (since we didn't provide a dataclass)
user_profile = items[0].value
print(f"Found: {user_profile['name']}")
with db.begin_transaction(ctx) as t: store = db.open_btree(ctx, “documents”, t) if store.first(ctx): # get_keys fetches ONLY the keys. Values remain on disk. keys = store.get_keys(ctx, PagingInfo(page_size=100))
for item in keys:
status = "[DELETED]" if item.key['is_deleted'] else "[ACTIVE]"
print(f"Doc {item.key['doc_id']}: {status}") ```
Store and search vector embeddings using the unified Database.
import sop
from sop.ai import Database as AIDatabase, Item

ctx = sop.Context()
db = AIDatabase(sop.DatabaseOptions(stores_folders=["/tmp/sop_vectors"]))

with db.begin_transaction(ctx) as tx:
    store = db.open_vector_store(ctx, tx, "products")

    # Upsert: insert-or-replace the vector and its payload under this id.
    store.upsert(ctx, Item(
        id="prod_101",
        vector=[0.1, 0.5, 0.9],
        payload={"name": "Laptop", "price": 999},
    ))

    # Search: top-k nearest neighbors, optionally filtered on payload fields.
    hits = store.query(
        ctx,
        vector=[0.1, 0.5, 0.8],
        k=5,
        filter={"name": "Laptop"},
    )
    for hit in hits:
        print(f"Match: {hit.id}, Score: {hit.score}")
Version and manage machine learning models.
import sop
from sop.ai import Database as AIDatabase, Model

ctx = sop.Context()
# NOTE(review): this constructor form (ctx + storage_path) differs from the
# DatabaseOptions form used in the other examples — confirm against the
# wrapper version you are running.
db = AIDatabase(ctx, storage_path="/tmp/sop_data")

with db.begin_transaction(ctx) as tx:
    model_store = db.open_model_store(ctx, tx, "my_models")

    # Save Model
    model = Model(
        id="churn_v1",
        algorithm="random_forest",
        parameters=[0.5, 0.1, 0.9],
        metrics={"accuracy": 0.98},
    )
    model_store.save(ctx, "classifiers", "churn_v1", model)

    # Load Model
    loaded = model_store.get(ctx, "classifiers", "churn_v1")
    print(f"Loaded: {loaded['model']['id']}")
Connect to different tenants using isolated Redis databases and Cassandra Keyspaces.
import sop
from sop.database import Database, DatabaseOptions, DatabaseType, RedisCacheConfig

ctx = sop.Context()

# 1. Connect to Tenant A
# Uses Redis DB 0 and Cassandra Keyspace 'tenant_a'
db_a = Database(DatabaseOptions(
    keyspace="tenant_a",
    type=DatabaseType.Clustered,
    redis_config=RedisCacheConfig(url="redis://localhost:6379/0"),
))

# 2. Connect to Tenant B
# Uses Redis DB 1 and Cassandra Keyspace 'tenant_b'
db_b = Database(DatabaseOptions(
    keyspace="tenant_b",
    type=DatabaseType.Clustered,
    redis_config=RedisCacheConfig(url="redis://localhost:6379/1"),
))

# Operations on db_a are completely isolated from db_b.
with db_a.begin_transaction(ctx) as t:
    store = db_a.new_btree(ctx, "config", t)
    store.add(ctx, sop.btree.Item(key="theme", value="dark"))
    # Commit happens automatically.

# Note: You do NOT need to call Redis.initialize() globally when using
# per-database config.
Index and search text documents transactionally.
import sop
from sop import database

ctx = sop.Context()
# NOTE(review): this constructor form (ctx + storage_path) differs from the
# DatabaseOptions form used in the other examples — confirm against the
# wrapper version you are running.
db = database.Database(ctx, storage_path="/tmp/sop_data")

# 1. Index Documents
with db.begin_transaction(ctx) as t:
    idx = db.open_search(ctx, "articles", t)
    idx.add("doc1", "The quick brown fox")
    idx.add("doc2", "jumps over the lazy dog")

# 2. Search
with db.begin_transaction(ctx) as t:
    idx = db.open_search(ctx, "articles", t)
    results = idx.search("fox")
    for r in results:
        # NOTE(review): result attributes use Go-style casing (DocID, Score) —
        # verify against your wrapper version.
        print(f"Doc: {r.DocID}, Score: {r.Score}")
You can remove a B-Tree store from the database. This action is permanent and deletes all data associated with the store.
import sop
from sop import btree

# 1. Initialize Database
ctx = sop.Context()
db = sop.Database(sop.DatabaseOptions(stores_folders=["/tmp/sop_data"]))

# 2. Create a store to delete
with db.begin_transaction(ctx) as t:
    bo = btree.BtreeOptions("temp_store")
    store = db.new_btree(ctx, "temp_store", t, bo)
    store.add(ctx, sop.btree.Item(key="foo", value="bar"))
    print("Created 'temp_store' and added data.")

# 3. Remove the store.
# Note: remove_btree is a database-level operation and does not require
# an active transaction. This action is permanent.
try:
    db.remove_btree(ctx, "temp_store")
    print("Successfully removed 'temp_store'.")
except Exception as e:
    print(f"Failed to remove store: {e}")

# 4. Verify removal
with db.begin_transaction(ctx) as t:
    try:
        store = db.open_btree(ctx, "temp_store", t)
        print("Store still exists (unexpected).")
    except Exception:
        print("Store not found (expected).")
SOP supports “Swarm Computing” where multiple threads or processes can modify the same B-Tree concurrently without external locks. SOP handles conflict detection and merging automatically.
Important: You must pre-seed the B-Tree with at least one item in a separate transaction before launching concurrent workers. This establishes the root node and prevents race conditions during initialization.
Note: This requirement is simply to have at least one item in the tree. It can be a real application item or a dummy seed item.
import threading
import time
import random

from sop import Context, Database, DatabaseOptions, DatabaseType, Item


def worker(thread_id, db, ctx, items_per_thread):
    """Insert `items_per_thread` items in one transaction, retrying on conflict.

    Each worker uses its own key range (thread_id * items_per_thread + j),
    so commits never collide on keys and SOP can merge changes automatically.
    Retries up to 10 times with randomized backoff on commit failure.
    """
    retry_count = 0
    committed = False
    while not committed and retry_count < 10:
        try:
            # Each thread starts its own transaction — no external locks needed!
            with db.begin_transaction(ctx) as t:
                btree = db.open_btree(ctx, "concurrent_tree", t)
                for j in range(items_per_thread):
                    # Unique keys per thread -> no conflicts; SOP merges changes.
                    key = (thread_id * items_per_thread) + j
                    btree.add(ctx, Item(key=key, value=f"Thread {thread_id} - Item {j}"))
                # Commit happens here, on exit of the 'with' block.
            committed = True
            print(f"Thread {thread_id} committed.")
        except Exception:
            retry_count += 1
            time.sleep(random.random() * 0.5)  # Randomized backoff before retry.


# Usage
ctx = Context()

# Option A: Standalone (local disk or shared network drive, in-memory cache)
# — good for single-node concurrency.
db = Database(DatabaseOptions(stores_folders=["/tmp/sop_swarm"], type=DatabaseType.Standalone))

# Option B: Clustered (Redis cache) — required for multi-process/distributed swarm.
# db = Database(DatabaseOptions(stores_folders=["/tmp/sop_swarm"], type=DatabaseType.Clustered))

# 1. Pre-seed (required for swarm): establishes the root node before
# concurrent workers start, preventing initialization races.
with db.begin_transaction(ctx) as t:
    btree = db.new_btree(ctx, "concurrent_tree", t)
    btree.add(ctx, Item(key=-1, value="Root Seed"))

# 2. Launch Threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i, db, ctx, 100))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
SOP uses Relations Metadata to define connections between stores. This metadata is sufficient for the Join Tool and AI Agents to navigate most relationships (One-to-One, One-to-Many, Many-to-One).
You should always register relations in BtreeOptions when a store contains foreign keys.
from sop.btree import BtreeOptions, Relation

# Register: 'orders.user_id' -> 'users.id'
# Declaring the relation in BtreeOptions stores it as metadata, which the
# Join Tool and AI Agents use to resolve lookups via the indexed fields.
opts = BtreeOptions(
    relations=[
        Relation(
            source_fields=["user_id"],
            target_store="users",
            target_fields=["id"],
        )
    ]
)
order_store = db.new_btree(ctx, "orders", t, options=opts)
Benefit: The metadata allows the system to resolve lookups automatically using the indexed fields on the records.
For Many-to-Many relationships (e.g., Students <-> Classes), you need a dedicated Link Store because neither entity holds a unique pointer to the other.
Each link-store entry uses a composite key that concatenates both IDs — `StudentID:ClassID` — so the pairing itself is the record.
# Create the Link Store for M:N mapping.
with db.begin_transaction(ctx) as t:
    link_store = db.new_btree(ctx, "student_classes", t)
    # Add a link (Student 123 -> Class ABC); the key encodes both IDs,
    # so the value can stay empty.
    link_store.add(ctx, sop.Item(key="s123:cABC", value=""))