WebViews - HTTP Interfaces for Kaspr Apps
What Is a KasprWebView?
A KasprWebView exposes an HTTP endpoint backed by your Kaspr application logic. It lets you build lightweight, declarative, serverless-style request handlers that:
- Accept HTTP requests (method + path) inside the cluster (and optionally via an ingress)
- Run a programmable processing pipeline similar to
KasprAgentoperations - Dynamically publish messages to Kafka topics
- Return structured HTTP responses with customizable status code, headers, and body
WebViews are defined as Kubernetes custom resources (KasprWebView) and deployed alongside your KasprApp. They complement Agents by handling pull-based request/response flows (HTTP) instead of push-based stream consumption.
Core Concepts
| Concept | Description |
|---|---|
| Request | Defines the HTTP method and path to bind (e.g. POST /api/v2/kafka/topics/{topic}). |
| Processors | A pipeline of operations (functions) that transform the request or intermediate data. |
| Operations | Individual transformation or side-effect steps (map or topicSend). |
| topicSend | Special operation that sends a record to a Kafka topic, optionally using dynamic selectors. |
| Selectors | User-provided Python functions to compute dynamic values (topic name, key, value, headers, partition, predicates, response pieces). |
| Response Handlers | Functions that determine the final body, status code, and headers (success or error). |
When to Use WebViews
Common scenarios:
- REST → Kafka Ingestion: Accept batched JSON payloads and fan them out to a topic.
- Command & Control: Trigger administrative or orchestration events in your system.
- Adhoc Tools: Provide a simple UI or CLI-friendly endpoint to inspect or publish data.
- Validation / Pre-Processing: Apply synchronous validation before emitting to a stream.
- Bridging External Systems: Allow external services to publish without embedding Kafka clients.
- Materialized State Lookup: Read from a
KasprTable(populated by Agents) to serve enriched or cached responses.
Not ideal for:
- Long-lived streaming subscriptions (use Agents instead)
- High-performance bulk data upload (consider specialized ingestion paths)
Quickstart Example
A minimal WebView that accepts a POST request and publishes messages to dynamic Kafka topics:
apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
name: topic-publisher
labels:
kaspr.io/app: kafka-rest-proxy
spec:
name: topic-publisher
description: A web interface to publish messages to Kafka topics.
request:
method: POST
path: /api/v2/kafka/rest-proxy/topics/{topic}
response:
contentType: application/json
statusCodeSelector:
onSuccess:
python: |
def select_status_code(data):
return 202
processors:
pipeline:
- prepare
- distribute
- send
operations:
- name: prepare
map:
python: |
async def prepare(request, topic):
values = await request.json()
messages = []
for value in values.get("content", []):
messages.append({
"topic": topic,
"key": value.get("key"),
"value": value.get("value"),
"partition": value.get("partition"),
"has_errors": False
})
return messages
- name: distribute
map:
python: |
def distribute(messages):
for m in messages:
yield m
- name: send
topicSend:
nameSelector:
python: |
def select_topic(record):
return record.get("topic")
keySelector:
python: |
def select_key(record):
return record.get("key")
valueSelector:
python: |
def select_value(record):
return record.get("value")
predicate:
python: |
def should_send(record):
return record.get("has_errors") is FalseApply it:
kubectl apply -f webview.yamlAnatomy of a WebView
A KasprWebView spec is composed of these major sections:
request– HTTP binding (method & path)processors– Transformation pipelineresponse– How to render success/error HTTP responses
Request
request:
method: POST
path: /api/v2/kafka/rest-proxy/topics/{topic}Path parameters (e.g. {topic}) are made available as function arguments in operations.
/api/v1/...) for forward compatibility.Processors Pipeline
The processing model is similar to Agents: you define a pipeline ordering and implement each operation under operations.
processors:
pipeline:
- prepare
- distribute
- send
operations:
- name: prepare
map:
entrypoint: prepare
python: |
def prepare(request, topic):
return []Rules:
- Every name in
pipelinemust have a correspondingoperationsentry. mapfunctions return either a single value or an iterable/async iterable.- Generators / yielding allow fan-out semantics.
Operation Types
Currently supported operation capabilities:
| Capability | Field | Purpose |
|---|---|---|
| Transform | map | Produce new value(s) from input. Can return a value, list, or generator. |
| Side Effect Publish | topicSend | Send to Kafka (with dynamic selectors) while continuing or terminating the chain. |
Example map Operation
- name: normalize
map:
python: |
def normalize(record):
record["value"] = (record.get("value") or "").strip()
return recordtopicSend Operation
topicSend allows emitting Kafka messages from within the pipeline.
- name: send
topicSend:
name: my-static-topic # or use nameSelector
keySerializer: json # raw | json | pickle | binary
valueSerializer: json
keySelector:
python: |
def select_key(record):
return record.get("key")
valueSelector:
python: |
def select_value(record):
return record.get("value")
headersSelector:
python: |
def select_headers(record):
return {"x-source": "webview"}
predicate:
python: |
def should_send(record):
return TrueSelectors let you compute dynamic values per record. If predicate returns False, the message is skipped.
name or nameSelector must be provided. Use nameSelector when the target topic is derived from the request.Response Customization
A WebView can dynamically control body, status code, and headers using _Selector handlers.
response:
contentType: application/json
statusCodeSelector:
onSuccess:
python: |
def success_status(data):
return 202
onError:
python: |
def error_status(err):
return 400
bodySelector:
onSuccess:
python: |
def success_body(data):
return {"accepted": True, "count": len(data) if isinstance(data, list) else 1}
onError:
python: |
def error_body(err):
return {"error": str(err)}
headersSelector:
onSuccess:
python: |
def success_headers(data):
return {"x-processed": "true"}If no selectors are provided, defaults are used (status 200 or CRD-specified defaults, empty headers, body = serialized return object).
onError handlers to avoid leaking sensitive details.Error Flow
If any operation raises an exception:
- Processing stops
onErrorselectors (if defined) are evaluated- A response is returned with computed status/body/headers
You can intentionally raise structured errors inside map for controlled failures.
map:
python: |
def validate(request):
if not request.get("value"):
raise ValueError("missing value")
return requestAsync vs Sync Functions
- Functions may be
async defwhen awaiting I/O (e.g., reading request JSON) - Downstream operations receive the resolved result
Complete Example with Dynamic Partitioning
apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
name: enriched-publisher
labels:
kaspr.io/app: ecommerce-app
spec:
name: enriched-publisher
description: Accepts product events and publishes enriched records.
request:
method: POST
path: /api/v1/products/{region}
processors:
pipeline:
- parse
- enrich
- send
operations:
- name: parse
map:
python: |
async def parse(request, region):
payload = await request.json()
return payload.get("products", [])
- name: enrich
map:
python: |
def enrich(products, region):
for p in products:
p["region"] = region
yield p
- name: send
topicSend:
name: products.events
keySelector:
python: |
def select_key(product):
return product.get("id")
partitionSelector:
python: |
def select_partition(product):
return hash(product.get("region")) % 8
predicate:
python: |
def allow(product):
return product.get("active", True)
response:
contentType: application/json
bodySelector:
onSuccess:
python: |
def body(result):
return {"processed": len(result) if isinstance(result, list) else 1}Best Practices
Reading from Tables (Materialized Views)
WebViews can read from KasprTable resources that are populated by KasprAgent processing. This enables low-latency, query-style HTTP endpoints over streaming state (e.g., product metadata, user profiles, counters, feature flags).
Use Case: Enrich Response with Table Data
Suppose an Agent maintains a products-table keyed by product ID with details (name, category, price). A WebView can expose an HTTP endpoint to fetch enriched product info for a region.
apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
name: product-lookup
labels:
kaspr.io/app: ecommerce-app
spec:
name: product-lookup
request:
method: GET
path: /api/v1/products/{product_id}/{region}
processors:
pipeline:
- build
- respond
operations:
- name: build
map:
python: |
def build(request, product_id, region):
# request object gives access to headers if needed
# product_table injected via tables declaration below
return {"product_id": product_id, "region": region}
- name: respond
map:
python: |
def respond(state, product_table):
# product_table is read-only; comes from a KasprTable resource
record = product_table.get(state["product_id"], None)
if record is None:
# Raise to trigger error selectors (or rely on default 404 mapping if implemented)
raise ValueError("product not found")
# Enrich the base state with table record
state.update({
"name": record.get("name"),
"category": record.get("category"),
"price": record.get("price"),
})
return state
# Table references: make table available as param product_table
operations:
- name: build
map:
python: |
def build(request, product_id, region):
return {"product_id": product_id, "region": region}
- name: respond
tables:
- name: products-table
paramName: product_table
map:
python: |
def respond(state, product_table):
rec = product_table.get(state["product_id"], None)
if rec is None:
raise ValueError("product not found")
state.update(rec)
return state
response:
contentType: application/json
bodySelector:
onError:
python: |
def error_body(err):
return {"error": str(err)}Key points:
- Tables declared under
operations[].tablesmirror Agent usage. - Function parameters receive the table object by the configured
paramName. - Raise exceptions to signal not-found or invalid requests.
- Avoid expensive per-request joins—tables should already contain prepared state curated by Agents.
Performance considerations:
- Prefer small, key-based lookups (e.g.,
product_id) over full scans. - For composite queries, consider precomputing aggregate state in another table.
Security considerations:
-
Do not expose raw internal keys unless necessary.
-
Sanitize or filter fields before returning table contents.
-
Keep operations small and composable
-
Validate inputs early (first operation)
-
Use generators / yielding for large fan-out to avoid memory spikes
-
Prefer
nameSelectorover hard-codednameonly when truly dynamic -
Limit synchronous I/O in critical paths; push heavy work to Agents when possible
-
Centralize reusable Python snippets (TODO: shared code include feature)
Security Considerations
- Expose WebViews via ingress only when necessary
- Add authentication / authorization at the gateway layer (TODO: native auth support)
- Sanitize and validate all request bodies
- Avoid echoing raw exception strings in responses
Monitoring & Observability
- Log request path, method, and processing duration
- Count emitted Kafka messages per WebView
- Track error rates from
onErrorselectors - Use partition metrics to validate distribution when using
partitionSelector
FAQ
How is this different from an Agent?
Agents consume continuously from streams; WebViews react to on-demand HTTP calls.
Can a WebView call an Agent?
Indirectly—publish to a topic the Agent consumes.
Can I return streaming responses?
Not yet. TODO: evaluate Server-Sent Events or chunked encoding support.
Are headers available in operations?
Yes—request context includes headers. Pass them through your first map if you need them downstream.
Can I perform authentication inside a processor?
Yes, but prefer an external gateway. You can raise an error to return a custom status code.
Limitations & TODOs
- Native auth / rate limiting (TODO)
- Structured validation errors helper (TODO)
- Shared code modules/imports (TODO)
- Streaming responses (TODO)