
KasprAgents - Distributed Stream Processors

What Is a KasprAgent?

A KasprAgent is a stream processor defined as a Kubernetes custom resource. It listens to incoming event streams from Kafka topics or in-memory channels, applies custom logic using a programmable pipeline of operations, and optionally produces results to output topics, channels, or sinks.

Key Features:

  • Declarative configuration via Kubernetes CRDs
  • Horizontal scalability and automatic partition rebalancing
  • Flexible input/output (Kafka topics, channels, custom sinks)
  • Programmable transformation pipeline (Python functions)
  • Built-in support for batching, error handling, and stateful processing

Why use KasprAgents?

  • Version, manage, and deploy stream processors like any other Kubernetes resource
  • Integrate with existing Kafka infrastructure
  • Simplify complex event-driven workflows

Example Use Cases

  • ETL Pipelines: Transform and enrich data from Kafka topics before loading into databases
  • Anomaly Detection: Process sensor streams and emit alerts for outlier events
  • Event Enrichment: Join events with reference tables and output enriched results

Quickstart

Create a KasprAgent

A minimal agent can be created by specifying an input topic and a simple mapping processor. This example echoes incoming events to an output topic:

apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: echo-agent
spec:
  input:
    topic:
      name: demo-input
  output:
    topics:
      - name: demo-output
  processors:
    pipeline:
      - echo
    operations:
      - name: echo
        map:
          python: |
            def echo(value):
                return value

Apply it using:

kubectl apply -f echo-agent.yaml
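
Once applied, you can confirm the resource was created. The plural resource name kaspragents is an assumption about how the CRD is registered:

kubectl get kaspragents echo-agent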

Components of a KasprAgent

Input

Agents must define their input source. This is either:

  • A Kafka topic, defined by name or pattern, with optional serialization settings.
  • An in-memory channel, useful for internal communication or testing.

You can also set declare: true to have the agent auto-create the topic if missing.

input:
  topic:
    name: my-topic
    declare: true
    keySerializer: json
    valueSerializer: json

Use declare: true to automatically create missing topics or channels.
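
For internal communication or testing, an in-memory channel can be used in place of a topic. A minimal sketch, assuming a channel is referenced by name the same way a topic is:

input:
  channel:
    name: my-channel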

Input Buffering

KasprAgents can buffer multiple input events before processing them as a batch. This is useful for scenarios where processing efficiency improves with larger batches, such as bulk database operations or machine learning inference.

input:
  topic:
    name: sensor-data
  take:
    max: 100           # Process up to 100 events at once
    within: 5s         # Or process after 5 seconds, whichever comes first

When buffering is enabled, your processor functions receive a list of events instead of individual events:

processors:
  operations:
    - name: batch-process
      map:
        python: |
          def process_batch(events):
              # events is now a list of values
              processed = []
              for event in events:
                  event["batch_size"] = len(events)
                  processed.append(event)
              return processed

Batching is ideal for bulk database writes, ML inference, and reducing network round trips.

Buffering is particularly beneficial for:

  • Bulk operations: Database inserts, API calls with batch endpoints
  • Statistical analysis: Computing aggregates over windows of data (see the sketch after this list)
  • ML inference: Batch prediction for better GPU utilization
  • I/O optimization: Reducing network round trips
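
For example, the statistical-analysis case might look like this in a buffered processor (a sketch; the reading field name is an assumption):

processors:
  operations:
    - name: batch-average
      map:
        python: |
          def batch_average(events):
              # Compute one aggregate over the whole buffered batch.
              # Assumes every event carries a numeric "reading" field (illustrative).
              avg = sum(e["reading"] for e in events) / len(events)
              for e in events:
                  e["batch_avg"] = avg  # attach the batch aggregate to each event
              return events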

Output

KasprAgents can emit output to:

  • Kafka topics, defined with static names or dynamic functions.
  • In-memory channels for internal use.
  • Custom sinks defined by Python code.

Each output target can define:

  • keySelector, valueSelector, and optional headersSelector
  • predicate for filtering which values are allowed through
  • partitionSelector for custom partitioning
  • ack: true for delivery guarantees

Example:

output:
  topics:
    - name: my-output-topic
      keySelector:
        python: |
          def get_key(value):
              return value["id"]
      predicate:
        python: |
          def should_send(value):
              return "id" in value

Use predicates to filter which events are sent to output topics.
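
The remaining selector fields follow the same shape. A sketch combining ack with headersSelector (the selector's signature and the header contents are assumptions mirroring the examples above):

output:
  topics:
    - name: audited-output
      ack: true
      headersSelector:
        python: |
          def get_headers(value):
              # Attach a provenance header to every outgoing record (illustrative).
              return {"source": "echo-agent"}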

Processors

The heart of a KasprAgent is its processors section, which defines the transformation pipeline.

A processor pipeline is:

  • Declared as a sequence of named operations
  • Backed by user-defined Python map and/or filter functions
  • Optionally attached to tables for stateful logic

Example:

processors:
  pipeline:
    - validate
    - enrich
  init:
    python: |
      def init():
          print("Initializing agent...")
  operations:
    - name: validate
      filter:
        python: |
          def validate(value):
              return value.get("is_valid", False)
    - name: enrich
      map:
        python: |
          def enrich(value):
              value["processed"] = True
              return value

Keep processor functions stateless unless you use tables for stateful logic.

Initialization

The init block lets you define startup logic, load configuration, prepare resources, or validate environment variables.

init:
  python: |
    import os
    if "MY_VAR" not in os.environ:
        raise RuntimeError("Missing MY_VAR")

Use init for resource setup, environment validation, or loading configuration.

Table Access

Agents can reference state tables and use them as inputs to their operations.

tables:
  - name: my-table
    paramName: table
map:
  python: |
    def enrich(value, table):
        value["status"] = table.get(value["id"], "unknown")
        return value

Tables provide persistent or shared state for advanced event processing.
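
Putting those fragments together, a table-backed operation might be declared like this (a sketch; nesting tables inside the operation is an assumption based on the fragments above):

processors:
  pipeline:
    - enrich
  operations:
    - name: enrich
      tables:
        - name: my-table
          paramName: table  # injected as the second argument of the function
      map:
        python: |
          def enrich(value, table):
              value["status"] = table.get(value["id"], "unknown")
              return value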

Partitioning and Routing

Kafka partitioning is leveraged automatically when your input topics are partitioned: records with the same key land on the same partition and are therefore processed by the same agent instance. You can control output partitioning via partitionSelector.

You may also implement repartitioning by emitting records with a newly computed key via keySelector and routing to a new topic.
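
A sketch of that repartitioning pattern (keySelector matches the earlier examples; the partitionSelector signature shown here is an assumption):

output:
  topics:
    - name: repartitioned-topic
      keySelector:
        python: |
          def region_key(value):
              # Recompute the key so events are grouped by region downstream.
              return value["region"]
      partitionSelector:
        python: |
          def pick_partition(key, value, num_partitions):
              # Assumed signature: route records to a partition by key.
              # Python's hash() is illustrative only; use a stable hash in practice.
              return hash(key) % num_partitions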

Processing Failures

When a KasprAgent encounters an exception during event processing, the system maintains exactly-once semantics by acknowledging the source message and not reprocessing it. While this prevents duplicate processing, it raises the question of what happens to failed events.

There are several approaches to handling failed events, each with trade-offs:

Acknowledgment (Current Behavior)

  • The failed message is acknowledged and marked as complete
  • Ensures exactly-once processing semantics
  • Failed events are not reprocessed automatically

Retry Strategies

  • Retrying requires stopping topic processing to maintain message ordering
  • The next offset cannot be processed until the failed event is resolved
  • Moving events to the “back of the queue” breaks topic ordering

Instance Restart

  • Crashing the instance forces human intervention
  • Not ideal given the frequency of code errors and unexpected exceptions
  • Better to log errors and notify operations teams for manual replay

⚠️ Always monitor agent logs for errors and consider using DLQs for failed events.

Explicit Error Handling

Agents may emit to dead-letter queues (DLQs) by using conditional logic in a predicate, or by returning values that match failure conditions.

output:
  topics:
    - name: dead-letter-topic
      predicate:
        python: |
          def should_send(value):
              return "error" in value

DLQs help isolate and analyze failed events for later reprocessing.
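
One way to produce values that match this predicate is to catch exceptions inside a processor and tag the value instead of letting the exception escape (a sketch; the field names are illustrative):

processors:
  operations:
    - name: safe-transform
      map:
        python: |
          def safe_transform(value):
              try:
                  value["total"] = value["price"] * value["quantity"]
              except Exception as exc:
                  # Tag the event so the DLQ predicate routes it.
                  value["error"] = str(exc)
              return value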

Concurrency and Scaling

KasprAgents scale by deploying multiple KasprApp replicas. Kafka ensures partition-based routing, meaning each message lands on exactly one instance. Kafka automatically distributes topic partitions across available agent instances using consumer group rebalancing. When you scale agent replicas:

  • Adding instances: New agents join the consumer group and Kafka redistributes partitions to balance the load
  • Removing instances: Kafka detects unavailable agents and reassigns their partitions to remaining instances
  • Partition assignment: Each partition is consumed by exactly one agent instance at a time, ensuring ordered processing

This automatic rebalancing means agents can scale horizontally without manual intervention, with processing automatically redistributed as the cluster size changes.

Scale agents up or down to match workload—Kafka handles partition rebalancing automatically.
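
Scaling itself happens on the owning KasprApp. A minimal sketch, assuming the KasprApp spec exposes a replicas field and shares the kaspr.io/v1alpha1 API group:

apiVersion: kaspr.io/v1alpha1
kind: KasprApp
metadata:
  name: my-app
spec:
  replicas: 3  # Kafka rebalances partitions across the three instances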

FAQ & Troubleshooting

Q: Why isn't my agent processing events?
A: Check that your input topic/channel exists and is correctly configured. Use declare: true to auto-create topics. Review agent logs for errors.

Q: How do I debug processor errors?
A: Enable detailed logging in your processor functions. Use DLQs to capture failed events for analysis.

Q: Can I use global state in processor functions?
A: Use tables for persistent or shared state.

Q: How do I handle schema changes in input events?
A: Update your processor functions to handle new fields or formats. Consider versioning your agent resources.