Examples

Real-world patterns you can copy directly into your project. Every example is written for TypeScript strict mode and has been tested against production EdgeStream deployments.

Basic pub/sub

Ex1
Wildcard subscription — all events for a user

Subscribe to every event for a specific user using wildcard topic notation. Useful for notification centres and activity feeds.

    import { EdgeStream, HubServer } from 'edge-stream-js';

    const edgeStream = new EdgeStream();
    edgeStream.registerServer(new HubServer({ url: '/hub' }));
    await edgeStream.start();

    // Subscribe to everything for user 123
    const sub = edgeStream.subscribe('user:123:*', (envelope) => {
      console.log('[user:123 event]', envelope.meta.topic, envelope.body);
    });

    // Publishes that match:
    // ✅ 'user:123:message'
    // ✅ 'user:123:notification'
    // ✅ 'user:123:order-update'
    // ❌ 'user:456:message' (different user)
    // ❌ 'order:123:status' (different root)

    // Unsubscribe when the component unmounts
    sub.unsubscribe();
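To make the matching rules in the comments above concrete, here is a minimal sketch of a topic matcher. It assumes topics are colon-separated and that `*` matches exactly one segment; EdgeStream's actual matcher is not shown on this page and may behave differently (for example, `*` might span several segments). `topicMatches` is a hypothetical helper, not part of `edge-stream-js`.

```typescript
// Hypothetical helper for illustration; not part of edge-stream-js.
// Assumption: ':' separates segments and '*' matches exactly one segment.
function topicMatches(pattern: string, topic: string): boolean {
  const patternParts = pattern.split(':');
  const topicParts = topic.split(':');
  if (patternParts.length !== topicParts.length) return false;
  return patternParts.every(
    (part, i) => part === '*' || part === topicParts[i],
  );
}

console.log(topicMatches('user:123:*', 'user:123:message')); // true
console.log(topicMatches('user:123:*', 'user:456:message')); // false
console.log(topicMatches('user:123:*', 'order:123:status')); // false
```

A helper like this can be handy in unit tests for checking which published topics a subscription pattern is expected to receive.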
Ex2
Request-reply — waiting for a server response

Subscribe to a reply topic before publishing. The server processes the message and publishes a response to the reply topic — same pipeline, zero custom infrastructure.

    async function requestOrderStatus(orderId: string): Promise<OrderStatus> {
      const replyTopic = `order:${orderId}:status:reply:${Date.now()}`;

      return new Promise((resolve, reject) => {
        const timeout = setTimeout(() => {
          sub.unsubscribe();
          reject(new Error('Request timed out'));
        }, 5000);

        // 1. Subscribe to the reply topic first
        const sub = edgeStream.subscribe(replyTopic, (envelope) => {
          clearTimeout(timeout);
          sub.unsubscribe();
          resolve(envelope.body as OrderStatus);
        });

        // 2. Publish the request with the reply topic attached
        edgeStream.publish({
          topic: 'order:status:request',
          body: { orderId },
          meta: {
            source: 'web-client',
            correlationId: replyTopic // Server reads this to know where to reply
          }
        });
      });
    }

    const status = await requestOrderStatus('order-4521');
    console.log('Order status:', status.state, status.estimatedDelivery);
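`Date.now()` has millisecond resolution, so two requests for the same order issued within the same millisecond would share a reply topic. One way to avoid that, sketched below, is to append a per-process counter. `createReplyTopic` is a hypothetical helper, not an EdgeStream API.

```typescript
// Hypothetical helper for illustration; not part of edge-stream-js.
let replyCounter = 0;

function createReplyTopic(orderId: string): string {
  // The counter disambiguates requests created within the same millisecond.
  replyCounter += 1;
  return `order:${orderId}:status:reply:${Date.now()}-${replyCounter}`;
}

const first = createReplyTopic('order-4521');
const second = createReplyTopic('order-4521');
console.log(first !== second); // true
```

The counter only guarantees uniqueness within one client process. If several clients can request the same order at once, adding a per-client identifier to the topic would be a reasonable extension.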

Custom hooks

Ex3
Incoming hook — enrich messages with user data

Register an incoming hook that fetches user data from your API and attaches it to every message. Every downstream subscriber sees the enriched message — no duplication.

    import type { IHook, IEnvelope } from 'edge-stream-js';

    class UserEnrichmentHook implements IHook {
      name = 'user-enrichment';
      priority = 30; // Runs after validation (10) and normalization (20)

      private cache = new Map<string, UserProfile>();

      async execute(envelope: IEnvelope): Promise<IEnvelope> {
        const userId = envelope.meta.userId;
        if (!userId) return envelope; // No-op if no user on message

        // Cache to avoid redundant API calls
        if (!this.cache.has(userId)) {
          const profile = await fetchUserProfile(userId);
          this.cache.set(userId, profile);
        }

        const profile = this.cache.get(userId)!;

        return {
          ...envelope,
          attributes: {
            ...envelope.attributes,
            userTier: profile.tier, // 'gold' | 'silver' | 'standard'
            userName: profile.displayName,
            permissions: profile.permissions
          }
        };
      }
    }

    edgeStream.registerIncomingHook(new UserEnrichmentHook());

    // Now every subscriber sees envelope.attributes.userTier automatically
    edgeStream.subscribe('messages:*', (envelope) => {
      console.log('Tier:', envelope.attributes?.userTier); // 'gold'
      console.log('User:', envelope.attributes?.userName); // 'Sarah Johnson'
    });
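The priority comments above imply that hooks with lower numbers run earlier and that each hook receives the envelope returned by the previous one. The following is a rough sketch of such a pipeline, not EdgeStream's actual implementation. `Envelope`, `Hook`, `orderHooks`, and `runHooks` are simplified stand-ins invented for this illustration.

```typescript
// Simplified stand-ins for the library's IEnvelope and IHook (assumptions).
interface Envelope {
  body: unknown;
  attributes?: Record<string, unknown>;
}

interface Hook {
  name: string;
  priority: number;
  execute(envelope: Envelope): Promise<Envelope>;
}

// Lower priority numbers run first; the input array is not mutated.
function orderHooks(hooks: Hook[]): Hook[] {
  return [...hooks].sort((a, b) => a.priority - b.priority);
}

// Each hook receives the envelope returned by the previous hook.
async function runHooks(hooks: Hook[], envelope: Envelope): Promise<Envelope> {
  let current = envelope;
  for (const hook of orderHooks(hooks)) {
    current = await hook.execute(current);
  }
  return current;
}

// Demo hook that records its own name as an attribute.
const tag = (name: string, priority: number): Hook => ({
  name,
  priority,
  execute: async (e) => ({
    ...e,
    attributes: { ...e.attributes, [name]: true },
  }),
});

const hooks = [
  tag('user-enrichment', 30),
  tag('validation', 10),
  tag('normalization', 20),
];

console.log(orderHooks(hooks).map((h) => h.name));
// [ 'validation', 'normalization', 'user-enrichment' ]

runHooks(hooks, { body: 'hello' }).then((result) => {
  console.log(Object.keys(result.attributes ?? {}));
  // [ 'validation', 'normalization', 'user-enrichment' ]
});
```

Because the hooks run sequentially, a slow hook such as one that calls a remote API delays every hook after it, which is one reason the example above caches user profiles.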
Ex4
Outgoing hook — add HMAC signature to every message

Register an outgoing hook that signs every outgoing message. The server verifies the signature — zero per-call boilerplate.

    import { createHmac } from 'crypto';
    import type { IHook, IEnvelope } from 'edge-stream-js';

    class HmacSigningHook implements IHook {
      name = 'hmac-signing';
      priority = 5; // Runs first, before logging

      private secret: string;

      constructor(secret: string) {
        this.secret = secret;
      }

      async execute(envelope: IEnvelope): Promise<IEnvelope> {
        // Only the body is signed; headers and topic are not covered
        const payload = JSON.stringify(envelope.body);
        const signature = createHmac('sha256', this.secret)
          .update(payload)
          .digest('hex');

        return {
          ...envelope,
          headers: {
            ...envelope.headers,
            'x-signature': `sha256=${signature}`
          }
        };
      }
    }

    edgeStream.registerOutgoingHook(
      new HmacSigningHook(process.env.EDGE_STREAM_HMAC_SECRET!)
    );
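For completeness, here is one way the receiving side might check the `x-signature` header. This is a sketch using Node's built-in `crypto` module, not EdgeStream's server code. `verifySignature` and the demo secret are hypothetical, and the sketch assumes the server has access to the serialized body exactly as the client signed it.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical server-side check; the header format mirrors the hook above.
function verifySignature(rawBody: string, header: string, secret: string): boolean {
  const digest = createHmac('sha256', secret).update(rawBody).digest('hex');
  const expected = Buffer.from(`sha256=${digest}`);
  const received = Buffer.from(header);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}

const demoSecret = 'demo-secret'; // Placeholder; load real secrets from configuration.
const rawBody = JSON.stringify({ orderId: 'order-4521' });
const header = `sha256=${createHmac('sha256', demoSecret).update(rawBody).digest('hex')}`;

console.log(verifySignature(rawBody, header, demoSecret)); // true
console.log(verifySignature(rawBody + ' ', header, demoSecret)); // false
```

Verifying against the received string rather than a re-serialized object matters because `JSON.stringify` output depends on key order, so parsing and re-serializing can change the bytes being signed. The constant-time comparison avoids leaking how many leading characters of a guessed signature were correct.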

Observability

Ex5
Query the activity log for a specific message

EdgeStream logs every hook execution and subscriber delivery. Query by messageId to get the complete lifecycle — timing, status, errors — for debugging and compliance.

    import { LoggingService } from 'edge-stream-js';

    const logger = edgeStream.getLoggingService();

    // Get complete lifecycle for a specific message
    const log = logger.getMessageLog('msg-uuid-here');
    /*
    {
      id: 'msg-uuid-here',
      topic: 'messages:conversation:conv-123',
      status: 'delivered',
      createdAt: '2026-04-14T10:23:41.123Z',
      completedAt: '2026-04-14T10:23:41.187Z', // 64ms total
      hookLogs: [
        { hookName: 'validation-hook', status: 'success', duration: 2 },
        { hookName: 'normalization-hook', status: 'success', duration: 4 },
        { hookName: 'user-enrichment', status: 'success', duration: 45 },
        { hookName: 'log-hook', status: 'success', duration: 1 }
      ],
      subscriberDeliveries: [
        { subscriberId: 'sub-chat-ui', status: 'delivered', processingTime: 8 },
        { subscriberId: 'sub-audit-logger', status: 'delivered', processingTime: 3 }
      ]
    }
    */

    // Performance stats
    const stats = logger.getPerformanceStats();
    console.log('Avg latency:', stats.avgLatencyMs);
    console.log('Success rate:', stats.successRate);
    console.log('Slowest hook:', stats.slowestHook.name, stats.slowestHook.avgDuration);
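When debugging a single slow message, it can help to derive a few numbers from one log entry rather than from the aggregate stats. The sketch below works on the sample log shape shown above. The `MessageLog` and `HookLog` interfaces are inferred from that sample, the durations are assumed to be milliseconds, and `summarize` is a hypothetical helper; the library's real types may differ.

```typescript
// Shapes inferred from the sample log above (assumptions, not library types).
interface HookLog {
  hookName: string;
  status: string;
  duration: number; // Assumed to be milliseconds
}

interface MessageLog {
  createdAt: string;
  completedAt: string;
  hookLogs: HookLog[];
}

function summarize(log: MessageLog) {
  // End-to-end time from the two ISO timestamps.
  const totalMs = Date.parse(log.completedAt) - Date.parse(log.createdAt);
  // Time spent inside hooks only.
  const hookMs = log.hookLogs.reduce((sum, h) => sum + h.duration, 0);
  const slowest = log.hookLogs.reduce<HookLog | undefined>(
    (max, h) => (max === undefined || h.duration > max.duration ? h : max),
    undefined,
  );
  return { totalMs, hookMs, slowestHook: slowest?.hookName };
}

const sample: MessageLog = {
  createdAt: '2026-04-14T10:23:41.123Z',
  completedAt: '2026-04-14T10:23:41.187Z',
  hookLogs: [
    { hookName: 'validation-hook', status: 'success', duration: 2 },
    { hookName: 'normalization-hook', status: 'success', duration: 4 },
    { hookName: 'user-enrichment', status: 'success', duration: 45 },
    { hookName: 'log-hook', status: 'success', duration: 1 },
  ],
};

console.log(summarize(sample));
// { totalMs: 64, hookMs: 52, slowestHook: 'user-enrichment' }
```

In the sample, hooks account for 52 of the 64 ms, and the enrichment hook alone for 45 ms, which points at the profile lookup as the first thing to optimise.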

React integration

Ex6
Live order tracking component

A complete React component that subscribes to order status updates and renders them in real-time. No polling, no custom state management — just a subscription and reactive state.

    import { useSubscription } from '@edge-stream/react-hooks';
    import { useState } from 'react';

    interface OrderStatus {
      orderId: string;
      state: 'processing' | 'shipped' | 'delivered';
      location?: string;
      eta?: string;
    }

    function OrderTracker({ orderId }: { orderId: string }) {
      const [status, setStatus] = useState<OrderStatus | null>(null);
      const [updates, setUpdates] = useState<string[]>([]);

      // Subscribes on mount and unsubscribes on unmount automatically
      useSubscription(
        `order:${orderId}:status`,
        (envelope) => {
          const update = envelope.body as OrderStatus;
          setStatus(update);
          setUpdates(prev => [
            `[${new Date().toLocaleTimeString()}] ${update.state}${update.location ? ` — ${update.location}` : ''}`,
            ...prev.slice(0, 9) // Keep last 10
          ]);
        }
      );

      if (!status) return <div>Waiting for order updates...</div>;

      return (
        <div>
          <h3>Order #{status.orderId}</h3>
          <p>Status: <strong>{status.state}</strong></p>
          {status.eta && <p>ETA: {status.eta}</p>}
          <ul>
            {updates.map((u, i) => <li key={i}>{u}</li>)}
          </ul>
        </div>
      );
    }
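The history logic inside the subscription callback can be pulled out into a pure function, which makes the "keep last 10" rule testable without rendering React. This is a sketch of one possible refactor. `appendUpdate` and `MAX_UPDATES` are hypothetical names, and the separator before the location is simplified to a comma.

```typescript
interface OrderStatus {
  orderId: string;
  state: 'processing' | 'shipped' | 'delivered';
  location?: string;
  eta?: string;
}

const MAX_UPDATES = 10;

// Hypothetical pure helper: newest entry first, capped at MAX_UPDATES.
function appendUpdate(prev: string[], update: OrderStatus, time: string): string[] {
  const place = update.location ? `, ${update.location}` : '';
  return [`[${time}] ${update.state}${place}`, ...prev].slice(0, MAX_UPDATES);
}

let history: string[] = [];
for (let i = 1; i <= 12; i += 1) {
  history = appendUpdate(
    history,
    { orderId: 'order-4521', state: 'shipped', location: `Depot ${i}` },
    `10:00:${String(i).padStart(2, '0')}`,
  );
}

console.log(history.length); // 10
console.log(history[0]); // [10:00:12] shipped, Depot 12
```

Inside the component, the callback would then reduce to `setUpdates(prev => appendUpdate(prev, update, new Date().toLocaleTimeString()))`.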
Ex7
Connection status indicator

Display connection state to users. EdgeStream emits structured connection events — no polling or manual ping logic required.

    import { useConnectionStatus } from '@edge-stream/react-hooks';

    function ConnectionBadge() {
      const { status, latency, reconnectAttempt } = useConnectionStatus();

      const colours = {
        connected: '#22c55e',
        disconnected: '#ef4444',
        reconnecting: '#f59e0b',
        connecting: '#6366f1'
      };

      return (
        <div style={{ display: 'flex', alignItems: 'center', gap: 8 }}>
          <span style={{
            width: 8,
            height: 8,
            borderRadius: '50%',
            background: colours[status] ?? '#6b7280'
          }} />
          <span>
            {status === 'connected' && `Connected (${latency}ms)`}
            {status === 'reconnecting' && `Reconnecting (attempt ${reconnectAttempt})`}
            {status === 'disconnected' && 'Disconnected'}
            {status === 'connecting' && 'Connecting…'}
          </span>
        </div>
      );
    }
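If the four states above are the only ones the hook can report (an assumption; the hook's actual return type is not shown here), typing the colour map and label logic against a union lets the compiler catch a missing state. `ConnectionStatus`, `COLOURS`, and `describeConnection` are hypothetical names for this sketch.

```typescript
// Assumption: these four states are the only values the hook reports.
type ConnectionStatus = 'connected' | 'disconnected' | 'reconnecting' | 'connecting';

// Record<...> makes the compiler flag any status that lacks a colour.
const COLOURS: Record<ConnectionStatus, string> = {
  connected: '#22c55e',
  disconnected: '#ef4444',
  reconnecting: '#f59e0b',
  connecting: '#6366f1',
};

// The exhaustive switch fails to compile if a new status is added but not handled.
function describeConnection(
  status: ConnectionStatus,
  latency?: number,
  reconnectAttempt?: number,
): string {
  switch (status) {
    case 'connected':
      return latency === undefined ? 'Connected' : `Connected (${latency}ms)`;
    case 'reconnecting':
      return `Reconnecting (attempt ${reconnectAttempt ?? 1})`;
    case 'disconnected':
      return 'Disconnected';
    case 'connecting':
      return 'Connecting…';
  }
}

console.log(COLOURS.reconnecting, describeConnection('reconnecting', undefined, 3));
// #f59e0b Reconnecting (attempt 3)
```

With this typing in place, the `?? '#6b7280'` fallback in the component is only needed if the hook can return states outside the union.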

AI agent integration

Ex8
LLM streaming — token-by-token delivery

Stream LLM token output to the UI via EdgeStream. The AI service publishes one token per message. The React hook accumulates them — real-time typing animation with no custom streaming protocol.

    // AI service (backend)
    // Each LLM token is published individually
    for await (const token of llm.streamComplete(prompt)) {
      await edgeStream.publish({
        topic: `ai:conversation:${convId}:stream`,
        body: { token, done: false, convId, sessionId }
      });
    }

    // Signal completion
    await edgeStream.publish({
      topic: `ai:conversation:${convId}:stream`,
      body: { done: true, convId }
    });

    // React component (frontend, separate file)
    import { useState } from 'react';
    import { useSubscription } from '@edge-stream/react-hooks';

    function StreamingResponse({ convId }: { convId: string }) {
      const [text, setText] = useState('');
      const [done, setDone] = useState(false);

      useSubscription(
        `ai:conversation:${convId}:stream`,
        (envelope) => {
          const { token, done: isDone } = envelope.body;
          if (isDone) {
            setDone(true);
            return;
          }
          setText(prev => prev + token);
        }
      );

      return (
        <div>
          {text}{!done && <span className="cursor">█</span>}
        </div>
      );
    }
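The accumulation step can also be written as a small reducer, which keeps the text and the completion flag in one state object and is easy to test in isolation. The sketch below assumes chunks arrive in the order they were published, which this page does not state explicitly. `StreamChunk`, `StreamState`, and `applyChunk` are hypothetical names.

```typescript
// Shapes based on the message bodies published above (assumptions).
interface StreamChunk {
  token?: string;
  done: boolean;
}

interface StreamState {
  text: string;
  done: boolean;
}

function applyChunk(state: StreamState, chunk: StreamChunk): StreamState {
  if (state.done) return state; // Ignore anything that arrives after completion.
  if (chunk.done) return { ...state, done: true };
  return { ...state, text: state.text + (chunk.token ?? '') };
}

const chunks: StreamChunk[] = [
  { token: 'Hel', done: false },
  { token: 'lo', done: false },
  { done: true },
  { token: '!', done: false }, // Late chunk, dropped by the reducer
];

const finalState = chunks.reduce(applyChunk, { text: '', done: false });
console.log(finalState); // { text: 'Hello', done: true }
```

In a component, the same function could back a `useReducer` call, with the subscription callback dispatching each `envelope.body`.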
