DocumentationUser GuideAgents - Distributed stream processors

Agents - Distributed Stream Processors

What Is a KasprAgent?

A KasprAgent is a stream processor defined as a Kubernetes custom resource. It listens to incoming event streams from Kafka topics or in-memory channels, applies custom logic using a programmable pipeline of operations, and optionally produces results to output topics, channels, or sinks.

KasprAgents provide a fully declarative, Kubernetes-native way to express stream processing applications, making them easy to version, manage, and deploy like any other resource in your cluster.

Quickstart

Create a KasprAgent

A minimal agent can be created by specifying an input topic and a simple mapping processor:

apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: echo-agent
spec:
  input:
    topic:
      name: demo-input
  output:
    topics:
      - name: demo-output
  processors:
    pipeline:
      - echo
    operations:
      - name: echo
        map:
          python: |
            def echo(value):
                return value

Apply it using:

kubectl apply -f echo-agent.yaml

Components of a KasprAgent

Input

Agents must define their input source. This is either:

  • A Kafka topic, defined with name or pattern, and serialization settings.
  • An in-memory channel, useful for internal communication or testing.

You can also set declare: true to have the agent auto-create the topic if missing.

input:
  topic:
    name: my-topic
    declare: true
    keySerializer: json
    valueSerializer: json

Input Buffering

Agents can buffer multiple input events before processing them as a batch. This is useful for scenarios where processing efficiency improves with larger batches, such as bulk database operations or machine learning inference.

input:
  topic:
    name: sensor-data
  take:
    max: 100           # Process up to 100 events at once
    within: 5s         # Or process after 5 seconds, whichever comes first

When buffering is enabled, your processor functions receive a list of events instead of individual events:

processors:
  operations:
    - name: batch-process
      map:
        python: |
          def process_batch(events):
              # events is now a list of values
              processed = []
              for event in events:
                  event["batch_size"] = len(events)
                  processed.append(event)
              return processed

Buffering is particularly beneficial for:

  • Bulk operations: Database inserts, API calls with batch endpoints
  • Statistical analysis: Computing aggregates over windows of data
  • ML inference: Batch prediction for better GPU utilization
  • I/O optimization: Reducing network round trips

Output

KasprAgents can emit output to:

  • Kafka topics, defined with static names or dynamic functions.
  • In-memory channels for internal use.
  • Custom sinks defined by Python code

Each output target can define:

  • keySelector, valueSelector, and optional headersSelector
  • predicate for filtering what values to allow to pass through
  • partitionSelector for custom partitioning
  • ack: true for delivery guarantees

Example:

output:
  topics:
    - name: my-output-topic
      keySelector:
        python: |
          def get_key(value):
              return value["id"]
      predicate:
        python: |
          def should_send(value):
              return "id" in value

Processors

The heart of a KasprAgent is its processors section, which defines the transformation pipeline.

A processor pipeline is:

  • Declared as a sequence of named operations
  • Backed by user-defined Python map and/or filter functions
  • Optional tables can be attached for stateful logic

Example:

processors:
  pipeline:
    - validate
    - enrich
  init:
    python: |
      def init():
          print("Initializing agent...")
  operations:
    - name: validate
      filter:
        python: |
          def validate(value):
              return value.get("is_valid", False)
    - name: enrich
      map:
        python: |
          def enrich(value):
              value["processed"] = True
              return value

Initialization

The init block lets you define startup logic, load configuration, prepare resources, or validate environment variables.

init:
  python: |
    import os
    if "MY_VAR" not in os.environ:
        raise RuntimeError("Missing MY_VAR")

Table Access

Agents can reference state tables and use them as inputs to their operations.

tables:
  - name: my-table
    paramName: table
map:
  python: |
    def enrich(value, table):
        value["status"] = table.get(value["id"], "unknown")
        return value

Partitioning and Routing

Kafka partitioning is automatically leveraged if your input topics are partitioned. You can control output partitioning via partitionSelector.

You may also implement repartitioning by emitting records with a newly computed key via keySelector and routing to a new topic.

Processing Failures

When an agent encounters an exception during event processing, the system maintains exactly-once semantics by acknowledging the source message and not reprocessing it. While this prevents duplicate processing, it raises the question of what happens to failed events.

There are several approaches to handling failed events, each with trade-offs:

Acknowledgment (Current Behavior)

  • The failed message is acknowledged and marked as complete
  • Ensures exactly-once processing semantics
  • Failed events are not reprocessed automatically

Retry Strategies

  • Retrying requires stopping topic processing to maintain message ordering
  • The next offset cannot be processed until the failed event is resolved
  • Moving events to the “back of the queue” breaks topic ordering

Instance Restart

  • Crashing the instance forces human intervention
  • Not ideal given the frequency of code errors and unexpected exceptions
  • Better to log errors and notify operations teams for manual replay

Explicit Error Handling

Agents may emit to dead-letter queues (DLQs) using conditional logic in predicate or return values that match failure conditions.

output:
  topics:
    - name: dead-letter-topic
      predicate:
        python: |
          def should_send(value):
              return "error" in value

Concurrency and Scaling

KasprAgents scale by deploying multiple KasprApp replicas. Kafka ensures partition-based routing, meaning each message lands on exactly one instance. Kafka automatically distributes topic partitions across available agent instances using consumer group rebalancing. When you scale agent replicas:

  • Adding instances: New agents join the consumer group and Kafka redistributes partitions to balance the load
  • Removing instances: Kafka detects unavailable agents and reassigns their partitions to remaining instances
  • Partition assignment: Each partition is consumed by exactly one agent instance at a time, ensuring ordered processing

This automatic rebalancing means agents can scale horizontally without manual intervention, with processing automatically redistributed as the cluster size changes.