# KasprTables - Distributed Key-Value State

## What Is a KasprTable?
A KasprTable is a Kubernetes custom resource that declares a distributed key-value store embedded within a KasprApp. Tables let your stream processors maintain persistent state — caches, counters, lookup data, or any data you need to read and write during event processing.
Key Features:
- Backed by a Kafka changelog topic for durability and fault tolerance
- Stored locally in an embedded RocksDB database for fast access
- Agents, Tasks, and WebViews can all reference tables
- Supports Normal (partitioned) and Global (fully replicated) modes
- Custom default values via Python functions
- Configurable serialization and partitioning
## How Tables Work
Each KasprTable creates a Kafka topic (called a changelog). Every write to the table produces a message to this topic. On startup or rebalance, the table replays the changelog to rebuild its local state in RocksDB.
```
          Agent writes table[key] = value
                      │
                      ▼
┌──────────┐          ┌───────────────┐
│ RocksDB  │ ◄─────── │  Changelog    │
│ (local)  │ ───────► │ Topic (Kafka) │
└──────────┘          └───────────────┘
                      │
                      ▼
          Agent reads table.get(key)
```

This gives you:
- Low-latency reads — data lives in the local RocksDB instance
- Durability — if a pod crashes, the changelog topic replays to rebuild state
- Scalability — normal tables are partitioned, so each pod owns a slice of the data
Tables are automatically managed by the Kaspr runtime. You don’t need to create Kafka topics or manage RocksDB — just declare the table and use it.
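The write-then-replay cycle above can be modeled in a few lines. This is an illustrative sketch only — the `ChangelogTable` class and its methods are not part of the Kaspr API; the list stands in for the Kafka changelog topic and the dict for RocksDB:

```python
# Minimal model of the changelog mechanism: every write is appended to a
# log (the "changelog topic"), and local state can be rebuilt from
# scratch by replaying that log after a crash.

class ChangelogTable:
    def __init__(self):
        self.changelog = []   # stand-in for the Kafka changelog topic
        self.store = {}       # stand-in for the local RocksDB instance

    def __setitem__(self, key, value):
        self.changelog.append((key, value))  # durable write first
        self.store[key] = value              # then update local state

    def get(self, key, default=None):
        return self.store.get(key, default)  # reads never touch the log

    def recover(self):
        # After a restart, replay the changelog to rebuild local state.
        self.store = {}
        for key, value in self.changelog:
            self.store[key] = value
```

Because the log records every write in order, replaying it always converges on the latest value per key — which is also why `cleanup.policy: compact` works well for changelogs.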
## Normal vs. Global Tables

### Normal Tables (Default)
A normal table distributes data across partitions. Each application instance (pod) owns a subset of the keys, determined by key hashing. This is the default and most scalable option.
Use normal tables when:
- Data is naturally partitioned (e.g., by user ID, order ID)
- Each event only needs to access data for its own partition key
- You need horizontal scalability
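Key hashing is what makes this work: the same key always maps to the same partition, so the pod that owns that partition sees every event for that key. Kafka's default partitioner actually uses murmur2 hashing; the `crc32`-based sketch below is just a deterministic illustration of the idea, not the real routing function:

```python
# Illustration only: a key deterministically maps to one partition, so
# one pod owns all state for that key. (Kafka's real default partitioner
# uses murmur2, not crc32.)
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    return zlib.crc32(key) % num_partitions
```

This is also why changing `partitions` on an existing table reshuffles key ownership — the modulo changes.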
### Global Tables
A global table replicates all data to every application instance. Each pod has a complete copy.
```yaml
spec:
  name: config-data
  global: true
```

Use global tables when:
- The dataset is small (reference/lookup data)
- Any event might need to access any key (cross-partition lookups)
- You need broadcast-style updates
Global tables consume more memory and network bandwidth since every instance holds a full copy. Use them only for small, frequently-read reference data.
## Creating a Table

### Basic Table
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: user-preferences
  labels:
    kaspr.io/app: my-app
spec:
  name: user-preferences
  description: "Stores user notification preferences"
  keySerializer: json
  valueSerializer: json
  partitions: 8
```

Apply it:

```bash
kubectl apply -f user-preferences-table.yaml
```

Check the status:

```bash
kubectl get ktable user-preferences
```

`ktable` and `ktables` are shorthand aliases for `kasprtable` / `kasprtables`.
### Table with Default Values
When reading a key that doesn’t exist, tables normally return None. You can provide a defaultSelector — a Python function that returns a class type used to initialize missing keys:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: counters
  labels:
    kaspr.io/app: my-app
spec:
  name: counters
  description: "Event counters with int default"
  defaultSelector:
    python: |
      def default_type():
          return int
```

With this configuration, reading a missing key returns `0` (the default `int()` value) instead of `None`. This is useful for counter patterns where you want to increment without checking for existence:

```python
def count_events(value, counters):
    # No need to check if key exists — defaults to int (0)
    counters[value["category"]] += 1
    return value
```

The defaultSelector function must return a class type (e.g., `int`, `dict`, `list`), not an instance. The runtime validates this with `inspect.isclass()`.
### Global Table

```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: product-catalog
  labels:
    kaspr.io/app: my-app
spec:
  name: product-catalog
  description: "Product reference data — replicated to all instances"
  global: true
  keySerializer: json
  valueSerializer: json
  partitions: 4
```

### Table with Extra Topic Configuration
You can pass Kafka topic configuration properties for the changelog topic:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: api-cache
  labels:
    kaspr.io/app: my-app
spec:
  name: api-cache
  description: "Caches API responses with 24h retention"
  keySerializer: json
  valueSerializer: json
  partitions: 8
  extraTopicConfigs:
    retention.ms: "86400000"   # 24 hours
    cleanup.policy: "compact"
```

Common extraTopicConfigs options:
| Config | Description | Example |
|---|---|---|
| `retention.ms` | How long to retain messages | `"86400000"` (24h), `"-1"` (forever) |
| `cleanup.policy` | `compact` (keep latest per key) or `delete` | `"compact"` |
| `segment.bytes` | Log segment size | `"1073741824"` (1 GB) |
| `min.compaction.lag.ms` | Minimum time before compaction | `"60000"` (1 min) |
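The millisecond values in these configs are plain arithmetic, and computing them explicitly avoids off-by-a-zero mistakes:

```python
# Kafka topic configs take milliseconds as strings; derive them rather
# than hand-typing long digit runs.
ONE_HOUR_MS = 60 * 60 * 1000

retention_24h = str(24 * ONE_HOUR_MS)   # matches "86400000" above
compaction_lag_1m = str(60 * 1000)      # matches "60000" above
```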
## Using Tables in Agents
Tables are injected into agent operations as Python function parameters. This is how you read from and write to table state during event processing.
### Step 1: Declare the Table Reference
In your agent’s operation, add a tables block that maps the table resource name to a Python parameter name:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: event-enricher
  labels:
    kaspr.io/app: my-app
spec:
  input:
    topic:
      name: raw-events
  output:
    topics:
      - name: enriched-events
  processors:
    pipeline:
      - enrich
    operations:
      - name: enrich
        tables:
          - name: user-preferences   # KasprTable resource name
            paramName: prefs         # Python parameter name
        map:
          python: |
            def enrich(value, prefs):
                user_id = value["user_id"]
                user_pref = prefs.get(user_id)
                if user_pref:
                    value["notification_channel"] = user_pref.get("channel", "email")
                return value
```

### Step 2: Reading from Tables
Use standard dict-like operations to read table data:
```python
# Get a value (returns None if key doesn't exist)
result = table.get(key)

# Get with a default
result = table.get(key, "fallback_value")
```

### Step 3: Writing to Tables
Agents can write to tables to persist state:
```python
def track_last_seen(value, tracker):
    user_id = value["user_id"]
    tracker[user_id] = {
        "last_seen": value["timestamp"],
        "event_count": tracker.get(user_id, {}).get("event_count", 0) + 1
    }
    return value
```

### Step 4: Deleting from Tables
```python
def cleanup(value, cache):
    if value.get("action") == "delete":
        key = value["id"]
        del cache[key]
    return value
```

An operation can reference multiple tables. Each table gets its own paramName:

```yaml
tables:
  - name: user-preferences
    paramName: prefs
  - name: api-cache
    paramName: cache
```

```python
def process(value, prefs, cache):
    # Access both tables
    user = prefs.get(value["user_id"])
    cached = cache.get(value["api_key"])
    ...
```

## Using Tables in WebViews
WebViews can read from tables to serve data over HTTP, but cannot write to them. Only Agents can mutate table state.
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: user-lookup
  labels:
    kaspr.io/app: my-app
spec:
  name: user-lookup
  request:
    method: GET
    path: /user/{user_id}
  response:
    processors:
      pipeline:
        - lookup
      operations:
        - name: lookup
          tables:
            - name: user-preferences
              paramName: prefs
          map:
            python: |
              def lookup(value, prefs):
                  user_id = value["user_id"]
                  return prefs.get(user_id, {"error": "not found"})
```

## Serialization
Tables support two serializer modes for keys and values:
| Serializer | Description | Use When |
|---|---|---|
| `json` (default) | JSON encoding/decoding | Data is structured (dicts, lists, strings, numbers) |
| `raw` | Raw bytes — no serialization | Data is already bytes or you handle serialization yourself |
```yaml
spec:
  keySerializer: json     # Keys are JSON-serialized (default)
  valueSerializer: json   # Values are JSON-serialized (default)
```

For most use cases, leave serializers at their defaults (`json`). Use `raw` only when you need byte-level control or are integrating with external systems that produce raw bytes.
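What the `json` mode implies can be sketched as a simple round trip: keys and values are encoded to JSON bytes before they reach Kafka and RocksDB, and decoded on read. This is a conceptual sketch, not the runtime's actual codec:

```python
# Conceptual model of the json serializer mode: structured Python
# values round-trip through UTF-8 JSON bytes.
import json

def serialize(obj) -> bytes:
    return json.dumps(obj).encode("utf-8")

def deserialize(data: bytes):
    return json.loads(data.decode("utf-8"))
```

One practical consequence: with `json` keys, `table["42"]` and `table[42]` are different keys, since they serialize to different bytes.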
## Windowed Tables
Windowed tables partition data into time-based windows, enabling time-bounded aggregations like “count events per minute” or “average value over the last hour.”
Windowed tables are defined in the CRD schema but are not yet implemented in the runtime. The configuration is accepted and validated but has no effect at this time. This section documents the planned API.
### Tumbling Windows
Fixed-size, non-overlapping time intervals. Each event belongs to exactly one window.
```yaml
spec:
  name: minute-counters
  window:
    tumbling:
      size: "1m"       # 1-minute windows
      expires: "1h"    # Retain window data for 1 hour
```

### Hopping Windows
Fixed-size windows that advance by a step interval. Windows can overlap.
```yaml
spec:
  name: sliding-averages
  window:
    hopping:
      size: "5m"       # 5-minute window size
      step: "1m"       # New window every 1 minute
      expires: "2h"    # Retain window data for 2 hours
```

### Window Time Assignment
The relativeTo field controls how events are assigned to windows:
| Value | Description |
|---|---|
| `stream` (default) | Uses the Kafka event timestamp |
| `now` | Uses the wall-clock time when the event is processed |
| `custom` | Uses a user-defined function to extract a timestamp |
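Since windowed tables are not yet implemented in the runtime, the assignment math below is a sketch of the standard stream-processing semantics, not Kaspr's confirmed behavior: a tumbling window of size `s` assigns a timestamp to exactly one window, while a hopping window of size `s` and step `p` assigns it to every window whose range `[start, start + s)` covers it:

```python
# Standard tumbling/hopping window math (illustrative sketch; windowed
# tables are not yet implemented in the Kaspr runtime).

def tumbling_window(ts_ms: int, size_ms: int) -> tuple:
    # Exactly one window: floor the timestamp to a window boundary.
    start = (ts_ms // size_ms) * size_ms
    return (start, start + size_ms)

def hopping_windows(ts_ms: int, size_ms: int, step_ms: int) -> list:
    # Every window [s, s + size) with s a multiple of step that covers ts.
    earliest = ((ts_ms - size_ms) // step_ms + 1) * step_ms
    starts = range(max(earliest, 0), ts_ms + 1, step_ms)
    return [(s, s + size_ms) for s in starts]
```

With `size: "5m"` and `step: "1m"`, each event lands in five overlapping windows, which is the cost of the smoother aggregates hopping windows provide.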
```yaml
spec:
  name: custom-windowed
  window:
    tumbling:
      size: "1h"
    relativeTo: custom
    relativeToSelector:
      python: |
        def get_timestamp(value):
            return value["event_time"]
```

## Monitoring
### Checking Table Status
```bash
# List all tables
kubectl get ktables

# Describe a specific table
kubectl describe ktable user-preferences
```

The operator sets these status fields:
| Field | Description |
|---|---|
| `app.name` | Name of the parent KasprApp |
| `app.status` | `AppFound` or `AppNotFound` |
| `configMap` | Name of the ConfigMap storing the table spec |
| `hash` | Hash of the current table configuration |
### Table Metrics
The Kaspr runtime exposes Prometheus metrics for table operations:
| Metric Label | Description |
|---|---|
| `keys_retrieved` | Number of `table.get()` calls |
| `keys_updated` | Number of `table[key] = value` writes |
| `keys_deleted` | Number of `del table[key]` operations |
## Complete Example: Event Counting Pipeline
This example shows a table, an agent that writes to it, and a web view that reads from it.
1. Define the counter table:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: event-counters
  labels:
    kaspr.io/app: counter-app
spec:
  name: event-counters
  description: "Counts events by category"
  keySerializer: json
  valueSerializer: json
  partitions: 6
  defaultSelector:
    python: |
      def default_type():
          return int
```

2. Define the counting agent:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: counter-agent
  labels:
    kaspr.io/app: counter-app
spec:
  input:
    topic:
      name: events
  processors:
    pipeline:
      - count
    operations:
      - name: count
        tables:
          - name: event-counters
            paramName: counters
        map:
          python: |
            def count(value, counters):
                category = value.get("category", "unknown")
                counters[category] += 1
                return value
```

3. Define a web view to read counts:
```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: count-viewer
  labels:
    kaspr.io/app: counter-app
spec:
  name: count-viewer
  request:
    method: GET
    path: /count/{category}
  response:
    processors:
      pipeline:
        - get-count
      operations:
        - name: get-count
          tables:
            - name: event-counters
              paramName: counters
          map:
            python: |
              def get_count(value, counters):
                  category = value["category"]
                  count = counters.get(category, 0)
                  return {"category": category, "count": count}
```

## FAQ
Q: What happens if a pod crashes? Is my table data lost?
A: No. Table data is backed by a Kafka changelog topic. When the pod restarts, the table replays the changelog to rebuild its state in RocksDB.
Q: Can multiple agents write to the same table?
A: Yes, multiple agents can reference and write to the same KasprTable. However, be mindful of concurrent writes to the same key — the last write wins.
Q: How large can a table be?
A: Tables are limited by local disk space (RocksDB) and the Kafka changelog topic retention settings. For very large datasets, use normal (partitioned) tables so the data is distributed across pods.
Q: Can I use a table without an agent?
A: Tables must be declared and linked to a KasprApp, but they don’t need to be actively written to by an agent. You can pre-populate a table via Kafka (by producing to the changelog topic directly) and use it as read-only reference data.
Q: What’s the difference between defaultSelector and using .get(key, default)?
A: defaultSelector sets a type-level default — every missing key returns a new instance of that type (e.g., int() → 0). The .get(key, default) method is a per-call fallback. Use defaultSelector for patterns like counters where you always want a specific type.
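The two defaulting mechanisms are easy to contrast in plain Python. Here `defaultdict` stands in for a table with a `defaultSelector` returning `int`, and a plain dict for a table without one — an analogy for the semantics, not the runtime's implementation:

```python
# Contrast: type-level default (defaultSelector) vs. per-call fallback (.get).
from collections import defaultdict

with_selector = defaultdict(int)   # table with defaultSelector -> int
without_selector = {}              # table with no defaultSelector

# defaultSelector: a missing key springs into existence as int() == 0,
# so it can be incremented immediately.
with_selector["missing"] += 1

# .get(key, default): the fallback applies to this call only; the key
# itself is never created.
value = without_selector.get("missing", 0)
```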