
WebViews - HTTP Interfaces for Kaspr Apps

What Is a KasprWebView?

A KasprWebView exposes an HTTP endpoint backed by your Kaspr application logic. It lets you build lightweight, declarative, serverless-style request handlers that:

  • Accept HTTP requests (method + path) inside the cluster (and optionally via an ingress)
  • Run a programmable processing pipeline similar to KasprAgent operations
  • Dynamically publish messages to Kafka topics
  • Return structured HTTP responses with customizable status code, headers, and body

WebViews are defined as Kubernetes custom resources (KasprWebView) and deployed alongside your KasprApp. They complement Agents: Agents consume streams continuously, while WebViews handle on-demand request/response flows over HTTP.

Use WebViews when you need an HTTP bridge into your streaming system—for example: REST ingestion, administrative triggers, or interactive tools.

Core Concepts

| Concept | Description |
|---|---|
| Request | Defines the HTTP method and path to bind (e.g. POST /api/v2/kafka/topics/{topic}). |
| Processors | A pipeline of operations (functions) that transform the request or intermediate data. |
| Operations | Individual transformation or side-effect steps (map or topicSend). |
| topicSend | A special operation that sends a record to a Kafka topic, optionally using dynamic selectors. |
| Selectors | User-provided Python functions that compute dynamic values (topic name, key, value, headers, partition, predicates, response pieces). |
| Response Handlers | Functions that determine the final body, status code, and headers (on success or error). |

When to Use WebViews

Common scenarios:

  • REST → Kafka Ingestion: Accept batched JSON payloads and fan them out to a topic.
  • Command & Control: Trigger administrative or orchestration events in your system.
  • Ad Hoc Tools: Provide a simple UI or CLI-friendly endpoint to inspect or publish data.
  • Validation / Pre-Processing: Apply synchronous validation before emitting to a stream.
  • Bridging External Systems: Allow external services to publish without embedding Kafka clients.
  • Materialized State Lookup: Read from a KasprTable (populated by Agents) to serve enriched or cached responses.

Not ideal for:

  • Long-lived streaming subscriptions (use Agents instead)
  • High-performance bulk data upload (consider specialized ingestion paths)

Quickstart Example

A minimal WebView that accepts a POST request and publishes messages to dynamic Kafka topics:

apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: topic-publisher
  labels:
    kaspr.io/app: kafka-rest-proxy
spec:
  name: topic-publisher
  description: A web interface to publish messages to Kafka topics.
  request:
    method: POST
    path: /api/v2/kafka/rest-proxy/topics/{topic}
  response:
    contentType: application/json
    statusCodeSelector:
      onSuccess:
        python: |
          def select_status_code(data):
              return 202
  processors:
    pipeline:
      - prepare
      - distribute
      - send
    operations:
      - name: prepare
        map:
          python: |
            async def prepare(request, topic):
                values = await request.json()
                messages = []
                for value in values.get("content", []):
                    messages.append({
                        "topic": topic,
                        "key": value.get("key"),
                        "value": value.get("value"),
                        "partition": value.get("partition"),
                        "has_errors": False
                    })
                return messages
      - name: distribute
        map:
          python: |
            def distribute(messages):
                for m in messages:
                    yield m
      - name: send
        topicSend:
          nameSelector:
            python: |
              def select_topic(record):
                  return record.get("topic")
          keySelector:
            python: |
              def select_key(record):
                  return record.get("key")
          valueSelector:
            python: |
              def select_value(record):
                  return record.get("value")
          predicate:
            python: |
              def should_send(record):
                  return record.get("has_errors") is False

Apply it:

kubectl apply -f webview.yaml
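Before applying the manifest, it can help to exercise the embedded Python locally. The sketch below copies the quickstart's prepare, distribute, and selector functions and drives them with a stand-in request object. FakeRequest and simulate are hypothetical test scaffolding, not part of the Kaspr runtime. The request body shape ({"content": [...]}) is inferred from the prepare function.

```python
import asyncio
import json


class FakeRequest:
    """Hypothetical stand-in for the HTTP request; only provides async json()."""

    def __init__(self, body: str):
        self._body = body

    async def json(self):
        return json.loads(self._body)


# Functions copied from the quickstart manifest.
async def prepare(request, topic):
    values = await request.json()
    messages = []
    for value in values.get("content", []):
        messages.append({
            "topic": topic,
            "key": value.get("key"),
            "value": value.get("value"),
            "partition": value.get("partition"),
            "has_errors": False,
        })
    return messages


def distribute(messages):
    for m in messages:
        yield m


def select_topic(record):
    return record.get("topic")


def select_key(record):
    return record.get("key")


def select_value(record):
    return record.get("value")


def should_send(record):
    return record.get("has_errors") is False


async def simulate(body: str, topic: str):
    """Run prepare -> distribute -> send selectors; collect (topic, key, value) tuples."""
    messages = await prepare(FakeRequest(body), topic)
    sent = []
    for record in distribute(messages):
        if should_send(record):
            sent.append((select_topic(record), select_key(record), select_value(record)))
    return sent


if __name__ == "__main__":
    payload = json.dumps({"content": [
        {"key": "user-1", "value": {"clicks": 3}},
        {"key": "user-2", "value": {"clicks": 5}},
    ]})
    print(asyncio.run(simulate(payload, "clicks")))
```

Sending the same payload as a POST to /api/v2/kafka/rest-proxy/topics/clicks should produce the same two records on the clicks topic.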

Anatomy of a WebView

A KasprWebView spec is composed of these major sections:

  1. request – HTTP binding (method & path)
  2. processors – Transformation pipeline
  3. response – How to render success/error HTTP responses

Request

request:
  method: POST
  path: /api/v2/kafka/rest-proxy/topics/{topic}

Path parameters (e.g. {topic}) are made available as function arguments in operations.

Use descriptive and versioned paths (e.g. /api/v1/...) for forward compatibility.
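The following self-contained path matcher illustrates how a placeholder such as {topic} can become a function argument. It is a hypothetical model of the binding, not Kaspr's actual router. The compile_path and match_path names are illustrative. The sketch passes parameters by name because the example functions use the same names as the placeholders; whether Kaspr binds them by name or by position is an assumption.

```python
import re
from typing import Dict, Optional


def compile_path(template: str) -> "re.Pattern[str]":
    """Turn '/topics/{topic}' into a regex with one named group per placeholder."""
    pattern = ""
    pos = 0
    for m in re.finditer(r"\{(\w+)\}", template):
        # Escape the literal text between placeholders.
        pattern += re.escape(template[pos:m.start()])
        # A placeholder matches exactly one path segment (no slashes).
        pattern += f"(?P<{m.group(1)}>[^/]+)"
        pos = m.end()
    pattern += re.escape(template[pos:])
    return re.compile(f"^{pattern}$")


def match_path(template: str, path: str) -> Optional[Dict[str, str]]:
    """Return placeholder values for a matching path, or None if it doesn't match."""
    m = compile_path(template).match(path)
    return m.groupdict() if m else None


def prepare(request, topic):
    # Stand-in for a first operation that receives the path parameter.
    return {"topic": topic}


if __name__ == "__main__":
    params = match_path("/api/v2/kafka/rest-proxy/topics/{topic}",
                        "/api/v2/kafka/rest-proxy/topics/orders")
    print(params)                   # {'topic': 'orders'}
    print(prepare(None, **params))  # {'topic': 'orders'}
```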

Processors Pipeline

The processing model is similar to Agents: list operation names under pipeline in execution order, then implement each one under operations.

processors:
  pipeline:
    - prepare
    - distribute
    - send
  operations:
    - name: prepare
      map:
        entrypoint: prepare
        python: |
          def prepare(request, topic):
              return []

Rules:

  • Every name in pipeline must have a corresponding operations entry.
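  • map functions either return a single value or yield values as a generator / async generator. Judging by the examples, a returned list is passed to the next operation as one value, which is why the quickstart's distribute step iterates over it.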
  • Generators / yielding allow fan-out semantics.
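A small synchronous runner can model these rules. A plain return value flows to the next operation as one item, while a generator fans out into one downstream call per yielded item. This is a simplified, hypothetical model for reasoning about pipelines: run_pipeline is not a Kaspr API, and async functions are omitted for brevity. It mirrors the quickstart, where prepare returns a list that distribute then iterates.

```python
import inspect
from typing import Any, Callable, Iterable, List


def run_pipeline(ops: List[Callable[[Any], Any]], items: Iterable[Any]) -> List[Any]:
    """Apply operations in order; generators fan out, other results pass on as one item."""
    current = list(items)
    for op in ops:
        next_items = []
        for item in current:
            result = op(item)
            if inspect.isgenerator(result):
                # Fan-out: each yielded value becomes its own downstream item.
                next_items.extend(result)
            else:
                # Single value (a list included) flows on unchanged.
                next_items.append(result)
        current = next_items
    return current


def prepare(payload):
    return [{"key": k} for k in payload["keys"]]  # returns ONE list


def distribute(messages):
    for m in messages:                            # yields MANY records
        yield m


def tag(record):
    return {**record, "tagged": True}


if __name__ == "__main__":
    print(run_pipeline([prepare, distribute, tag], [{"keys": ["a", "b"]}]))
```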

Operation Types

Currently supported operation capabilities:

| Capability | Field | Purpose |
|---|---|---|
| Transform | map | Produce new value(s) from input. Can return a value, list, or generator. |
| Side-effect publish | topicSend | Send to Kafka (with dynamic selectors) while continuing or terminating the chain. |

Example map Operation

- name: normalize
  map:
    python: |
      def normalize(record):
          record["value"] = (record.get("value") or "").strip()
          return record

topicSend Operation

topicSend allows emitting Kafka messages from within the pipeline.

- name: send
  topicSend:
    name: my-static-topic           # or use nameSelector
    keySerializer: json             # raw | json | pickle | binary
    valueSerializer: json
    keySelector:
      python: |
        def select_key(record):
            return record.get("key")
    valueSelector:
      python: |
        def select_value(record):
            return record.get("value")
    headersSelector:
      python: |
        def select_headers(record):
            return {"x-source": "webview"}
    predicate:
      python: |
        def should_send(record):
            return True

Selectors let you compute dynamic values per record. If predicate returns False, the message is skipped.

Either name or nameSelector must be provided. Use nameSelector when the target topic is derived from the request.
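The selectors in a topicSend block are plain functions of the current record, so you can check them without a Kafka cluster. The sketch below applies the example's selectors, skips the record when the predicate is false, and otherwise returns the message that would be produced. build_message and OutgoingMessage are hypothetical helpers for local testing. They do not perform a Kafka send and are not Kaspr APIs.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional

Record = Dict[str, Any]


@dataclass
class OutgoingMessage:
    """Hypothetical container for what one topicSend would emit."""
    topic: str
    key: Any
    value: Any
    headers: Dict[str, str] = field(default_factory=dict)


# Selectors copied from the topicSend example above.
def select_key(record):
    return record.get("key")


def select_value(record):
    return record.get("value")


def select_headers(record):
    return {"x-source": "webview"}


def should_send(record):
    return True


def build_message(
    record: Record,
    name: Optional[str] = None,
    name_selector: Optional[Callable[[Record], str]] = None,
    key_selector: Callable[[Record], Any] = select_key,
    value_selector: Callable[[Record], Any] = select_value,
    headers_selector: Callable[[Record], Dict[str, str]] = select_headers,
    predicate: Callable[[Record], bool] = should_send,
) -> Optional[OutgoingMessage]:
    """Mimic topicSend for one record; returns None when the predicate skips it."""
    if name is None and name_selector is None:
        raise ValueError("either name or name_selector is required")
    if not predicate(record):
        return None  # skipped: nothing is sent
    # This sketch uses the static name when both are given (an assumption).
    topic = name if name is not None else name_selector(record)
    return OutgoingMessage(
        topic=topic,
        key=key_selector(record),
        value=value_selector(record),
        headers=headers_selector(record),
    )


if __name__ == "__main__":
    print(build_message({"key": "k1", "value": {"n": 1}}, name="my-static-topic"))
```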

Response Customization

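A WebView can dynamically control the response body, status code, and headers with bodySelector, statusCodeSelector, and headersSelector. Each selector accepts an onSuccess handler, an onError handler, or both.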

response:
  contentType: application/json
  statusCodeSelector:
    onSuccess:
      python: |
        def success_status(data):
            return 202
    onError:
      python: |
        def error_status(err):
            return 400
  bodySelector:
    onSuccess:
      python: |
        def success_body(data):
            return {"accepted": True, "count": len(data) if isinstance(data, list) else 1}
    onError:
      python: |
        def error_body(err):
            return {"error": str(err)}
  headersSelector:
    onSuccess:
      python: |
        def success_headers(data):
            return {"x-processed": "true"}

If no selectors are provided, defaults apply: status 200 (or the default specified in the CRD), empty headers, and a body containing the serialized return object.

⚠️
Always sanitize error messages in onError handlers to avoid leaking sensitive details.
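You can picture the success/error split as a small dispatcher. It calls the onSuccess or onError variant of each selector and falls back to a default when a selector is missing. render_response is a hypothetical local helper, not a Kaspr function. json.dumps stands in for whatever serialization the runtime actually uses, and CRD-specified defaults are ignored. The handlers are copied as-is from the example, so error_body still echoes str(err); apply the warning above before using that pattern in production.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple

# (onSuccess, onError) handler pair for one response part; None means "not set".
Pair = Tuple[Optional[Callable[[Any], Any]], Optional[Callable[[Any], Any]]]


# Handlers copied from the response example above.
def success_status(data):
    return 202


def error_status(err):
    return 400


def success_body(data):
    return {"accepted": True, "count": len(data) if isinstance(data, list) else 1}


def error_body(err):
    return {"error": str(err)}


def success_headers(data):
    return {"x-processed": "true"}


def render_response(
    outcome: Any,
    is_error: bool,
    status: Pair = (success_status, error_status),
    body: Pair = (success_body, error_body),
    headers: Pair = (success_headers, None),
) -> Tuple[int, Dict[str, str], str]:
    """Return (status, headers, serialized body) using onSuccess or onError handlers."""
    i = 1 if is_error else 0
    # Success fallbacks (200, raw result) follow the defaults described above;
    # the error fallbacks (500, generic message) are this sketch's own assumption.
    default_status = 500 if is_error else 200
    default_body = {"error": "internal error"} if is_error else outcome
    code = status[i](outcome) if status[i] else default_status
    hdrs = headers[i](outcome) if headers[i] else {}
    payload = body[i](outcome) if body[i] else default_body
    return code, hdrs, json.dumps(payload)
```

For example, a pipeline that produced three items renders as status 202 with the x-processed header and body {"accepted": true, "count": 3}.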

Error Flow

If any operation raises an exception:

  1. Processing stops
  2. onError selectors (if defined) are evaluated
  3. A response is returned with computed status/body/headers

You can intentionally raise structured errors inside map for controlled failures.

map:
  python: |
    # Runs after a parsing step, so it receives a dict rather than the raw HTTP request.
    def validate(record):
        if not record.get("value"):
            raise ValueError("missing value")
        return record
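Combining intentional validation errors with the earlier warning about leaking details suggests one approach. Let only ValueError messages reach the client, since your own map code raises those deliberately, and replace anything else with a generic message. This is a suggested pattern rather than built-in Kaspr behavior. The handler names follow the response example, and the sketch assumes err is the raised exception object.

```python
def validate(record):
    # Deliberate, client-facing failure.
    if not record.get("value"):
        raise ValueError("missing value")
    return record


def error_status(err):
    # 400 for validation failures raised on purpose, 500 for anything unexpected.
    return 400 if isinstance(err, ValueError) else 500


def error_body(err):
    # Only echo messages we wrote ourselves; hide details of unexpected errors.
    if isinstance(err, ValueError):
        return {"error": str(err)}
    return {"error": "internal error"}


if __name__ == "__main__":
    try:
        validate({"value": ""})
    except Exception as exc:
        print(error_status(exc), error_body(exc))  # 400 {'error': 'missing value'}
```

json.JSONDecodeError is a subclass of ValueError, so this mapping also turns a malformed JSON body into a 400.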

Async vs Sync Functions

  • Functions may be async def when awaiting I/O (e.g., reading request JSON)
  • Downstream operations receive the resolved (awaited) result, never a coroutine
Use async only where necessary to keep the pipeline simple.
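A runtime that mixes sync and async operations has to await coroutine results before passing them on. The hypothetical call_operation helper below shows that idea with inspect.isawaitable. It illustrates the rule above and is not Kaspr's internal code.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_operation(fn: Callable[..., Any], *args: Any) -> Any:
    """Call a sync or async function and always return the resolved value."""
    result = fn(*args)
    if inspect.isawaitable(result):
        result = await result  # async def: wait for the coroutine to finish
    return result


async def parse(body):
    await asyncio.sleep(0)     # stands in for awaiting request.json()
    return body["products"]


def count(products):
    return len(products)       # sync: receives a list, not a coroutine


async def main():
    products = await call_operation(parse, {"products": [{"id": 1}, {"id": 2}]})
    return await call_operation(count, products)


if __name__ == "__main__":
    print(asyncio.run(main()))  # 2
```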

Complete Example with Dynamic Partitioning

apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: enriched-publisher
  labels:
    kaspr.io/app: ecommerce-app
spec:
  name: enriched-publisher
  description: Accepts product events and publishes enriched records.
  request:
    method: POST
    path: /api/v1/products/{region}
  processors:
    pipeline:
      - parse
      - enrich
      - send
    operations:
      - name: parse
        map:
          python: |
            async def parse(request, region):
                payload = await request.json()
                return payload.get("products", [])
      - name: enrich
        map:
          python: |
            def enrich(products, region):
                for p in products:
                    p["region"] = region
                    yield p
      - name: send
        topicSend:
          name: products.events
          keySelector:
            python: |
              def select_key(product):
                  return product.get("id")
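          partitionSelector:
            python: |
              def select_partition(product):
                  import zlib
                  # hash() on str is salted per process; crc32 is stable across workers and restarts.
                  # Assumes products.events has 8 partitions.
                  region = product.get("region") or ""
                  return zlib.crc32(region.encode("utf-8")) % 8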
          predicate:
            python: |
              def allow(product):
                  return product.get("active", True)
  response:
    contentType: application/json
    bodySelector:
      onSuccess:
        python: |
          def body(result):
              return {"processed": len(result) if isinstance(result, list) else 1}
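Python's built-in hash() for strings is randomized per interpreter process unless PYTHONHASHSEED is fixed. A partitionSelector based on it can therefore route the same region to different partitions on different workers or after a restart. A deterministic checksum such as zlib.crc32 avoids that. The check below runs the example's enrich and allow functions locally together with a crc32-based select_partition. The partition count of 8 is the example's own assumption about products.events.

```python
import zlib

NUM_PARTITIONS = 8  # the example assumes products.events has 8 partitions


def enrich(products, region):
    # Same as the enrich operation: tag each product and fan out.
    for p in products:
        p["region"] = region
        yield p


def select_partition(product):
    # crc32 gives the same result in every process, unlike hash() on str.
    region = product.get("region") or ""
    return zlib.crc32(region.encode("utf-8")) % NUM_PARTITIONS


def allow(product):
    return product.get("active", True)


if __name__ == "__main__":
    products = [{"id": "p1"}, {"id": "p2", "active": False}]
    for p in enrich(products, "eu-west"):
        if allow(p):
            print(p["id"], select_partition(p))
```

Because the partition depends only on the region, every product from one request lands on the same partition. That is the intent of partitioning by region.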

Best Practices

Reading from Tables (Materialized Views)

WebViews can read from KasprTable resources that are populated by KasprAgent processing. This enables low-latency, query-style HTTP endpoints over streaming state (e.g., product metadata, user profiles, counters, feature flags).

⚠️
WebViews cannot write/update table state. Only Agents mutate tables. Treat table access in WebViews as read-only cache access.

Use Case: Enrich Response with Table Data

Suppose an Agent maintains a products-table keyed by product ID with details (name, category, price). A WebView can expose an HTTP endpoint to fetch enriched product info for a region.

apiVersion: kaspr.io/v1alpha1
kind: KasprWebView
metadata:
  name: product-lookup
  labels:
    kaspr.io/app: ecommerce-app
spec:
  name: product-lookup
  request:
    method: GET
    path: /api/v1/products/{product_id}/{region}
  processors:
    pipeline:
      - build
      - respond
    operations:
      - name: build
        map:
          python: |
            def build(request, product_id, region):
                # request gives access to headers if needed
                return {"product_id": product_id, "region": region}
      - name: respond
        # Make the products-table KasprTable available as the product_table parameter
        tables:
          - name: products-table
            paramName: product_table
        map:
          python: |
            def respond(state, product_table):
                # product_table is read-only; it is populated by an Agent
                record = product_table.get(state["product_id"], None)
                if record is None:
                    # Raising triggers the onError response selectors
                    raise ValueError("product not found")
                # Enrich the base state with selected table fields only
                state.update({
                    "name": record.get("name"),
                    "category": record.get("category"),
                    "price": record.get("price"),
                })
                return state
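  response:
    contentType: application/json
    statusCodeSelector:
      onError:
        python: |
          # The only error this WebView raises is "product not found"
          def error_status(err):
              return 404
    bodySelector:
      onError:
        python: |
          def error_body(err):
              return {"error": str(err)}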

Key points:

  • Tables declared under operations[].tables mirror Agent usage.
  • Function parameters receive the table object by the configured paramName.
  • Raise exceptions to signal not-found or invalid requests.
  • Avoid expensive per-request joins—tables should already contain prepared state curated by Agents.
Design tables as materialized views updated by streaming Agents; WebViews then serve them as fast, key-based lookups.

Performance considerations:

  • Prefer small, key-based lookups (e.g., product_id) over full scans.
  • For composite queries, consider precomputing aggregate state in another table.
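You can unit-test a lookup operation by passing an ordinary dict in place of the table, because the example only calls product_table.get(key, default). The sketch below does that and follows the security advice in this section by returning an explicit allow-list of fields. The dict stand-in and the PUBLIC_FIELDS name are assumptions for illustration; in a deployment the runtime supplies the KasprTable.

```python
from typing import Any, Dict, Mapping

# Fields that are safe to return to HTTP clients (hypothetical allow-list).
PUBLIC_FIELDS = ("name", "category", "price")


def respond(state: Dict[str, Any], product_table: Mapping[str, Dict[str, Any]]) -> Dict[str, Any]:
    # Single key-based lookup; no scans over the table.
    record = product_table.get(state["product_id"], None)
    if record is None:
        raise ValueError("product not found")
    # Copy only allow-listed fields so internal columns never leak.
    return {**state, **{f: record.get(f) for f in PUBLIC_FIELDS}}


if __name__ == "__main__":
    table = {"sku-1": {"name": "Mug", "category": "kitchen", "price": 9.5, "cost": 2.1}}
    print(respond({"product_id": "sku-1", "region": "eu"}, table))
```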

Security considerations:

  • Do not expose raw internal keys unless necessary.
  • Sanitize or filter fields before returning table contents.

General guidelines:

  • Keep operations small and composable
  • Validate inputs early (in the first operation)
  • Use generators / yielding for large fan-out to avoid memory spikes
  • Prefer a static name for topicSend; use nameSelector only when the target topic is truly dynamic
  • Limit synchronous I/O in critical paths; push heavy work to Agents when possible
  • Centralize reusable Python snippets (TODO: shared code include feature)

Security Considerations

  • Expose WebViews via ingress only when necessary
  • Add authentication / authorization at the gateway layer (TODO: native auth support)
  • Sanitize and validate all request bodies
  • Avoid echoing raw exception strings in responses

Monitoring & Observability

  • Log request path, method, and processing duration
  • Count emitted Kafka messages per WebView
  • Track error rates from onError selectors
  • Use partition metrics to validate distribution when using partitionSelector
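One lightweight way to record method, path, duration, emitted-message counts, and errors is a timing wrapper built on Python's standard logging and time modules. This is a generic sketch. The observe decorator, the counters, and the handle stand-in are hypothetical, and no Kaspr-specific metrics API is used.

```python
import functools
import logging
import time
from collections import Counter
from typing import Any, Callable

logger = logging.getLogger("webview")
emitted = Counter()  # messages emitted, keyed by WebView name
errors = Counter()   # failed requests, keyed by WebView name


def observe(webview: str, method: str, path: str) -> Callable:
    """Log method, path, and duration; count emitted messages and errors."""
    def decorator(fn: Callable[..., list]) -> Callable[..., list]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> list:
            start = time.perf_counter()
            try:
                messages = fn(*args, **kwargs)
                emitted[webview] += len(messages)
                return messages
            except Exception:
                errors[webview] += 1
                raise
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                logger.info("%s %s %s took %.1f ms", webview, method, path, elapsed_ms)
        return wrapper
    return decorator


@observe("topic-publisher", "POST", "/api/v2/kafka/rest-proxy/topics/{topic}")
def handle(records):
    # Stand-in for the pipeline: returns the messages it would send.
    return [r for r in records if not r.get("has_errors")]


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    handle([{"key": "a"}, {"key": "b", "has_errors": True}])
    print(emitted["topic-publisher"])
```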

FAQ

How is this different from an Agent?
Agents consume continuously from streams; WebViews react to on-demand HTTP calls.

Can a WebView call an Agent?
Indirectly—publish to a topic the Agent consumes.

Can I return streaming responses?
Not yet. TODO: evaluate Server-Sent Events or chunked encoding support.

Are headers available in operations?
Yes—request context includes headers. Pass them through your first map if you need them downstream.

Can I perform authentication inside a processor?
Yes, but prefer an external gateway. You can raise an error to return a custom status code.

Limitations & TODOs

  • Native auth / rate limiting (TODO)
  • Structured validation errors helper (TODO)
  • Shared code modules/imports (TODO)
  • Streaming responses (TODO)

See Also