← Back to Blog

Six Streams, One Truth: Real-time Observability with Server-Sent Events

Most dashboards poll. Hexarch streams.

The Guardrails subsystem exposes six Server-Sent Event (SSE) endpoints, each delivering a specific event type in real-time. The UI consumes these through a single async generator function that handles connection lifecycle, parsing, and error recovery.

The Six Streams

Each stream serves a distinct observability purpose:

EndpointEvent TypeUse Case
/audit-logs/streamAuditLogAll governance events: creates, updates, deletes
/policy-decisions/streamPolicyDecisionEvery authorization decision (ALLOW/DENY)
/provider-calls/streamProviderCallAI model invocations with latency and token costs
/security-events/streamSecurityEventRate limit violations, HTTP exceptions
/policy-changes/streamPolicyChangePolicy CRUD mutations
/rule-changes/streamRuleChangeRule definition changes

The Async Generator

The guardrailsSseStream() function is the core primitive:

type GuardrailsSseEvent<T = unknown> = {
  event?: string;
  id?: string;
  data: T;
};

export async function* guardrailsSseStream(
  path: string,
  init: RequestInit = {},
  configOverride?: Partial<GuardrailsConfig>
): AsyncGenerator<GuardrailsSseEvent, void, unknown> {
  const config = { ...loadGuardrailsConfig(), ...configOverride };
  const url = `${normalizeBaseUrl(config.baseUrl)}${path.startsWith('/') ? path : `/${path}`}`;
  const headers = buildHeaders(config, init.headers, init.method || 'GET', init.body);
  headers.set('Accept', 'text/event-stream');

  const response = await fetch(url, { ...init, headers });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    buffer = buffer.replace(/\r\n/g, '\n');

    // Split blocks on \n\n, parse event/id/data lines, JSON.parse(data) when possible
    // yield { event, id, data }
  }
}

The generator yields typed events as they arrive. The consumer decides what to do with them—render to UI, aggregate for charts, or trigger alerts.

Stream Status Tracking

The GuardrailsStream.tsx component tracks connection state:

type StreamStatus = 'disconnected' | 'connecting' | 'connected' | 'error';

const [streamStatus, setStreamStatus] = useState<StreamStatus>('disconnected');

The UI shows the current status, so operators know whether they’re seeing live data or stale state.

Event Type Definitions

The UI intentionally treats streamed payloads as “shape-like” objects (because they come from the network). For example:

type PolicyDecisionLike = {
  id?: string;
  at?: string;
  decision?: string;
  reason?: string;
  policies?: string[];
  actor_id?: string;
  actor_type?: string;
  request?: { method?: string; path?: string };
};

type ProviderCallLike = {
  id?: string;
  at?: string;
  resource?: string;
  action?: string;
  ok?: boolean;
  status_code?: number;
  latency_ms?: number;
  model?: string;
  tokens_in?: number;
  tokens_out?: number;
  cost_usd?: number;
  error_type?: string;
  error_message?: string;
  actor_id?: string;
  actor_type?: string;
};

type SecurityEventLike = {
  id?: string;
  at?: string;
  kind?: string;
  status_code?: number;
  decision?: string;
  reason?: string;
  detail?: string;
  request?: { method?: string; path?: string };
  actor_id?: string;
  actor_type?: string;
};

Format Functions

The stream component includes formatters for human-readable display:

function toLine(log: {
  created_at?: string;
  action?: string;
  entity_type?: string;
  entity_id?: string;
  actor_type?: string;
  actor_id?: string;
  chain_id?: string;
  reason?: string;
}): string {
  const ts = log.created_at ? new Date(log.created_at).toISOString() : 'unknown-time';
  const action = (log.action || 'EVENT').padEnd(8);
  const entity = `${log.entity_type || 'Entity'}:${log.entity_id || 'unknown'}`;
  const actor = `${log.actor_type || 'actor'}:${log.actor_id || 'unknown'}`;
  const chain = log.chain_id ? ` chain=${log.chain_id}` : '';
  const reason = log.reason ? ` reason=${log.reason}` : '';
  return `${ts}  ${action}  ${entity}  ${actor}${chain}${reason}`;
}

function toProviderLine(p: {
  at?: string;
  ok?: boolean;
  resource?: string;
  action?: string;
  status_code?: number;
  latency_ms?: number;
  model?: string;
  tokens_in?: number;
  tokens_out?: number;
  cost_usd?: number;
  error_type?: string;
  error_message?: string;
  actor_type?: string;
  actor_id?: string;
}): string {
  const ts = p.at ? new Date(p.at).toISOString() : 'unknown-time';
  const ok = p.ok === false ? 'FAIL' : 'OK';
  const res = p.resource || '(resource)';
  const act = p.action || '(action)';
  const status = p.status_code != null ? ` status=${p.status_code}` : '';
  const latency = p.latency_ms != null ? ` latency=${p.latency_ms}ms` : '';
  const model = p.model ? ` model=${p.model}` : '';
  const tokens = p.tokens_in != null || p.tokens_out != null ? ` tokens=${p.tokens_in ?? '?'}→${p.tokens_out ?? '?'}` : '';
  const cost = p.cost_usd != null ? ` cost=$${p.cost_usd}` : '';
  const err = p.error_type ? ` err=${p.error_type}` : '';
  const msg = p.error_message ? ` msg=${p.error_message}` : '';
  const actor = `${p.actor_type || 'actor'}:${p.actor_id || 'unknown'}`;
  return `${ts}  ${ok.padEnd(4)}  ${res} ${act}  ${actor}${status}${latency}${model}${tokens}${cost}${err}${msg}`;
}

These turn structured events into scannable log lines—terminal-style output for operators who need to see what’s happening now.

Stream Selection (One Connection)

The current GuardrailsStream.tsx UI connects to one stream at a time (audit, decisions, provider, security, policy changes, or rule changes), based on a streamKind selector:

const stream =
  streamKind === 'decisions'
    ? guardrailsStreamPolicyDecisions(decisionFilters, cfg, controller.signal)
    : streamKind === 'provider'
      ? guardrailsStreamProviderCalls(providerFilters, cfg, controller.signal)
      : streamKind === 'security'
        ? guardrailsStreamSecurityEvents(securityFilters, cfg, controller.signal)
        : streamKind === 'policy'
          ? guardrailsStreamPolicyChanges(policyChangeFilters, cfg, controller.signal)
          : streamKind === 'rule'
            ? guardrailsStreamRuleChanges(ruleChangeFilters, cfg, controller.signal)
            : guardrailsStreamAuditLogs(auditFilters, cfg, controller.signal);

for await (const evt of stream) {
  // format and append terminal lines
}

An AbortController handles clean teardown on disconnect or when switching streams.

Why SSE Over WebSockets

Server-Sent Events are a deliberate choice:

  1. HTTP/2 multiplexing: Multiple streams share one TCP connection
  2. Simple reconnection: the UI reconnects explicitly (Connect/Reconnect) and tears down via AbortController
  3. Firewall-friendly: Plain HTTP, no upgrade handshake
  4. Unidirectional: The server pushes; the client listens. No bidirectional complexity.

For observability—where the server has information and the client wants to see it—SSE is the right primitive.

Query Parameters

The streams accept filtering parameters:

const params = new URLSearchParams({
  chain_id: 'global',
  entity_type: 'policy',
  actor_id: 'admin@hexarch.io',
  tail: '100',        // Last N events
  poll_ms: '1000'     // Server-side poll interval
});

The tail parameter is particularly useful: start with the last 100 events, then stream new ones as they arrive. No cold start, no missing context.

Error Recovery

When a stream fails, the component stores the formatted error and switches status to error. The UI renders it inline above the stream controls, and the operator can hit Connect/Reconnect:

try {
  for await (const evt of stream) {
    // ...
  }
} catch (e) {
  setError(formatError(e));
  setStatus('error');
}

{error && <div className="gr-error">{error}</div>}

The async generator surfaces errors; the UI decides how to present them.

Try It

The stream UI is available at /guardrails/stream. Select which event types to monitor, watch the live feed, and see how decisions, policy changes, and provider calls flow through the system in real-time.

If you’re running the Guardrails API locally, trigger some authorization requests and watch them appear instantly. That’s the difference between polling and streaming.