
KasprJoin - Table-to-Table Joins

What Is a KasprJoin?

A KasprJoin is a Kubernetes custom resource that declares a reactive join between two KasprTable resources. When either table changes, the join automatically emits a combined JoinedValue to a named output channel that a KasprAgent can consume.

Joins let you correlate records across two tables that are keyed differently. The join key does not have to be the left table's own key; it is extracted from each left-table value. For example, you can join an orders table (keyed by order_id) with a products table (keyed by product_id) by extracting the product_id from each order.

Key Features:

  • Declarative YAML configuration — no Python code beyond a small extractor function
  • Reactive — changes on either side trigger re-emission of joined results
  • No co-partitioning required — left and right tables can have different keys
  • Supports inner and left join semantics
  • Output is a named channel consumable by any KasprAgent

How It Works

A join uses an internal subscription/response protocol to efficiently join two tables:

  1. When a record changes in the left table, the extractor function extracts a key from the value
  2. A subscription message is sent to the right table’s partition that owns that key
  3. The right-side task stores the subscription and looks up the current right value
  4. A response is sent back to the left table’s partition
  5. The left-side task combines both values into a JoinedValue and emits it to the output channel

This process is fully automatic — Kaspr manages the internal topics, stores, and callbacks.

Left Table Change ──► Extract Key ──► Subscription ──► Right Table Lookup
                                                             │
Output Channel ◄── JoinedValue(left, right) ◄── Response ◄───┘

Internal subscription and response topics are auto-created by the runtime. You don’t need to manage them.
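The five steps above can be modeled in a few lines of plain Python. This is only an in-memory sketch of the flow, not Kaspr's implementation: the ToyJoin class and its method names are hypothetical, partitions and topics are replaced by dictionaries, and stale subscriptions are never cleaned up.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class ToyJoin:
    """Hypothetical in-memory model of the subscription/response flow."""
    extractor: Callable[[dict], Any]
    join_type: str = "inner"
    left: dict = field(default_factory=dict)            # left table: key -> value
    right: dict = field(default_factory=dict)           # right table: key -> value
    subscriptions: dict = field(default_factory=dict)   # right key -> set of left keys
    output: list = field(default_factory=list)          # stands in for the output channel

    def _emit(self, left_key: Any, right_value: Optional[dict]) -> None:
        # Step 5: combine both sides; an inner join skips a missing right value.
        if right_value is None and self.join_type == "inner":
            return
        self.output.append({"left": self.left[left_key], "right": right_value})

    def put_left(self, key: Any, value: dict) -> None:
        self.left[key] = value
        join_key = self.extractor(value)                 # step 1: extract the join key
        if join_key is None:
            return                                       # no key, so no join
        # Steps 2-3: record the subscription on the right side.
        self.subscriptions.setdefault(join_key, set()).add(key)
        # Steps 4-5: respond with the current right value and emit.
        self._emit(key, self.right.get(join_key))

    def put_right(self, key: Any, value: dict) -> None:
        self.right[key] = value
        # Reactive side: re-emit for every left record subscribed to this key.
        for left_key in sorted(self.subscriptions.get(key, ())):
            self._emit(left_key, value)


join = ToyJoin(extractor=lambda order: order.get("product_id"))
join.put_left("o1", {"order_id": "o1", "product_id": "P100"})  # no product yet: nothing emitted
join.put_right("P100", {"name": "Widget", "price": 5})         # right-side change triggers emission
print(len(join.output))  # 1
```

The order arrives before the product, so the inner join emits nothing at first. The later product write finds the stored subscription and emits the combined record, which is the "reactive on either side" behavior described above.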


Creating a Join

Step 1: Define Two Tables

First, create the tables you want to join:

apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: orders
  labels:
    kaspr.io/app: order-system
spec:
  name: orders
  description: "Orders keyed by order_id"
  partitions: 6
---
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: products
  labels:
    kaspr.io/app: order-system
spec:
  name: products
  description: "Product catalog keyed by product_id"
  partitions: 6

Step 2: Define the Join

Create a KasprJoin that connects the two tables. The extractor is a Python function that extracts the join key from the left-table value:

apiVersion: kaspr.io/v1alpha1
kind: KasprJoin
metadata:
  name: orders-products-join
  labels:
    kaspr.io/app: order-system
spec:
  description: "Join orders with products by product_id"
  leftTable: orders
  rightTable: products
  extractor:
    entrypoint: get_product_id
    python: |
      def get_product_id(value):
          return value.get("product_id")
  type: inner
  outputChannel: orders-products-joined

Step 3: Consume the Join Channel

Create a KasprAgent that reads from the join’s output channel:

apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: process-enriched-orders
  labels:
    kaspr.io/app: order-system
spec:
  description: "Processes joined order+product records"
  input:
    channel:
      name: orders-products-joined
  output:
    topics:
      - name: enriched-orders
  processors:
    pipeline:
      - enrich
    operations:
      - name: enrich
        map:
          entrypoint: enrich_order
          python: |
            def enrich_order(value):
                order = value["left"]
                product = value["right"]
                return {
                    "order_id": order.get("order_id"),
                    "product_name": product.get("name"),
                    "total": product.get("price", 0) * order.get("quantity", 0),
                }

Apply with:

kubectl apply -f tables.yaml
kubectl apply -f join.yaml
kubectl apply -f agent.yaml

Join Types

The type field controls join semantics:

Inner Join (default)

type: inner

Only emits a JoinedValue when both sides have a matching record. If the right table has no entry for the extracted key, no output is produced.

Left Join

type: left

Always emits a JoinedValue when the left table changes. If the right table has no match, right will be null:

{"left": {"order_id": "123", "product_id": "P999"}, "right": null}

Use left joins when you want to process all left-side changes even when the right side hasn’t been populated yet.
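The difference between the two join types comes down to what happens when the right side is missing. The sketch below expresses that rule as a plain function; joined_value is a hypothetical helper written for illustration and is not part of Kaspr.

```python
from typing import Optional


def joined_value(left: dict, right: Optional[dict], join_type: str = "inner") -> Optional[dict]:
    """Return the JoinedValue to emit, or None when nothing should be emitted."""
    if join_type not in ("inner", "left"):
        raise ValueError(f"unsupported join type: {join_type!r}")
    if right is None and join_type == "inner":
        return None                          # inner join: no match, no output
    return {"left": left, "right": right}    # left join keeps right as None


order = {"order_id": "123", "product_id": "P999"}
inner_result = joined_value(order, None, "inner")
left_result = joined_value(order, None, "left")
print(inner_result)  # None
print(left_result)   # {'left': {'order_id': '123', 'product_id': 'P999'}, 'right': None}
```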


JoinedValue Format

The output channel emits events with the following structure:

{
  "left": { ... },
  "right": { ... }
}
  • left — The full record from the left table
  • right — The matching record from the right table (or null for left joins with no match)

Access these in your agent’s processor:

def process(value):
    order = value["left"]       # left table record
    product = value["right"]    # right table record (or None)
    return {
        "order_id": order.get("order_id"),
        "product_name": product.get("name") if product else "Unknown",
    }

Working with the Extractor

The extractor is a Python function that receives a left-table value and returns the key to look up in the right table.

Basic Extractor

extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return value.get("product_id")

Composite Key Extractor

extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return f"{value.get('region')}:{value.get('product_id')}"

Extractor with Fallback

extractor:
  entrypoint: get_key
  python: |
    def get_key(value):
        return value.get("product_id") or value.get("sku")
Warning: The extractor must return a value that matches a key in the right table. If it returns None, no join will be performed for that record.
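Because extractors are ordinary Python functions, they can be exercised locally before being embedded in YAML. The sketch below reuses the three extractors shown above under distinct names and adds composite_strict, a hypothetical variant. It highlights one pitfall: an f-string composite key never returns None, even when a field is missing.

```python
def basic(value):
    return value.get("product_id")


def composite(value):
    return f"{value.get('region')}:{value.get('product_id')}"


def fallback(value):
    return value.get("product_id") or value.get("sku")


def composite_strict(value):
    """Hypothetical variant: return None unless both parts are present."""
    region, product_id = value.get("region"), value.get("product_id")
    if region is None or product_id is None:
        return None
    return f"{region}:{product_id}"


incomplete = {"region": "eu"}              # product_id is missing
print(repr(basic(incomplete)))             # None
print(repr(composite(incomplete)))         # 'eu:None'
print(repr(composite_strict(incomplete)))  # None
print(repr(fallback({"sku": "S-1"})))      # 'S-1'
```

With the plain composite extractor, an incomplete record produces the key "eu:None", which is looked up in the right table like any other key. If such records should be skipped instead, returning None explicitly is one way to do it.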


Key Serializers and Joins

Both input topics and tables use json as the default key serializer, so key joins work out of the box without any additional serializer configuration. However, if you override serializers on either side, keep them aligned.

Why Serializers Matter

The join works by extracting a key from the left table’s value and performing a lookup in the right table. For this lookup to succeed, the key stored in the right table must be encoded the same way as the extracted key.

Consider two agents writing to the orders and products tables:

  • With json serialization (the default), event.key is a deserialized Python value (e.g., "P100")
  • With raw serialization, event.key is raw bytes (e.g., b'"P100"')

When you write table[event.key] = value, the table stores whatever type event.key is. If the table’s keySerializer is json but the input topic uses raw, a raw-bytes key that already contains JSON quotes gets double-encoded — stored as "\"P100\"" instead of "P100".

Later, the extractor returns a clean value like "P100", so the lookup searches for "P100". That key does not match the double-encoded "\"P100\"" stored in the table, and the join finds nothing.
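The mismatch can be reproduced with Python's standard json module. This is only an illustration of the encoding arithmetic: the stdlib calls stand in for Kaspr's serializers, and the assumption that a bytes key is decoded as UTF-8 text before being JSON-encoded is mine, not something the runtime is documented to do.

```python
import json

raw_key = b'"P100"'    # bytes on the topic: the JSON encoding of "P100"

# json on the input topic: the bytes are decoded once before the agent sees them.
json_key = json.loads(raw_key)        # 'P100'
stored_ok = json.dumps(json_key)      # '"P100"'

# raw on the input topic, json on the table: already-encoded text is encoded again.
stored_double = json.dumps(raw_key.decode("utf-8"))

lookup = json.dumps("P100")           # what a clean extractor result encodes to
print(lookup == stored_ok)            # True
print(lookup == stored_double)        # False
```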

The Rule

Important: The keySerializer on an agent’s input topic must match the keySerializer on the table that the agent writes to.

Since both default to json, joins work correctly as long as you don’t override one without the other. If you do change a serializer, change it on both sides. The following agent relies on the defaults:

# Agent writing to a JSON-serialized table — no keySerializer needed (both default to json)
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: order-writer
  labels:
    kaspr.io/app: order-system
spec:
  input:
    topic:
      name: raw-orders
      # keySerializer defaults to json — matches the table's default
  processors:
    pipeline:
      - write-to-table
    operations:
      - name: write-to-table
        tables:
          - name: orders
            paramName: orders
        map:
          entrypoint: write
          python: |
            def write(value, orders):
                event = context["event"]
                orders[event.key] = value   # event.key is already deserialized

App-Level Default

The app-level default keySerializer can be overridden in the KasprApp config if needed:

apiVersion: kaspr.io/v1alpha1
kind: KasprApp
metadata:
  name: order-system
spec:
  config:
    keySerializer: json       # This is already the default
    valueSerializer: json     # This is already the default

You only need to set these explicitly if you are migrating from an older version where the default key serializer was raw, or if you want to be explicit in your configuration.

Quick Reference

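Input Topic keySerializer | Table keySerializer | event.key Type                 | Join Works?
--------------------------|---------------------|--------------------------------|--------------------------------------
json (default)            | json (default)      | Python object (str, int, etc.) | Yes
raw                       | json                | Raw bytes                      | No (double-encoded keys)
raw                       | raw                 | Raw bytes                      | Yes (but extractor must return bytes)
json                      | raw                 | Python object                  | No (type mismatch)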
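For the raw/raw row, the extractor has to produce exactly the bytes that were stored as the right table's key. A minimal sketch follows, with two hypothetical extractors covering two common producer conventions. Which one applies depends on how the product keys were actually written to the topic, which this guide does not specify.

```python
import json


def get_key_plain(value):
    """Assumes product keys were produced as plain UTF-8 text, e.g. b'P100'."""
    product_id = value.get("product_id")
    return product_id.encode("utf-8") if product_id is not None else None


def get_key_json(value):
    """Assumes product keys were produced as JSON text, e.g. b'"P100"'."""
    product_id = value.get("product_id")
    return json.dumps(product_id).encode("utf-8") if product_id is not None else None


order = {"order_id": "123", "product_id": "P100"}
print(get_key_plain(order))  # b'P100'
print(get_key_json(order))   # b'"P100"'
```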

Output Channel Naming

The outputChannel field is optional. If omitted, the channel name defaults to {name}-channel, where {name} is the join's metadata.name:

metadata.name        | outputChannel          | Resolved Channel Name
---------------------|------------------------|-----------------------------
orders-products-join | orders-products-joined | orders-products-joined
orders-products-join | (not set)              | orders-products-join-channel

Agents reference the channel by name in their input configuration:

input:
  channel:
    name: orders-products-joined  # must match the join's output channel
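The naming rule is small enough to state as a function. resolve_channel_name is a hypothetical helper that mirrors the table above; it can be handy when generating agent manifests from the same inputs as the join.

```python
from typing import Optional


def resolve_channel_name(join_name: str, output_channel: Optional[str] = None) -> str:
    """Return the explicit outputChannel if set, else '<metadata.name>-channel'."""
    return output_channel if output_channel else f"{join_name}-channel"


print(resolve_channel_name("orders-products-join", "orders-products-joined"))
# orders-products-joined
print(resolve_channel_name("orders-products-join"))
# orders-products-join-channel
```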

Monitoring Joins

View All Joins

kubectl get kasprjoins

Inspect a Join

kubectl describe kasprjoin orders-products-join

Status Fields

The join status shows validation results for referenced resources:

status:
  app:
    name: order-system
    status: AppFound          # or AppNotFound
  leftTable:
    name: orders
    status: TableFound        # or TableNotFound
  rightTable:
    name: products
    status: TableFound        # or TableNotFound
  configMap: orders-products-join
  hash: "abc123..."
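If you check join health from a script, the status block can be reduced to a list of unresolved references. The sketch below assumes the status has already been loaded into a dict with the field names shown above (for example from the output of kubectl get kasprjoin orders-products-join -o json). missing_references is a hypothetical helper.

```python
def missing_references(status: dict) -> list:
    """Return 'field: name' entries for every reference that is not resolved."""
    expected = {
        "app": "AppFound",
        "leftTable": "TableFound",
        "rightTable": "TableFound",
    }
    missing = []
    for field_name, ok_status in expected.items():
        ref = status.get(field_name) or {}
        if ref.get("status") != ok_status:
            missing.append(f"{field_name}: {ref.get('name', '<unknown>')}")
    return missing


status = {
    "app": {"name": "order-system", "status": "AppFound"},
    "leftTable": {"name": "orders", "status": "TableFound"},
    "rightTable": {"name": "products", "status": "TableNotFound"},
}
print(missing_references(status))  # ['rightTable: products']
```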

Events

The operator emits events when resource references change:

Event   | Reason             | Description
--------|--------------------|---------------------------------------------------
Warning | AppNotFound        | Referenced KasprApp does not exist
Normal  | AppFound           | Referenced KasprApp was found (after being missing)
Warning | LeftTableNotFound  | Referenced left KasprTable does not exist
Normal  | LeftTableFound     | Referenced left KasprTable was found
Warning | RightTableNotFound | Referenced right KasprTable does not exist
Normal  | RightTableFound    | Referenced right KasprTable was found

You can create a KasprJoin before its referenced tables exist. The operator will emit warnings and automatically update the status once the tables are created.


Full Example: Order Enrichment

This complete example shows an order enrichment pipeline using a key join:

# 1. App
apiVersion: kaspr.io/v1alpha1
kind: KasprApp
metadata:
  name: order-system
spec:
  replicas: 3
  bootstrapServers: kafka-bootstrap:9092
  storage:
    size: 5Gi
---
# 2. Orders table
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: orders
  labels:
    kaspr.io/app: order-system
spec:
  name: orders
  partitions: 6
---
# 3. Products table
apiVersion: kaspr.io/v1alpha1
kind: KasprTable
metadata:
  name: products
  labels:
    kaspr.io/app: order-system
spec:
  name: products
  partitions: 6
---
# 4. Key join
apiVersion: kaspr.io/v1alpha1
kind: KasprJoin
metadata:
  name: orders-products-join
  labels:
    kaspr.io/app: order-system
spec:
  description: "Join orders with products by product_id"
  leftTable: orders
  rightTable: products
  extractor:
    entrypoint: get_product_id
    python: |
      def get_product_id(order):
          return order.get("product_id")
  type: inner
  outputChannel: orders-products-joined
---
# 5. Agent consuming the joined stream
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: process-enriched-orders
  labels:
    kaspr.io/app: order-system
spec:
  description: "Processes joined order+product records from key join"
  input:
    channel:
      name: orders-products-joined
  output:
    topics:
      - name: enriched-orders
        keySelector:
          python: |
            def get_key(value):
                return value.get("order_id")
  processors:
    pipeline:
      - enrich
    init:
      python: |
        from datetime import datetime, timezone
    operations:
      - name: enrich
        map:
          entrypoint: enrich_order
          python: |
            def enrich_order(value):
                order = value["left"]
                product = value["right"]
                return {
                    "order_id": order.get("order_id"),
                    "product_id": order.get("product_id"),
                    "quantity": order.get("quantity"),
                    "product_name": product.get("name"),
                    "unit_price": product.get("price"),
                    "total": product.get("price", 0) * order.get("quantity", 0),
                    "enriched_at": datetime.now(timezone.utc).isoformat(),
                }

Data flow:

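  1. Raw orders arrive on the raw-orders topic and are ingested into the orders table (the ingestion agent is not shown above)
  2. Raw products arrive on the raw-products topic and are ingested into the products table (the ingestion agent is not shown above)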
  3. When an order is written, its product_id is extracted and used to look up the product
  4. A JoinedValue containing both the order and product is emitted to orders-products-joined
  5. The process-enriched-orders agent picks up the joined value and produces an enriched record to enriched-orders
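The last step of this flow can be checked without a cluster, since enrich_order is plain Python. The sketch below copies the function from the agent above and feeds it a hand-built JoinedValue. The sample order and product values are invented for illustration.

```python
from datetime import datetime, timezone


def enrich_order(value):
    order = value["left"]
    product = value["right"]
    return {
        "order_id": order.get("order_id"),
        "product_id": order.get("product_id"),
        "quantity": order.get("quantity"),
        "product_name": product.get("name"),
        "unit_price": product.get("price"),
        "total": product.get("price", 0) * order.get("quantity", 0),
        "enriched_at": datetime.now(timezone.utc).isoformat(),
    }


# Hand-built JoinedValue, shaped like what the join emits for a matching pair.
sample = {
    "left": {"order_id": "123", "product_id": "P100", "quantity": 3},
    "right": {"name": "Widget", "price": 2.5},
}
enriched = enrich_order(sample)
print(enriched["product_name"], enriched["total"])  # Widget 7.5
```

Because the join type is inner, right is always present here. With type: left, the function would need a guard for a right value of None, as shown in the JoinedValue Format section.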