Agents - Distributed Stream Processors
What Is a KasprAgent?
A KasprAgent is a stream processor defined as a Kubernetes custom resource. It listens to incoming event streams from Kafka topics or in-memory channels, applies custom logic using a programmable pipeline of operations, and optionally produces results to output topics, channels, or sinks.
KasprAgents provide a fully declarative, Kubernetes-native way to express stream processing applications, making them easy to version, manage, and deploy like any other resource in your cluster.
Quickstart
Create a KasprAgent
A minimal agent can be created by specifying an input topic and a simple mapping processor:
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: echo-agent
spec:
  input:
    topic:
      name: demo-input
  output:
    topics:
      - name: demo-output
  processors:
    pipeline:
      - echo
    operations:
      - name: echo
        map:
          python: |
            def echo(value):
                return value
Apply it using:
kubectl apply -f echo-agent.yaml
Components of a KasprAgent
Input
Agents must define their input source. This is either:
- A Kafka topic, defined with name or pattern, and serialization settings.
- An in-memory channel, useful for internal communication or testing.
You can also set declare: true to have the agent auto-create the topic if it is missing.
input:
  topic:
    name: my-topic
    declare: true
    keySerializer: json
    valueSerializer: json
Input Buffering
Agents can buffer multiple input events before processing them as a batch. This is useful for scenarios where processing efficiency improves with larger batches, such as bulk database operations or machine learning inference.
input:
  topic:
    name: sensor-data
  take:
    max: 100    # Process up to 100 events at once
    within: 5s  # Or process after 5 seconds, whichever comes first
When buffering is enabled, your processor functions receive a list of events instead of individual events:
processors:
  operations:
    - name: batch-process
      map:
        python: |
          def process_batch(events):
              # events is now a list of values
              processed = []
              for event in events:
                  event["batch_size"] = len(events)
                  processed.append(event)
              return processed
Buffering is particularly beneficial for:
- Bulk operations: Database inserts, API calls with batch endpoints
- Statistical analysis: Computing aggregates over windows of data
- ML inference: Batch prediction for better GPU utilization
- I/O optimization: Reducing network round trips
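The max/within rule described above can be modeled in plain Python. This is a simplified sketch of the semantics, not Kaspr's implementation: take_batches is a hypothetical helper, events carry explicit timestamps so the example is deterministic, and an expired window is only flushed when the next event arrives (a real runtime would presumably flush on a timer).

```python
from typing import Any, Iterable, Iterator, List, Optional, Tuple


def take_batches(
    events: Iterable[Tuple[float, Any]], max_: int, within: float
) -> Iterator[List[Any]]:
    """Group (timestamp, value) pairs into batches.

    A batch is emitted when it holds max_ values, or when an event arrives
    at least `within` seconds after the first value in the current buffer.
    """
    buffer: List[Any] = []
    deadline: Optional[float] = None
    for ts, value in events:
        # Flush a partially filled buffer whose time window has expired.
        if buffer and deadline is not None and ts >= deadline:
            yield buffer
            buffer = []
        # The window starts with the first value placed in an empty buffer.
        if not buffer:
            deadline = ts + within
        buffer.append(value)
        # Flush as soon as the size limit is reached.
        if len(buffer) >= max_:
            yield buffer
            buffer = []
    # Emit whatever is left when the input ends.
    if buffer:
        yield buffer


# Size limit reached first: three events arrive within the 5 second window.
by_size = list(take_batches([(0, "a"), (1, "b"), (2, "c"), (10, "d")], max_=3, within=5))

# Time limit reached first: the third event arrives after the window closed.
by_time = list(take_batches([(0, "a"), (1, "b"), (6, "c")], max_=100, within=5))
```

Here by_size splits after the third event because the size limit is hit, while by_time splits after the second event because the window expired before the buffer filled.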
Output
KasprAgents can emit output to:
- Kafka topics, defined with static names or dynamic functions.
- In-memory channels for internal use.
- Custom sinks defined by Python code.
Each output target can define:
- keySelector, valueSelector, and optional headersSelector
- predicate for filtering which values are allowed to pass through
- partitionSelector for custom partitioning
- ack: true for delivery guarantees
Example:
output:
  topics:
    - name: my-output-topic
      keySelector:
        python: |
          def get_key(value):
              return value["id"]
      predicate:
        python: |
          def should_send(value):
              return "id" in value
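To see how the predicate and key selector from the example interact, the two functions can be exercised locally. The route helper below is hypothetical and only models the behavior; it assumes the predicate is evaluated before the key selector, which this page does not state explicitly.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Value = Dict[str, Any]


def get_key(value: Value) -> Any:
    return value["id"]


def should_send(value: Value) -> bool:
    return "id" in value


def route(
    values: Iterable[Value],
    predicate: Callable[[Value], bool],
    key_selector: Callable[[Value], Any],
) -> List[Tuple[Any, Value]]:
    """Return (key, value) records for the values accepted by the predicate."""
    records = []
    for value in values:
        # Assumption: rejected values are dropped before a key is computed.
        if not predicate(value):
            continue
        records.append((key_selector(value), value))
    return records


records = route([{"id": "a1", "total": 3}, {"total": 7}], should_send, get_key)
```

In this model the second value is dropped by should_send, so get_key never sees a value without an "id" field and cannot raise a KeyError.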
Processors
The heart of a KasprAgent is its processors section, which defines the transformation pipeline.
A processor pipeline is:
- Declared as a sequence of named operations
- Backed by user-defined Python map and/or filter functions
- Optional tables can be attached for stateful logic
Example:
processors:
  pipeline:
    - validate
    - enrich
  init:
    python: |
      def init():
          print("Initializing agent...")
  operations:
    - name: validate
      filter:
        python: |
          def validate(value):
              return value.get("is_valid", False)
    - name: enrich
      map:
        python: |
          def enrich(value):
              value["processed"] = True
              return value
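The validate and enrich functions are ordinary Python, so they can be tried outside the cluster. The sketch below is a rough local model of a filter-then-map pipeline, not how Kaspr executes operations; run_pipeline and the ("filter" | "map", function) step format are made up for illustration.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Value = Dict[str, Any]
Step = Tuple[str, Callable[[Value], Any]]


def validate(value: Value) -> bool:
    return value.get("is_valid", False)


def enrich(value: Value) -> Value:
    value["processed"] = True
    return value


def run_pipeline(values: Iterable[Value], steps: List[Step]) -> List[Value]:
    """Apply each step in order; a falsy filter result drops the value."""
    results = []
    for value in values:
        keep = True
        for kind, fn in steps:
            if kind == "filter":
                if not fn(value):
                    keep = False
                    break
            else:
                # A map step replaces the value with the function's result.
                value = fn(value)
        if keep:
            results.append(value)
    return results


steps: List[Step] = [("filter", validate), ("map", enrich)]
output = run_pipeline([{"id": 1, "is_valid": True}, {"id": 2}], steps)
```

Only the first value survives validate, and it leaves the pipeline with processed set to True.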
Initialization
The init block lets you define startup logic, load configuration, prepare resources, or validate environment variables.
init:
  python: |
    import os
    if "MY_VAR" not in os.environ:
        raise RuntimeError("Missing MY_VAR")
Table Access
Agents can reference state tables and use them as inputs to their operations.
tables:
  - name: my-table
    paramName: table
map:
  python: |
    def enrich(value, table):
        value["status"] = table.get(value["id"], "unknown")
        return value
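For a quick local check of the lookup logic, a plain dict can stand in for the table. This assumes only what the example itself uses, namely that the injected table object supports get(key, default); the sample keys and statuses are invented.

```python
from typing import Any, Dict


def enrich(value: Dict[str, Any], table: Dict[str, str]) -> Dict[str, Any]:
    # Fall back to "unknown" when the id has no entry in the table.
    value["status"] = table.get(value["id"], "unknown")
    return value


# A dict standing in for the state table (illustrative data only).
table = {"a1": "active"}

known = enrich({"id": "a1"}, table)
missing = enrich({"id": "zz"}, table)
```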
Partitioning and Routing
Kafka partitioning is automatically leveraged if your input topics are partitioned. You can control output partitioning via partitionSelector.
You may also implement repartitioning by emitting records with a newly computed key via keySelector and routing to a new topic.
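As an illustration of custom partitioning, the sketch below maps a key to a partition with a stable hash. The function name, its arguments, and the choice of CRC32 are assumptions made for this example: the signature Kaspr expects from a partitionSelector is not documented here, and Kafka's default partitioner uses a different hash.

```python
import zlib
from collections import defaultdict
from typing import Dict, List


def select_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index in the range [0, num_partitions)."""
    # zlib.crc32 is stable across processes, unlike Python's built-in hash()
    # for strings, which is randomized per interpreter run.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Repartitioning by a new key: records with the same key share a partition.
records = [
    {"id": "r1", "user": "alice"},
    {"id": "r2", "user": "bob"},
    {"id": "r3", "user": "alice"},
]
by_partition: Dict[int, List[str]] = defaultdict(list)
for record in records:
    by_partition[select_partition(record["user"], 6)].append(record["id"])
```

Because the mapping depends only on the key, r1 and r3 always end up in the same partition, which is what keeps per-key ordering intact after repartitioning.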
Processing Failures
When an agent encounters an exception during event processing, the system maintains exactly-once semantics by acknowledging the source message and not reprocessing it. While this prevents duplicate processing, it raises the question of what happens to failed events.
There are several approaches to handling failed events, each with trade-offs:
Acknowledgment (Current Behavior)
- The failed message is acknowledged and marked as complete
- Ensures exactly-once processing semantics
- Failed events are not reprocessed automatically
Retry Strategies
- Retrying requires stopping topic processing to maintain message ordering
- The next offset cannot be processed until the failed event is resolved
- Moving events to the “back of the queue” breaks topic ordering
Instance Restart
- Crashing the instance forces human intervention
- Not ideal given the frequency of code errors and unexpected exceptions
- Better to log errors and notify operations teams for manual replay
Explicit Error Handling
Agents can route failed events to a dead-letter queue (DLQ) explicitly: an operation returns a value that marks the failure, and a predicate on the DLQ output topic matches that marker.
output:
  topics:
    - name: dead-letter-topic
      predicate:
        python: |
          def should_send(value):
              return "error" in value
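One way to produce values that such a predicate can match is to catch exceptions inside the map function and return a tagged record instead of raising. The sketch below is an assumed pattern rather than a built-in Kaspr feature; safe_parse, the amount field, and the shape of the error record are all hypothetical.

```python
from typing import Any, Dict

Value = Dict[str, Any]


def safe_parse(value: Value) -> Value:
    """Convert value["amount"] to float, tagging the record on failure."""
    try:
        value["amount"] = float(value["amount"])
    except (KeyError, TypeError, ValueError) as exc:
        # Return a failure marker instead of raising, so the event is not lost.
        return {"error": type(exc).__name__, "original": value}
    return value


def is_dead_letter(value: Value) -> bool:
    # Predicate for the dead-letter topic.
    return "error" in value


def is_ok(value: Value) -> bool:
    # Complementary predicate for the regular output topic.
    return "error" not in value


good = safe_parse({"amount": "12.5"})
bad = safe_parse({"amount": "abc"})
```

With complementary predicates on the two output topics, good would go to the regular topic and bad to the dead-letter topic, where it can be inspected and replayed.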
Concurrency and Scaling
KasprAgents scale by deploying multiple KasprApp replicas. Kafka ensures partition-based routing, meaning each message lands on exactly one instance.
Kafka automatically distributes topic partitions across available agent instances using consumer group rebalancing. When you scale agent replicas:
- Adding instances: New agents join the consumer group and Kafka redistributes partitions to balance the load
- Removing instances: Kafka detects unavailable agents and reassigns their partitions to remaining instances
- Partition assignment: Each partition is consumed by exactly one agent instance at a time, ensuring ordered processing
This automatic rebalancing means agents can scale horizontally without manual intervention, with processing automatically redistributed as the cluster size changes.
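The effect of a rebalance can be pictured with a toy assignment function. This is a simple round-robin model for illustration only; Kafka's actual assignors (range, round-robin, sticky, cooperative) use different strategies, and the instance names are invented.

```python
from typing import Dict, Iterable, List


def assign(partitions: Iterable[int], instances: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over instances so each partition has one owner."""
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for index, partition in enumerate(sorted(partitions)):
        owner = instances[index % len(instances)]
        assignment[owner].append(partition)
    return assignment


# Two replicas share six partitions.
before = assign(range(6), ["agent-0", "agent-1"])

# A third replica joins and the partitions are redistributed.
after = assign(range(6), ["agent-0", "agent-1", "agent-2"])
```

In both assignments every partition appears under exactly one instance, which is the property that preserves per-partition ordering. Note that replicas beyond the number of partitions would receive nothing to consume.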