DocumentationUser GuideTables - Distributed Key-Value State

KasprTables - Distributed Key-Value State

What Is a KasprTable?

A KasprTable is a Kubernetes custom resource that declares a distributed key-value store embedded within a KasprApp. Tables let your stream processors maintain persistent state — caches, counters, lookup data, or any data you need to read and write during event processing.

Key Features:

  • Backed by a Kafka changelog topic for durability and fault tolerance
  • Stored locally in an embedded RocksDB database for fast access
  • Agents, Tasks, and WebViews can all reference tables
  • Supports Normal (partitioned) and Global (fully replicated) modes
  • Custom default values via Python functions
  • Configurable serialization and partitioning

How Tables Work

Each KasprTable creates a Kafka topic (called a changelog). Every write to the table produces a message to this topic. On startup or rebalance, the table replays the changelog to rebuild its local state in RocksDB.

Agent writes table[key] = value


   ┌──────────┐         ┌───────────────┐
   │ RocksDB  │ ◄────── │ Changelog     │
   │ (local)  │ ──────► │ Topic (Kafka) │
   └──────────┘         └───────────────┘


Agent reads table.get(key)

This gives you:

  • Low-latency reads — data lives in the local RocksDB instance
  • Durability — if a pod crashes, the changelog topic replays to rebuild state
  • Scalability — normal tables are partitioned, so each pod owns a slice of the data

Tables are automatically managed by the Kaspr runtime. You don’t need to create Kafka topics or manage RocksDB — just declare the table and use it.


Normal vs. Global Tables

Normal Tables (Default)

A normal table distributes data across partitions. Each application instance (pod) owns a subset of the keys, determined by key hashing. This is the default and most scalable option.

Use normal tables when:

  • Data is naturally partitioned (e.g., by user ID, order ID)
  • Each event only needs to access data for its own partition key
  • You need horizontal scalability

Global Tables

A global table replicates all data to every application instance. Each pod has a complete copy.

spec:
  name: config-data
  global: true

Use global tables when:

  • The dataset is small (reference/lookup data)
  • Any event might need to access any key (cross-partition lookups)
  • You need broadcast-style updates
⚠️

Global tables consume more memory and network bandwidth since every instance holds a full copy. Use them only for small, frequently-read reference data.


Creating a Table

Basic Table

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: user-preferences
  labels:
    kaspr.io/app: my-app
spec:
  name: user-preferences
  description: "Stores user notification preferences"
  keySerializer: json
  valueSerializer: json
  partitions: 8

Apply it:

kubectl apply -f user-preferences-table.yaml

Check the status:

kubectl get ktable user-preferences

ktable and ktables are shorthand aliases for kasprtable / kasprtables.

Table with Default Values

When reading a key that doesn’t exist, tables normally return None. You can provide a defaultSelector — a Python function that returns a class type used to initialize missing keys:

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: counters
  labels:
    kaspr.io/app: my-app
spec:
  name: counters
  description: "Event counters with int default"
  defaultSelector:
    python: |
      def default_type():
          return int

With this configuration, reading a missing key returns 0 (the default int() value) instead of None. This is useful for counter patterns where you want to increment without checking for existence:

def count_events(value, counters):
    # No need to check if key exists — defaults to int (0)
    counters[value["category"]] += 1
    return value
⚠️

The defaultSelector function must return a class type (e.g., int, dict, list), not an instance. The runtime validates this with inspect.isclass().

Global Table

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: product-catalog
  labels:
    kaspr.io/app: my-app
spec:
  name: product-catalog
  description: "Product reference data — replicated to all instances"
  global: true
  keySerializer: json
  valueSerializer: json
  partitions: 4

Table with Extra Topic Configuration

You can pass Kafka topic configuration properties for the changelog topic:

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: api-cache
  labels:
    kaspr.io/app: my-app
spec:
  name: api-cache
  description: "Caches API responses with 24h retention"
  keySerializer: json
  valueSerializer: json
  partitions: 8
  extraTopicConfigs:
    retention.ms: "86400000"    # 24 hours
    cleanup.policy: "compact"

Common extraTopicConfigs options:

ConfigDescriptionExample
retention.msHow long to retain messages"86400000" (24h), "-1" (forever)
cleanup.policycompact (keep latest per key) or delete"compact"
segment.bytesLog segment size"1073741824" (1GB)
min.compaction.lag.msMinimum time before compaction"60000" (1min)

Using Tables in Agents

Tables are injected into agent operations as Python function parameters. This is how you read from and write to table state during event processing.

Step 1: Declare the Table Reference

In your agent’s operation, add a tables block that maps the table resource name to a Python parameter name:

apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: event-enricher
  labels:
    kaspr.io/app: my-app
spec:
  input:
    topic:
      name: raw-events
  output:
    topics:
      - name: enriched-events
  processors:
    pipeline:
      - enrich
    operations:
      - name: enrich
        tables:
          - name: user-preferences    # KasprTable resource name
            paramName: prefs           # Python parameter name
        map:
          python: |
            def enrich(value, prefs):
                user_id = value["user_id"]
                user_pref = prefs.get(user_id)
                if user_pref:
                    value["notification_channel"] = user_pref.get("channel", "email")
                return value

Step 2: Reading from Tables

Use standard dict-like operations to read table data:

# Get a value (returns None if key doesn't exist)
result = table.get(key)
 
# Get with a default
result = table.get(key, "fallback_value")

Step 3: Writing to Tables

Agents can write to tables to persist state:

def track_last_seen(value, tracker):
    user_id = value["user_id"]
    tracker[user_id] = {
        "last_seen": value["timestamp"],
        "event_count": tracker.get(user_id, {}).get("event_count", 0) + 1
    }
    return value

Step 4: Deleting from Tables

def cleanup(value, cache):
    if value.get("action") == "delete":
        key = value["id"]
        del cache[key]
    return value

An operation can reference multiple tables. Each table gets its own paramName:

tables:
  - name: user-preferences
    paramName: prefs
  - name: api-cache
    paramName: cache
def process(value, prefs, cache):
    # Access both tables
    user = prefs.get(value["user_id"])
    cached = cache.get(value["api_key"])
    ...

Using Tables in WebViews

WebViews can read from tables to serve data over HTTP, but cannot write to them. Only Agents can mutate table state.

apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: user-lookup
  labels:
    kaspr.io/app: my-app
spec:
  name: user-lookup
  request:
    method: GET
    path: /user/{user_id}
  response:
    processors:
      pipeline:
        - lookup
      operations:
        - name: lookup
          tables:
            - name: user-preferences
              paramName: prefs
          map:
            python: |
              def lookup(value, prefs):
                  user_id = value["user_id"]
                  return prefs.get(user_id, {"error": "not found"})

Serialization

Tables support two serializer modes for keys and values:

SerializerDescriptionUse When
json (default)JSON encoding/decodingData is structured (dicts, lists, strings, numbers)
rawRaw bytes — no serializationData is already bytes or you handle serialization yourself
spec:
  keySerializer: json      # Keys are JSON-serialized (default)
  valueSerializer: json    # Values are JSON-serialized (default)

For most use cases, leave serializers at their defaults (json). Use raw only when you need byte-level control or are integrating with external systems that produce raw bytes.


Windowed Tables

Windowed tables partition data into time-based windows, enabling time-bounded aggregations like “count events per minute” or “average value over the last hour.”

⚠️

Windowed tables are defined in the CRD schema but are not yet implemented in the runtime. The configuration is accepted and validated but has no effect at this time. This section documents the planned API.

Tumbling Windows

Fixed-size, non-overlapping time intervals. Each event belongs to exactly one window.

spec:
  name: minute-counters
  window:
    tumbling:
      size: "1m"         # 1-minute windows
      expires: "1h"      # Retain window data for 1 hour

Hopping Windows

Fixed-size windows that advance by a step interval. Windows can overlap.

spec:
  name: sliding-averages
  window:
    hopping:
      size: "5m"         # 5-minute window size
      step: "1m"         # New window every 1 minute
      expires: "2h"      # Retain window data for 2 hours

Window Time Assignment

The relativeTo field controls how events are assigned to windows:

ValueDescription
stream (default)Uses the Kafka event timestamp
nowUses the wall-clock time when the event is processed
customUses a user-defined function to extract a timestamp
spec:
  name: custom-windowed
  window:
    tumbling:
      size: "1h"
    relativeTo: custom
    relativeToSelector:
      python: |
        def get_timestamp(value):
            return value["event_time"]

Monitoring

Checking Table Status

# List all tables
kubectl get ktables
 
# Describe a specific table
kubectl describe ktable user-preferences

The operator sets these status fields:

FieldDescription
app.nameName of the parent KasprApp
app.statusAppFound or AppNotFound
configMapName of the ConfigMap storing the table spec
hashHash of the current table configuration

Table Metrics

The Kaspr runtime exposes Prometheus metrics for table operations:

Metric LabelDescription
keys_retrievedNumber of table.get() calls
keys_updatedNumber of table[key] = value writes
keys_deletedNumber of del table[key] operations

Complete Example: Event Counting Pipeline

This example shows a table, an agent that writes to it, and a web view that reads from it.

1. Define the counter table:

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: event-counters
  labels:
    kaspr.io/app: counter-app
spec:
  name: event-counters
  description: "Counts events by category"
  keySerializer: json
  valueSerializer: json
  partitions: 6
  defaultSelector:
    python: |
      def default_type():
          return int

2. Define the counting agent:

apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: counter-agent
  labels:
    kaspr.io/app: counter-app
spec:
  input:
    topic:
      name: events
  processors:
    pipeline:
      - count
    operations:
      - name: count
        tables:
          - name: event-counters
            paramName: counters
        map:
          python: |
            def count(value, counters):
                category = value.get("category", "unknown")
                counters[category] += 1
                return value

3. Define a web view to read counts:

apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: count-viewer
  labels:
    kaspr.io/app: counter-app
spec:
  name: count-viewer
  request:
    method: GET
    path: /count/{category}
  response:
    processors:
      pipeline:
        - get-count
      operations:
        - name: get-count
          tables:
            - name: event-counters
              paramName: counters
          map:
            python: |
              def get_count(value, counters):
                  category = value["category"]
                  count = counters.get(category, 0)
                  return {"category": category, "count": count}

FAQ

Q: What happens if a pod crashes? Is my table data lost? A: No. Table data is backed by a Kafka changelog topic. When the pod restarts, the table replays the changelog to rebuild its state in RocksDB.

Q: Can multiple agents write to the same table? A: Yes, multiple agents can reference and write to the same KasprTable. However, be mindful of concurrent writes to the same key — the last write wins.

Q: How large can a table be? A: Tables are limited by local disk space (RocksDB) and the Kafka changelog topic retention settings. For very large datasets, use normal (partitioned) tables so the data is distributed across pods.

Q: Can I use a table without an agent? A: Tables must be declared and linked to a KasprApp, but they don’t need to be actively written to by an agent. You can pre-populate a table via Kafka (by producing to the changelog topic directly) and use it as read-only reference data.

Q: What’s the difference between defaultSelector and using .get(key, default)? A: defaultSelector sets a type-level default — every missing key returns a new instance of that type (e.g., int()0). The .get(key, default) method is a per-call fallback. Use defaultSelector for patterns like counters where you always want a specific type.