Deep dive into EdgeStream's layered, SRP-compliant design
EdgeStream enables server-driven workflow automation where business logic orchestrates user interfaces in real time. Its six-layer architecture handles bidirectional communication between servers, UIs, and external systems.
Handles real-time communication with remote servers via multiple protocols:
- SignalR: bidirectional communication with automatic transport fallback. Best for persistent connections.
- HTTP polling: client-side polling with exponential backoff. Reliable in any environment.
- Server-Sent Events (SSE): unidirectional server push. Efficient for broadcast scenarios.
- WebSocket: raw WebSocket access for maximum control and minimal overhead.
// Transport is abstracted - same API for all transports
interface ITransport {
connect(): Promise<void>;
disconnect(): Promise<void>;
send(data: string | ArrayBuffer): Promise<void>;
onMessage(handler: (raw: string | ArrayBuffer) => void): () => void;
readonly status: TransportStatus;
}
Normalizes different message formats into a unified internal representation:
- Simple, universal format with no overhead.
- A W3C standard for decentralized messaging, with built-in encryption and signatures.
- A CNCF standard for event interchange; great for cloud-native apps.
- Your own custom protocol parser, for full control.
// Parsers are auto-detected via canHandle()
interface IProtocolParser {
readonly name: string;
parse(raw: string | ArrayBuffer): ParsedMessage;
serialize(body: unknown): string | ArrayBuffer;
canHandle?(raw: string | ArrayBuffer): boolean;
}
Composable hook architecture where each hook transforms the message.
// Hooks execute in priority order
interface IHook {
readonly name: string;
readonly priority: number; // lower = runs first
execute(context: IPipelineContext): Promise<HookResult>;
}
// Context is mutable - hooks transform the message
interface IPipelineContext {
body: unknown; // hooks modify this
metadata: Record<string, unknown>; // inter-hook communication
abort(reason?: string): void;
pause(request: unknown): IPipelinePause; // for interactive hooks
}
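To illustrate the contract, a minimal runner might sort hooks by priority and execute them in turn. This is a simplified sketch, not the library's implementation: `HookResult` is reduced to `void` and `abort()` is modeled as a flag.

```typescript
// Minimal sketch of priority-ordered hook execution.
// `Hook` and `Ctx` are simplified stand-ins for IHook and IPipelineContext.
interface Hook {
  readonly name: string;
  readonly priority: number; // lower = runs first
  execute(ctx: Ctx): Promise<void>;
}

interface Ctx {
  body: unknown;                     // hooks may mutate this
  metadata: Record<string, unknown>; // inter-hook communication
  aborted: boolean;                  // abort() simplified to a flag
}

async function runPipeline(hooks: Hook[], ctx: Ctx): Promise<Ctx> {
  const ordered = [...hooks].sort((a, b) => a.priority - b.priority);
  for (const hook of ordered) {
    if (ctx.aborted) break;  // a hook aborted the run; skip the rest
    await hook.execute(ctx); // each hook may transform ctx.body
  }
  return ctx;
}
```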
Topic-based pub/sub routing with wildcard support.
// Subscribe to specific topics or use wildcards
const sub = stream.subscribe('bas', 'form.*', (envelope) => {
console.log(envelope.body);
});
// Wildcard patterns supported
stream.subscribe('chat', '*', callback); // all chat messages
stream.subscribe('bas', 'approval.*', callback); // workflow approvals
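A sketch of the kind of matching `'form.*'` implies: prefix wildcards plus a catch-all `'*'`. This is a hypothetical illustration; the library's actual matcher may support richer patterns.

```typescript
// Hypothetical wildcard matcher for topic patterns like 'form.*' and '*'.
function topicMatches(pattern: string, topic: string): boolean {
  if (pattern === '*') return true; // catch-all: matches every topic
  if (pattern.endsWith('.*')) {
    const prefix = pattern.slice(0, -1); // drop the '*', keep 'form.'
    return topic.startsWith(prefix);
  }
  return pattern === topic; // otherwise require an exact match
}
```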
Reactive state stores for React components:
- Connection status: per-server transport status (connected, connecting, error). Persisted to localStorage.
- Message history: per-topic history (LRU-bounded). Ephemeral; cleared on reload.
- Stream status: global stream status and lifecycle events.
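The bounded per-topic history can be sketched as follows. This is a hypothetical stand-in: the real message store is a Zustand store with LRU eviction, while here the bound simply drops the oldest entry.

```typescript
// Hypothetical sketch of a bounded per-topic message history.
// Oldest-first eviction stands in for the real store's LRU policy.
class TopicHistory<T> {
  private readonly byTopic = new Map<string, T[]>();

  constructor(private readonly maxPerTopic: number) {}

  add(topic: string, message: T): void {
    const history = this.byTopic.get(topic) ?? [];
    history.push(message);
    if (history.length > this.maxPerTopic) {
      history.shift(); // evict the oldest entry to stay bounded
    }
    this.byTopic.set(topic, history);
  }

  get(topic: string): readonly T[] {
    return this.byTopic.get(topic) ?? [];
  }
}
```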
Optional React hooks for component integration.
// React hooks access Zustand stores
function ChatComponent() {
const messages = useMessages('chat', 'message.*');
const status = useConnectionStatus('chat');
const send = useEdgeStreamSend();
if (status.connecting) return <Loading />;
if (status.error) return <Error message={status.error} />;
return (
<>
{messages.map(msg => <Message key={msg.id} envelope={msg} />)}
<Input onSend={(text) => send('chat', 'message.send', { text })} />
</>
);
}
EdgeStream enables three key orchestration patterns that drive business automation:
Server-side business logic determines what the user sees and when. As a workflow progresses, EdgeStream sends messages to update the UI: show a form, hide it, display status, enable/disable buttons, etc.
// Server orchestrates user experience
workflow.onApprovalStep = () => {
send('ui', 'show-approval-form', {
formId: 'approval',
data: { amount: 5000 }
});
}
workflow.onApprovalReceived = (decision) => {
if (decision.approved) {
send('ui', 'show-success', { message: 'Approved!' });
orchestrate('payment-processor', {...});
}
}
// React component listens
function App() {
const [currentForm, setCurrentForm] = useState(null);
const messages = useMessages('server', '*');
useEffect(() => {
messages.forEach(msg => {
if (msg.body.action === 'show-approval-form') {
setCurrentForm(msg.body.formId);
}
});
}, [messages]);
  // Render the server-requested form (ApprovalForm is a hypothetical component)
  return currentForm ? <ApprovalForm formId={currentForm} /> : null;
}
Workflows pause automatically to request information from the user. EdgeStream sends a message asking for input, waits for response, then continues the workflow with the provided data.
// Server pauses workflow and asks for input
const userInput = await stream.queryUI('payment', {
type: 'form',
prompt: 'Select payment method',
schema: paymentMethodSchema
});
// Client-side pause handler
stream.on('pipeline:paused', ({ pauseToken, request }) => {
if (request.prompt === 'Select payment method') {
showPaymentModal()
.then(selection => {
pauseToken.resume({ method: selection });
// Server continues with selection
});
}
});
// Workflow continues with user's choice
workflow.onPaymentMethodSelected = (method) => {
if (method === 'wallet') {
orchestrate('web3-service', { action: 'connect-wallet' });
}
}
Workflows coordinate across disconnected systems: Web3 wallets, payment processors, banks, and APIs. EdgeStream handles orchestration, message correlation, and response handling.
// Orchestrate across multiple services
async function executeComplexWorkflow() {
// 1. Collect user data from UI
const userData = await stream.queryUI('form', {
type: 'form',
prompt: 'Enter recipient address'
});
// 2. Connect to Web3 wallet
const walletResponse = await stream.send('web3', 'connect-wallet', {
chainId: 1,
connectorId: 'metamask'
});
// 3. Process payment
const paymentResult = await stream.send('payment', 'execute-transfer', {
to: userData.address,
amount: userData.amount,
wallet: walletResponse.address
});
// 4. Notify user of completion
stream.send('ui', 'show-confirmation', {
txHash: paymentResult.transactionHash,
status: 'success'
});
}
// All messages are correlated and auditable
// workflow.executionId links all messages in sequence
Step-by-step journey of a message from transport to subscriber:
1. Message arrives via SignalR/SSE/WebSocket/HTTP (raw: ArrayBuffer | string).
2. ProtocolParserRegistry tries each parser's canHandle() method and selects an IProtocolParser.
3. The parser normalizes the payload into an IEnvelope<TBody> with meta, body, and attributes.
4. An IPipelineContext is created (mutable body, metadata bag), with context.body set to the parsed body.
5. IncomingPipeline runs each hook in priority order: LogHook → ValidationHook → DecryptHook → ...
6. PublishToSubscribersHook routes by topic via subscriptionManager.publish(topic, envelope).
7. messageStore.addMessage() adds the envelope to the LRU cache and notifies Zustand subscribers.
8. The useMessages() hook triggers a component update; <Component> receives the new messages.
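The canHandle()-based parser selection above can be sketched as a first-match scan over the registry. Types are simplified from IProtocolParser (no serialize(), string input only), and the example parsers are hypothetical.

```typescript
// Simplified parser shape; the real IProtocolParser also has serialize()
// and accepts ArrayBuffer input.
interface Parser {
  readonly name: string;
  canHandle(raw: string): boolean;
  parse(raw: string): unknown;
}

// The first parser whose canHandle() accepts the payload wins.
function selectParser(registry: Parser[], raw: string): Parser {
  const match = registry.find((p) => p.canHandle(raw));
  if (!match) throw new Error('No registered parser can handle this message');
  return match;
}

// Example registry: a JSON parser plus a plain-text fallback.
const registry: Parser[] = [
  {
    name: 'json',
    canHandle: (raw) => raw.trim().startsWith('{'),
    parse: (raw) => JSON.parse(raw),
  },
  {
    name: 'plain',
    canHandle: () => true, // fallback: accepts anything
    parse: (raw) => raw,
  },
];
```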
For workflows requiring user input (forms, approvals, confirmations):
Example: Approval workflow with forms
// Setup pipeline with interactive hook
.incoming()
.add(new LogHook())
.add(new ValidationHook())
.add(new RenderToFormHook({ formRegistry })) // ← interactive hook
.add(new PublishToSubscribersHook())
.done()
// In React component, listen for pause event
stream.on('pipeline:paused', ({ pauseToken, request }) => {
// Show approval form to user
showApprovalForm(request).then(input => {
pauseToken.resume(input);
// Pipeline continues with user's decision
});
});
Optional advanced features for 100+ msg/sec throughput:
- Bounded queues with backpressure: prevents memory overflow. When a queue fills, the transport is signaled to slow down.
- Worker-thread offloading: CPU-intensive operations (decryption, signature verification) run off the main thread, which stays responsive.
- LRU message cache: memory-bounded with automatic eviction. Prevents OOM at 360,000 messages/hour.
- Render batching: groups updates over a time window. 100 messages/sec → 10 batch updates/sec (10x fewer renders).
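The batching idea can be sketched as a small time-window buffer. This is a hypothetical illustration (class and callback names are assumptions, and the real wiring into the stores is omitted):

```typescript
// Hypothetical time-window batcher: collects items pushed within a window
// and delivers them as one batch, reducing downstream update frequency.
type Flush<T> = (batch: T[]) => void;

class MicroBatcher<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private readonly windowMs: number,
    private readonly onFlush: Flush<T>,
  ) {}

  push(item: T): void {
    this.buffer.push(item);
    if (this.timer === null) {
      // First item of the window: schedule a single flush for the whole batch.
      this.timer = setTimeout(() => this.flush(), this.windowMs);
    }
  }

  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length > 0) {
      const batch = this.buffer;
      this.buffer = [];
      this.onFlush(batch);
    }
  }
}
```

With a 100 ms window, 100 messages/sec collapse into roughly 10 batch deliveries/sec.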
- Single responsibility: each file has exactly one reason to change. Transports don't know about hooks; parsers don't know about pipelines.
- Composable: build complex behaviors by chaining simple, reusable hooks. Mix and match for your use case.
- Framework-agnostic: the pure TypeScript core works anywhere; the optional React bindings are just a convenience.
- Observable: a full event bus for lifecycle events. Hook into pipeline:paused, connection:status, etc.
- Memory-bounded: the LRU cache prevents memory leaks at scale; bounded queues prevent runaway growth.
- Testable: all dependencies are injected, so transports and parsers can be mocked for unit testing.
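For example, a test double satisfying the ITransport shape could look like this. It is a hypothetical helper (not shipped with the library); the `emit` method exists only so tests can simulate a server push.

```typescript
// Hypothetical in-memory transport for unit tests, matching the ITransport shape.
type TransportStatus = 'disconnected' | 'connected';

class MockTransport {
  status: TransportStatus = 'disconnected';
  sent: (string | ArrayBuffer)[] = [];
  private handlers: ((raw: string | ArrayBuffer) => void)[] = [];

  async connect(): Promise<void> {
    this.status = 'connected';
  }

  async disconnect(): Promise<void> {
    this.status = 'disconnected';
  }

  async send(data: string | ArrayBuffer): Promise<void> {
    this.sent.push(data); // record instead of transmitting
  }

  onMessage(handler: (raw: string | ArrayBuffer) => void): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }

  // Test-only helper: simulate an incoming server message.
  emit(raw: string): void {
    this.handlers.forEach((h) => h(raw));
  }
}
```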