KasprJoin - Table-to-Table Joins
What Is a KasprJoin?
A KasprJoin is a Kubernetes custom resource that declares a reactive join between two KasprTable resources. When either table changes, the join automatically emits a combined JoinedValue to a named output channel that a KasprAgent can consume.
Joins let you correlate records across two tables even when they are keyed differently: the join key is extracted from the left table’s value. For example, you can join an orders table (keyed by order_id) with a products table (keyed by product_id) by extracting the product_id from each order.
Key Features:
- Declarative YAML configuration — no Python code beyond a small extractor function
- Reactive — changes on either side trigger re-emission of joined results
- No co-partitioning required — left and right tables can have different keys
- Supports inner and left join semantics
- Output is a named channel consumable by any KasprAgent
How It Works
A join uses an internal subscription/response protocol to efficiently join two tables:
- When a record changes in the left table, the extractor function extracts a key from the value
- A subscription message is sent to the right table’s partition that owns that key
- The right-side task stores the subscription and looks up the current right value
- A response is sent back to the left table’s partition
- The left-side task combines both values into a JoinedValue and emits it to the output channel
This process is fully automatic — Kaspr manages the internal topics, stores, and callbacks.
Left Table Change ──► Extract Key ──► Subscription ──► Right Table Lookup
                                                             │
Output Channel ◄── JoinedValue(left, right) ◄── Response ◄───┘
Internal subscription and response topics are auto-created by the runtime. You don’t need to manage them.
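The flow above can be approximated with a small in-memory model. This is only a sketch for building intuition, not Kaspr's implementation: the JoinSketch class, its method names, and the sample records are all hypothetical, and partitions and internal topics are collapsed into plain dictionaries.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class JoinedValue:
    left: Any
    right: Optional[Any]


class JoinSketch:
    """Hypothetical in-memory stand-in for the subscription/response flow."""

    def __init__(self, extractor: Callable[[dict], Any], join_type: str = "inner"):
        self.extractor = extractor
        self.join_type = join_type
        self.left = {}           # left table: left key -> value
        self.right = {}          # right table: right key -> value
        self.subscriptions = {}  # right key -> set of subscribed left keys
        self.current_key = {}    # left key -> right key it is subscribed to
        self.output = []         # stands in for the output channel

    def _emit(self, left_key, right_value):
        # Inner joins stay silent until the right side has a value.
        if right_value is None and self.join_type == "inner":
            return
        self.output.append(JoinedValue(self.left[left_key], right_value))

    def put_left(self, key, value):
        self.left[key] = value
        # Drop the previous subscription if the extracted key changed.
        old_right_key = self.current_key.pop(key, None)
        if old_right_key is not None:
            self.subscriptions[old_right_key].discard(key)
        right_key = self.extractor(value)  # extract the join key
        if right_key is None:
            return                         # no key, no join
        self.current_key[key] = right_key
        self.subscriptions.setdefault(right_key, set()).add(key)  # subscribe
        self._emit(key, self.right.get(right_key))  # lookup + response

    def put_right(self, key, value):
        self.right[key] = value
        # A right-side change re-emits for every subscribed left record.
        for left_key in sorted(self.subscriptions.get(key, ())):
            self._emit(left_key, value)


join = JoinSketch(lambda order: order.get("product_id"))
join.put_left("o1", {"order_id": "o1", "product_id": "P100"})  # no product yet
join.put_right("P100", {"name": "Widget", "price": 5})         # triggers emission
```

In this model the order arrives before its product, so the inner join emits nothing at first; the stored subscription is what lets the later product write produce the joined record.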
Creating a Join
Step 1: Define Two Tables
First, create the tables you want to join:
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: orders
  labels:
    kaspr.io/app: order-system
spec:
  name: orders
  description: "Orders keyed by order_id"
  partitions: 6
---
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: products
  labels:
    kaspr.io/app: order-system
spec:
  name: products
  description: "Product catalog keyed by product_id"
  partitions: 6
Step 2: Define the Join
Create a KasprJoin that connects the two tables. The extractor is a Python function that extracts the join key from the left-table value:
apiVersion: kaspr.io/v1alpha1
kind: KasprJoin
metadata:
  name: orders-products-join
  labels:
    kaspr.io/app: order-system
spec:
  description: "Join orders with products by product_id"
  leftTable: orders
  rightTable: products
  extractor:
    entrypoint: get_product_id
    python: |
      def get_product_id(value):
          return value.get("product_id")
  type: inner
  outputChannel: orders-products-joined
Step 3: Consume the Join Channel
Create a KasprAgent that reads from the join’s output channel:
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: process-enriched-orders
  labels:
    kaspr.io/app: order-system
spec:
  description: "Processes joined order+product records"
  input:
    channel:
      name: orders-products-joined
  output:
    topics:
      - name: enriched-orders
  processors:
    pipeline:
      - enrich
    operations:
      - name: enrich
        map:
          entrypoint: enrich_order
          python: |
            def enrich_order(value):
                order = value["left"]
                product = value["right"]
                return {
                    "order_id": order.get("order_id"),
                    "product_name": product.get("name"),
                    "total": product.get("price", 0) * order.get("quantity", 0),
                }
Apply with:
kubectl apply -f tables.yaml
kubectl apply -f join.yaml
kubectl apply -f agent.yaml
Join Types
The type field controls join semantics:
Inner Join (default)
type: inner
Only emits a JoinedValue when both sides have a matching record. If the right table has no entry for the extracted key, no output is produced.
Left Join
type: left
Always emits a JoinedValue when the left table changes. If the right table has no match, right will be null:
{"left": {"order_id": "123", "product_id": "P999"}, "right": null}
Use left joins when you want to process all left-side changes even when the right side hasn’t been populated yet.
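The difference between the two join types comes down to one decision: what to do when the right-side lookup finds nothing. The helper below is a hypothetical illustration of that rule (joined_value is not a Kaspr function); it returns None where no event would be emitted.

```python
from typing import Optional


def joined_value(left: dict, right: Optional[dict], join_type: str = "inner") -> Optional[dict]:
    """Return the event a join would emit, or None if nothing is emitted."""
    if join_type not in ("inner", "left"):
        raise ValueError(f"unsupported join type: {join_type!r}")
    if right is None and join_type == "inner":
        return None  # inner join: no match, no output
    # left join (or a matched inner join): emit, with right possibly None
    return {"left": left, "right": right}


order = {"order_id": "123", "product_id": "P999"}
inner_result = joined_value(order, None, "inner")
left_result = joined_value(order, None, "left")
```

Here inner_result is None, while left_result carries the order with right set to None, which serializes to the null shown in the JSON above.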
JoinedValue Format
The output channel emits events with the following structure:
{
  "left": { ... },
  "right": { ... }
}
- left: The full record from the left table
- right: The matching record from the right table (or null for left joins with no match)
Access these in your agent’s processor:
def process(value):
    order = value["left"]     # left table record
    product = value["right"]  # right table record (or None)
    return {
        "order_id": order.get("order_id"),
        "product_name": product.get("name") if product else "Unknown",
    }
Working with the Extractor
The extractor is a Python function that receives a left-table value and returns the key to look up in the right table.
Basic Extractor
extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return value.get("product_id")
Composite Key Extractor
extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return f"{value.get('region')}:{value.get('product_id')}"
Extractor with Fallback
extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return value.get("product_id") or value.get("sku")
The extractor must return a value that matches a key in the right table. If it returns None, no join will be performed for that record.
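Because extractors are plain Python functions, they can be exercised locally before being embedded in a KasprJoin. The sketch below renames the three extractors so they can coexist in one file; the function names and sample records are made up for illustration.

```python
def get_product_id(value):
    return value.get("product_id")


def get_composite_key(value):
    return f"{value.get('region')}:{value.get('product_id')}"


def get_key_with_fallback(value):
    return value.get("product_id") or value.get("sku")


order = {"order_id": "123", "region": "eu", "product_id": "P100"}
legacy_order = {"order_id": "124", "sku": "SKU-9"}  # no product_id field

basic_key = get_product_id(order)                     # "P100"
composite_key = get_composite_key(order)              # "eu:P100"
fallback_key = get_key_with_fallback(legacy_order)    # "SKU-9"
missing_key = get_product_id(legacy_order)            # None
partial_key = get_composite_key({"product_id": "P100"})  # "None:P100"
```

One thing this makes visible: the composite extractor never returns None. A missing field is formatted as the literal text "None", so the result is a string like "None:P100" rather than a skipped join. If records can lack a field, consider checking for it explicitly and returning None.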
Key Serializers and Joins
Both input topics and tables use json as the default key serializer, so key joins work out of the box without any additional serializer configuration. However, if you override serializers on either side, you must keep them aligned.
Why Serializers Matter
The join works by extracting a key from the left table’s value and performing a lookup in the right table. For this lookup to succeed, the key stored in the right table must be encoded the same way as the extracted key.
Consider two agents writing to the orders and products tables:
- With json serialization (the default), event.key is a deserialized Python value (e.g., "P100")
- With raw serialization, event.key is raw bytes (e.g., b'"P100"')
When you write table[event.key] = value, the table stores whatever type event.key is. If the table’s keySerializer is json but the input topic uses raw, a raw-bytes key that already contains JSON quotes gets double-encoded — stored as "\"P100\"" instead of "P100".
Later, the extractor returns a clean value like "P100", so the lookup searches for "P100", which doesn’t match the double-encoded "\"P100\"" in the table.
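The mismatch can be reproduced with Python's standard json module alone. In the sketch below, encode_table_key is a hypothetical stand-in for a json key serializer; in particular, decoding raw bytes to text before encoding is an assumption made for illustration, not a statement about Kaspr's internals.

```python
import json


def encode_table_key(key):
    """Hypothetical json key serializer (assumption: bytes are decoded to text first)."""
    if isinstance(key, bytes):
        key = key.decode("utf-8")
    return json.dumps(key).encode("utf-8")


wire_key = b'"P100"'  # the key bytes as they sit on the input topic

# Input topic uses json: event.key is the decoded Python string "P100".
stored_via_json_input = encode_table_key(json.loads(wire_key))

# Input topic uses raw: event.key is still the quoted bytes.
stored_via_raw_input = encode_table_key(wire_key)

# The join looks up the clean value returned by the extractor.
lookup_key = encode_table_key("P100")

matches_json = stored_via_json_input == lookup_key
matches_raw = stored_via_raw_input == lookup_key
```

With a json input topic the stored key and the lookup key are byte-for-byte identical. With a raw input topic the stored key gains an extra layer of escaped quotes, so the lookup misses.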
The Rule
The keySerializer on an agent’s input topic must match the keySerializer on the table that the agent writes to.
Since both default to json, joins work correctly as long as you don’t override one without the other. If you do need to change serializers, make sure both sides match:
# Agent writing to a JSON-serialized table: no keySerializer needed (both default to json)
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: order-writer
  labels:
    kaspr.io/app: order-system
spec:
  input:
    topic:
      name: raw-orders
      # keySerializer defaults to json, matching the table's default
  processors:
    pipeline:
      - write-to-table
    operations:
      - name: write-to-table
        tables:
          - name: orders
            paramName: orders
        map:
          entrypoint: write
          python: |
            def write(value, orders):
                event = context["event"]
                orders[event.key] = value  # event.key is already deserialized
App-Level Default
The app-level default keySerializer can be overridden in the KasprApp config if needed:
apiVersion: kaspr.io/v1alpha1
kind: KasprApp
metadata:
  name: order-system
spec:
  config:
    keySerializer: json    # This is already the default
    valueSerializer: json  # This is already the default
You only need to set these explicitly if you are migrating from an older version where the default key serializer was raw, or if you want to be explicit in your configuration.
Quick Reference
| Input Topic keySerializer | Table keySerializer | event.key Type | Join Works? |
|---|---|---|---|
| json (default) | json (default) | Python object (str, int, etc.) | Yes |
| raw | json | Raw bytes | No (double-encoded keys) |
| raw | raw | Raw bytes | Yes (but extractor must return bytes) |
| json | raw | Python object | No (type mismatch) |
Output Channel Naming
The outputChannel field is optional. If omitted, the channel name defaults to {name}-channel, where {name} is the join’s metadata.name:
| metadata.name | outputChannel | Resolved Channel Name |
|---|---|---|
| orders-products-join | orders-products-joined | orders-products-joined |
| orders-products-join | (not set) | orders-products-join-channel |
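The naming rule in the table amounts to a one-line fallback. The function below is a hypothetical helper that mirrors it, which can be handy when generating agent manifests from code; treating an empty string as "not set" is an assumption.

```python
from typing import Optional


def resolve_output_channel(join_name: str, output_channel: Optional[str] = None) -> str:
    """Mirror the documented default: fall back to '{name}-channel'."""
    # Assumption: an empty string is treated the same as an omitted field.
    return output_channel or f"{join_name}-channel"


explicit = resolve_output_channel("orders-products-join", "orders-products-joined")
defaulted = resolve_output_channel("orders-products-join")
```

The two calls reproduce the two rows of the table: an explicit outputChannel is used as-is, and an omitted one resolves to orders-products-join-channel.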
Agents reference the channel by name in their input configuration:
input:
  channel:
    name: orders-products-joined  # must match the join's output channel
Monitoring Joins
View All Joins
kubectl get kasprjoins
Inspect a Join
kubectl describe kasprjoin orders-products-join
Status Fields
The join status shows validation results for referenced resources:
status:
  app:
    name: order-system
    status: AppFound  # or AppNotFound
  leftTable:
    name: orders
    status: TableFound  # or TableNotFound
  rightTable:
    name: products
    status: TableFound  # or TableNotFound
  configMap: orders-products-join
  hash: "abc123..."
Events
The operator emits events when resource references change:
| Event | Reason | Description |
|---|---|---|
| Warning | AppNotFound | Referenced KasprApp does not exist |
| Normal | AppFound | Referenced KasprApp was found (after being missing) |
| Warning | LeftTableNotFound | Referenced left KasprTable does not exist |
| Normal | LeftTableFound | Referenced left KasprTable was found |
| Warning | RightTableNotFound | Referenced right KasprTable does not exist |
| Normal | RightTableFound | Referenced right KasprTable was found |
You can create a KasprJoin before its referenced tables exist. The operator will emit warnings and automatically update the status once the tables are created.
Full Example: Order Enrichment
This complete example shows an order enrichment pipeline using a key join:
# 1. App
apiVersion: kaspr.io/v1alpha1
kind: KasprApp
metadata:
  name: order-system
spec:
  replicas: 3
  bootstrapServers: kafka-bootstrap:9092
  storage:
    size: 5Gi
---
# 2. Orders table
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: orders
  labels:
    kaspr.io/app: order-system
spec:
  name: orders
  partitions: 6
---
# 3. Products table
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: products
  labels:
    kaspr.io/app: order-system
spec:
  name: products
  partitions: 6
---
# 4. Key join
apiVersion: kaspr.io/v1alpha1
kind: KasprJoin
metadata:
  name: orders-products-join
  labels:
    kaspr.io/app: order-system
spec:
  description: "Join orders with products by product_id"
  leftTable: orders
  rightTable: products
  extractor:
    entrypoint: get_product_id
    python: |
      def get_product_id(order):
          return order.get("product_id")
  type: inner
  outputChannel: orders-products-joined
---
# 5. Agent consuming the joined stream
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: process-enriched-orders
  labels:
    kaspr.io/app: order-system
spec:
  description: "Processes joined order+product records from key join"
  input:
    channel:
      name: orders-products-joined
  output:
    topics:
      - name: enriched-orders
        keySelector:
          python: |
            def get_key(value):
                return value.get("order_id")
  processors:
    pipeline:
      - enrich
    init:
      python: |
        from datetime import datetime, timezone
    operations:
      - name: enrich
        map:
          entrypoint: enrich_order
          python: |
            def enrich_order(value):
                order = value["left"]
                product = value["right"]
                return {
                    "order_id": order.get("order_id"),
                    "product_id": order.get("product_id"),
                    "quantity": order.get("quantity"),
                    "product_name": product.get("name"),
                    "unit_price": product.get("price"),
                    "total": product.get("price", 0) * order.get("quantity", 0),
                    "enriched_at": datetime.now(timezone.utc).isoformat(),
                }
Data flow:
- Raw orders arrive on raw-orders topic → ingested into orders table
- Raw products arrive on raw-products topic → ingested into products table
- When an order is written, its product_id is extracted and used to look up the product
- A JoinedValue containing both the order and product is emitted to orders-products-joined
- The process-enriched-orders agent picks up the joined value and produces an enriched record to enriched-orders
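The enrich_order function from the example can be run on its own against a hand-built JoinedValue to check the output shape before deploying. The sample order and product below are invented for illustration.

```python
from datetime import datetime, timezone


def enrich_order(value):
    order = value["left"]
    product = value["right"]  # never None here because the join type is inner
    return {
        "order_id": order.get("order_id"),
        "product_id": order.get("product_id"),
        "quantity": order.get("quantity"),
        "product_name": product.get("name"),
        "unit_price": product.get("price"),
        "total": product.get("price", 0) * order.get("quantity", 0),
        "enriched_at": datetime.now(timezone.utc).isoformat(),
    }


# Hypothetical joined event, shaped like the JoinedValue format described earlier.
sample = {
    "left": {"order_id": "123", "product_id": "P100", "quantity": 3},
    "right": {"name": "Widget", "price": 2.5},
}
enriched = enrich_order(sample)
```

Since this function calls product.get without a guard, it relies on the join being type: inner. Switching the join to type: left would require handling a None right side, as in the process example in the JoinedValue Format section.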