Six Streams, One Truth: Real-time Observability with Server-Sent Events
Most dashboards poll. Hexarch streams.
The Guardrails subsystem exposes six Server-Sent Events (SSE) endpoints, each delivering a specific event type in real time. The UI consumes these through a single async generator function that handles the connection lifecycle and parsing, and surfaces errors to the caller.
The Six Streams
Each stream serves a distinct observability purpose:
| Endpoint | Event Type | Use Case |
|---|---|---|
| /audit-logs/stream | AuditLog | All governance events: creates, updates, deletes |
| /policy-decisions/stream | PolicyDecision | Every authorization decision (ALLOW/DENY) |
| /provider-calls/stream | ProviderCall | AI model invocations with latency and token costs |
| /security-events/stream | SecurityEvent | Rate limit violations, HTTP exceptions |
| /policy-changes/stream | PolicyChange | Policy CRUD mutations |
| /rule-changes/stream | RuleChange | Rule definition changes |
The Async Generator
The guardrailsSseStream() function is the core primitive:
type GuardrailsSseEvent<T = unknown> = {
  event?: string;
  id?: string;
  data: T;
};

export async function* guardrailsSseStream(
  path: string,
  init: RequestInit = {},
  configOverride?: Partial<GuardrailsConfig>
): AsyncGenerator<GuardrailsSseEvent, void, unknown> {
  const config = { ...loadGuardrailsConfig(), ...configOverride };
  const url = `${normalizeBaseUrl(config.baseUrl)}${path.startsWith('/') ? path : `/${path}`}`;
  const headers = buildHeaders(config, init.headers, init.method || 'GET', init.body);
  headers.set('Accept', 'text/event-stream');

  const response = await fetch(url, { ...init, headers });
  // Fail fast on an HTTP error or a missing body rather than asserting with `!`.
  if (!response.ok || !response.body) {
    throw new Error(`Guardrails stream request failed: HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    buffer = buffer.replace(/\r\n/g, '\n');
    // Split blocks on \n\n, parse event/id/data lines, JSON.parse(data) when possible
    // yield { event, id, data }
  }
}
The generator yields typed events as they arrive. The consumer decides what to do with them—render to UI, aggregate for charts, or trigger alerts.
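The parsing step that the comments above elide can be sketched roughly as follows. This is a minimal illustration rather than the actual Hexarch parser: `splitSseBuffer` and `parseSseBlock` are hypothetical names, and the sketch assumes line endings have already been normalized to `\n`, as the generator does.

```typescript
type SseEvent = { event?: string; id?: string; data: unknown };

// Hypothetical helper: separate complete blocks from the unfinished tail,
// which stays in the buffer until the next chunk arrives.
function splitSseBuffer(buffer: string): { blocks: string[]; rest: string } {
  const parts = buffer.split('\n\n');
  const rest = parts.pop() ?? '';
  return { blocks: parts.filter((b) => b.length > 0), rest };
}

// Hypothetical helper: parse one block of "field: value" lines.
function parseSseBlock(block: string): SseEvent | null {
  let event: string | undefined;
  let id: string | undefined;
  const dataLines: string[] = [];

  for (const line of block.split('\n')) {
    if (line === '' || line.startsWith(':')) continue; // comment or keep-alive line
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // drop one leading space
    if (field === 'event') event = value;
    else if (field === 'id') id = value;
    else if (field === 'data') dataLines.push(value);
  }

  if (dataLines.length === 0) return null; // nothing to yield
  const raw = dataLines.join('\n');
  let data: unknown = raw;
  try {
    data = JSON.parse(raw);
  } catch {
    // Not JSON: keep the raw string as the payload.
  }

  const out: SseEvent = { data };
  if (event !== undefined) out.event = event;
  if (id !== undefined) out.id = id;
  return out;
}
```

Inside the read loop, the generator would call `splitSseBuffer(buffer)`, keep `rest` as the new buffer, and yield each non-null result of `parseSseBlock`.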
Stream Status Tracking
The GuardrailsStream.tsx component tracks connection state:
type StreamStatus = 'disconnected' | 'connecting' | 'connected' | 'error';
const [streamStatus, setStreamStatus] = useState<StreamStatus>('disconnected');
The UI shows the current status, so operators know whether they’re seeing live data or stale state.
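One way to keep those four states consistent is a small transition function. The sketch below is an assumption about how the transitions might work, not code from GuardrailsStream.tsx; `StreamAction` and `nextStatus` are hypothetical names.

```typescript
type StreamStatus = 'disconnected' | 'connecting' | 'connected' | 'error';

// Hypothetical actions: what the component might report as the connection progresses.
type StreamAction = 'connect' | 'opened' | 'failed' | 'closed';

function nextStatus(current: StreamStatus, action: StreamAction): StreamStatus {
  switch (action) {
    case 'connect':
      return 'connecting';
    case 'opened':
      // Ignore a late "opened" from a stream that was already torn down.
      return current === 'connecting' ? 'connected' : current;
    case 'failed':
      return 'error';
    case 'closed':
      // Keep an error visible until the operator reconnects.
      return current === 'error' ? 'error' : 'disconnected';
  }
}
```

With React state this could be applied as `setStreamStatus((s) => nextStatus(s, 'opened'))`, so stale events from a previous connection cannot flip the indicator back to live.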
Event Type Definitions
The UI intentionally treats streamed payloads as “shape-like” objects (because they come from the network). For example:
type PolicyDecisionLike = {
  id?: string;
  at?: string;
  decision?: string;
  reason?: string;
  policies?: string[];
  actor_id?: string;
  actor_type?: string;
  request?: { method?: string; path?: string };
};

type ProviderCallLike = {
  id?: string;
  at?: string;
  resource?: string;
  action?: string;
  ok?: boolean;
  status_code?: number;
  latency_ms?: number;
  model?: string;
  tokens_in?: number;
  tokens_out?: number;
  cost_usd?: number;
  error_type?: string;
  error_message?: string;
  actor_id?: string;
  actor_type?: string;
};

type SecurityEventLike = {
  id?: string;
  at?: string;
  kind?: string;
  status_code?: number;
  decision?: string;
  reason?: string;
  detail?: string;
  request?: { method?: string; path?: string };
  actor_id?: string;
  actor_type?: string;
};
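Because the generator yields `data` as `unknown`, something has to narrow it into one of these shapes before formatting. The source doesn't show how the UI does this; the following is a minimal sketch under that caveat, with `isRecord` and `toPolicyDecisionLike` as hypothetical helpers that copy only the fields whose runtime type matches.

```typescript
type PolicyDecisionLike = {
  id?: string;
  at?: string;
  decision?: string;
  reason?: string;
  policies?: string[];
  actor_id?: string;
  actor_type?: string;
  request?: { method?: string; path?: string };
};

const STRING_FIELDS = ['id', 'at', 'decision', 'reason', 'actor_id', 'actor_type'] as const;

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v);
}

// Hypothetical narrowing step: unknown payload in, "shape-like" object out.
function toPolicyDecisionLike(data: unknown): PolicyDecisionLike | null {
  if (!isRecord(data)) return null;
  const out: PolicyDecisionLike = {};

  for (const key of STRING_FIELDS) {
    const value = data[key];
    if (typeof value === 'string') out[key] = value;
  }

  const policies = data['policies'];
  if (Array.isArray(policies)) {
    out.policies = policies.filter((p: unknown): p is string => typeof p === 'string');
  }

  const request = data['request'];
  if (isRecord(request)) {
    const req: { method?: string; path?: string } = {};
    const method = request['method'];
    const path = request['path'];
    if (typeof method === 'string') req.method = method;
    if (typeof path === 'string') req.path = path;
    out.request = req;
  }

  return out;
}
```

Unknown or mistyped fields are dropped rather than rejected, which matches the all-optional design of the types: a partially valid event still renders.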
Format Functions
The stream component includes formatters for human-readable display:
function toLine(log: {
  created_at?: string;
  action?: string;
  entity_type?: string;
  entity_id?: string;
  actor_type?: string;
  actor_id?: string;
  chain_id?: string;
  reason?: string;
}): string {
  const ts = log.created_at ? new Date(log.created_at).toISOString() : 'unknown-time';
  const action = (log.action || 'EVENT').padEnd(8);
  const entity = `${log.entity_type || 'Entity'}:${log.entity_id || 'unknown'}`;
  const actor = `${log.actor_type || 'actor'}:${log.actor_id || 'unknown'}`;
  const chain = log.chain_id ? ` chain=${log.chain_id}` : '';
  const reason = log.reason ? ` reason=${log.reason}` : '';
  return `${ts} ${action} ${entity} ${actor}${chain}${reason}`;
}

function toProviderLine(p: {
  at?: string;
  ok?: boolean;
  resource?: string;
  action?: string;
  status_code?: number;
  latency_ms?: number;
  model?: string;
  tokens_in?: number;
  tokens_out?: number;
  cost_usd?: number;
  error_type?: string;
  error_message?: string;
  actor_type?: string;
  actor_id?: string;
}): string {
  const ts = p.at ? new Date(p.at).toISOString() : 'unknown-time';
  const ok = p.ok === false ? 'FAIL' : 'OK';
  const res = p.resource || '(resource)';
  const act = p.action || '(action)';
  const status = p.status_code != null ? ` status=${p.status_code}` : '';
  const latency = p.latency_ms != null ? ` latency=${p.latency_ms}ms` : '';
  const model = p.model ? ` model=${p.model}` : '';
  const tokens = p.tokens_in != null || p.tokens_out != null ? ` tokens=${p.tokens_in ?? '?'}→${p.tokens_out ?? '?'}` : '';
  const cost = p.cost_usd != null ? ` cost=$${p.cost_usd}` : '';
  const err = p.error_type ? ` err=${p.error_type}` : '';
  const msg = p.error_message ? ` msg=${p.error_message}` : '';
  const actor = `${p.actor_type || 'actor'}:${p.actor_id || 'unknown'}`;
  return `${ts} ${ok.padEnd(4)} ${res} ${act} ${actor}${status}${latency}${model}${tokens}${cost}${err}${msg}`;
}
These turn structured events into scannable log lines—terminal-style output for operators who need to see what’s happening now.
Stream Selection (One Connection)
The current GuardrailsStream.tsx UI connects to one stream at a time (audit, decisions, provider, security, policy changes, or rule changes), based on a streamKind selector:
const stream =
  streamKind === 'decisions'
    ? guardrailsStreamPolicyDecisions(decisionFilters, cfg, controller.signal)
    : streamKind === 'provider'
    ? guardrailsStreamProviderCalls(providerFilters, cfg, controller.signal)
    : streamKind === 'security'
    ? guardrailsStreamSecurityEvents(securityFilters, cfg, controller.signal)
    : streamKind === 'policy'
    ? guardrailsStreamPolicyChanges(policyChangeFilters, cfg, controller.signal)
    : streamKind === 'rule'
    ? guardrailsStreamRuleChanges(ruleChangeFilters, cfg, controller.signal)
    : guardrailsStreamAuditLogs(auditFilters, cfg, controller.signal);

for await (const evt of stream) {
  // format and append terminal lines
}
An AbortController handles clean teardown on disconnect or when switching streams.
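The one-connection-at-a-time rule can be sketched as a tiny holder for the current controller. This is an illustration only: `StreamSwitcher` is a hypothetical name, and in a React component the same logic would more likely live in a ref and an effect cleanup.

```typescript
// Hypothetical helper: owns the AbortController for whichever stream is active.
class StreamSwitcher {
  private controller: AbortController | null = null;

  // Abort the running stream (if any) and hand out a signal for the next one.
  start(): AbortSignal {
    this.stop();
    this.controller = new AbortController();
    return this.controller.signal;
  }

  // Tear down the active stream, e.g. on Disconnect or unmount.
  stop(): void {
    this.controller?.abort();
    this.controller = null;
  }
}

// Usage: each call to start() invalidates the previous signal.
const switcher = new StreamSwitcher();
const auditSignal = switcher.start(); // would be passed to the audit stream
const decisionSignal = switcher.start(); // switching streams aborts the audit signal
```

Passing the signal through to `fetch` means aborting it also cancels the underlying HTTP request, so the old stream stops consuming a connection.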
Why SSE Over WebSockets
Server-Sent Events are a deliberate choice:
- HTTP/2 multiplexing: Multiple streams share one TCP connection
- Simple reconnection: The UI reconnects explicitly (Connect/Reconnect) and tears down via AbortController
- Firewall-friendly: Plain HTTP, no upgrade handshake
- Unidirectional: The server pushes; the client listens. No bidirectional complexity.
For observability—where the server has information and the client wants to see it—SSE is the right primitive.
Query Parameters
The streams accept filtering parameters:
const params = new URLSearchParams({
  chain_id: 'global',
  entity_type: 'policy',
  actor_id: 'admin@hexarch.io',
  tail: '100', // Last N events
  poll_ms: '1000' // Server-side poll interval
});
The tail parameter is particularly useful: start with the last 100 events, then stream new ones as they arrive. No cold start, no missing context.
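How the filters reach the endpoint isn't shown above. A minimal sketch, assuming they are simply appended to the stream path as a query string, might look like this; `StreamFilters` and `buildStreamPath` are hypothetical names.

```typescript
type StreamFilters = Record<string, string | number | undefined>;

// Hypothetical helper: turn a filter object into "<path>?<query>", skipping unset values.
function buildStreamPath(path: string, filters: StreamFilters): string {
  const params = new URLSearchParams();
  for (const [key, value] of Object.entries(filters)) {
    if (value === undefined || value === '') continue; // filter not set
    params.set(key, String(value));
  }
  const query = params.toString();
  return query ? `${path}?${query}` : path;
}

// Usage: tail and poll_ms can be given as numbers; URLSearchParams handles encoding.
const auditPath = buildStreamPath('/audit-logs/stream', {
  chain_id: 'global',
  actor_id: undefined,
  tail: 100,
});
```

The resulting path could then be passed as the first argument to `guardrailsSseStream`.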
Error Recovery
When a stream fails, the component stores the formatted error and switches status to error. The UI renders it inline above the stream controls, and the operator can hit Connect/Reconnect:
try {
  for await (const evt of stream) {
    // ...
  }
} catch (e) {
  setError(formatError(e));
  setStreamStatus('error');
}

// In the component's JSX, above the stream controls:
{error && <div className="gr-error">{error}</div>}
The async generator surfaces errors; the UI decides how to present them.
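One detail worth handling in that catch block: aborting a fetch rejects with an error named `AbortError`, so a deliberate disconnect would otherwise show up as a failure. The sketch below separates the two cases. It is an assumption about how this could be done, not the component's actual code; `isAbortError`, `describeError`, and `outcomeFor` are hypothetical names, with `describeError` standing in for the `formatError` helper whose implementation isn't shown.

```typescript
type Outcome = { status: 'disconnected' | 'error'; error: string | null };

// Aborted fetches and stream reads reject with an error whose name is "AbortError".
function isAbortError(e: unknown): boolean {
  return typeof e === 'object' && e !== null && (e as { name?: unknown }).name === 'AbortError';
}

// Hypothetical stand-in for formatError: reduce anything thrown to a display string.
function describeError(e: unknown): string {
  if (e instanceof Error) return e.message || e.name;
  return typeof e === 'string' ? e : 'Unknown stream error';
}

// Decide what the UI should show after the for-await loop throws.
function outcomeFor(e: unknown): Outcome {
  return isAbortError(e)
    ? { status: 'disconnected', error: null }
    : { status: 'error', error: describeError(e) };
}
```

In the catch block, the component would then set its status and error from `outcomeFor(e)` instead of always switching to `error`.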
Try It
The stream UI is available at /guardrails/stream. Select which stream to monitor, watch the live feed, and see how decisions, policy changes, and provider calls flow through the system in real time.
If you’re running the Guardrails API locally, trigger some authorization requests and watch them appear in the feed as soon as the server emits them. That’s the difference between polling and streaming.