Agents - Distributed Stream Processors
An agent is a distributed stream processor that consumes and processes the events in a stream.
Define Agents
Inputs
A KasprAgent resource takes an input topic or channel that defines the source of events for the agent’s stream.
Multiple topics
You can consume from multiple topics simultaneously by providing multiple topic names or a regex pattern. This is most useful when all inputs share the same data schema, but it can be used in general.
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: welcome-email-sender
spec:
  inputs:
    topic:
      names:
        - new-customers
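As a sketch of the regex variant, the same resource could subscribe by pattern instead of by explicit names. The `pattern` field name here is an assumption based on common stream-framework conventions, not confirmed by this page; check the KasprAgent schema for the exact key.

```yaml
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: welcome-email-sender
spec:
  inputs:
    topic:
      # "pattern" is an assumed field name for regex-based subscription
      pattern: "customers-.*"
```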
Partitioning
When an agent reads from a topic, the stream is partitioned based on the key of the message. For example, if the stream's keys are customer ids and its values are purchases, partitioning ensures that any message with the same customer id as its key is always delivered to the same agent instance.
Sometimes you'll have to repartition the stream to ensure a portion of the data is consistently delivered to the same agent instance. See Stream Processors for more information on the group_by operator.
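The key-to-partition mapping can be illustrated with a small sketch. Kafka's default partitioner uses murmur2 hashing; md5 is used here only to keep the example deterministic and dependency-free, and the partition count is arbitrary.

```python
import hashlib

NUM_PARTITIONS = 8  # illustrative partition count

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition deterministically.

    Kafka's default partitioner uses murmur2; md5 is a stand-in
    that keeps this sketch dependency-free.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Messages with the same customer id always land on the same partition,
# and therefore on the same agent instance.
p1 = partition_for("customer-42")
p2 = partition_for("customer-42")
```

Because the mapping depends only on the key, every purchase for `customer-42` is processed by the same instance, which is what makes per-key stateful processing possible.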
App Assignment
Agents must be assigned to a KasprApp. The assigned app is the designated worker that will host and run the agent stream processor.
apiVersion: kaspr.io/v1alpha1
kind: KasprAgent
metadata:
  name: offer-generator
  labels:
    kaspr.io/app: loyalty-app
spec:
  inputs:
    topic:
      names:
        - customers
Processors
Processors take action on events in a stream, processing one event at a time. Operations like filter, map, and groupBy are built-in processor actions that transform the stream. You can also create custom operations using Python. An agent processor chains multiple operations together into a pipeline, with each input event flowing through the operations sequentially.
There are two types of processor operations:
- Terminal: A terminal operator concludes the stream processing and represents the final operation in a pipeline.
- Non-terminal: A non-terminal operator transforms the stream and passes it along, enabling subsequent operators in the chain to process the data further.
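The pipeline idea above can be sketched in plain Python. The function names are illustrative, not the Kaspr API: non-terminal operations transform an event and pass it along (or drop it, filter-style), and the final collection step plays the role of a terminal operation.

```python
from typing import Callable, Iterable, Optional

# Non-terminal operations: each takes an event and returns the
# (possibly transformed) event, or None to drop it (filter-style).
def keep_purchases(event: dict) -> Optional[dict]:
    return event if event["type"] == "purchase" else None

def add_points(event: dict) -> dict:
    return {**event, "points": event["amount"] // 10}

def run_pipeline(events: Iterable[dict],
                 ops: list[Callable[[dict], Optional[dict]]]) -> list[dict]:
    """Feed each event through the operations sequentially."""
    out = []
    for event in events:
        for op in ops:
            event = op(event)
            if event is None:  # dropped by a filter-style op
                break
        else:
            out.append(event)  # terminal step: collect the result
    return out

results = run_pipeline(
    [{"type": "purchase", "amount": 120}, {"type": "refund", "amount": 50}],
    [keep_purchases, add_points],
)
# results == [{"type": "purchase", "amount": 120, "points": 12}]
```

The refund event is dropped by the filter-style operation, so only the transformed purchase reaches the terminal step.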