Build your first real-time message pipeline in minutes
Get EdgeStream up and running with minimal setup.
Add EdgeStream to your project:
npm install edge-stream-js
Initialize with your server:
import { createEdgeStream } from 'edge-stream-js';

const stream = createEdgeStream()
  .register({
    type: 'bas',
    url: 'wss://server.example.com'
  })
  .build();
Listen for messages:
await stream.start();
stream.subscribe('bas', 'user.*', (envelope) => {
  console.log(envelope.body);
});
Wrap your app:
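<EdgeStreamProvider stream={stream}>
  <App />
</EdgeStreamProvider>
Access messages in components:
function Messages() {
  const msgs = useMessages('bas', '*');
  // Each list item needs a stable key; meta.id is assumed to be a unique envelope id
  return msgs.map(m => <Msg key={m.meta.id} />);
}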
Messages now flow in real time. High throughput, multiple transports, and protocol normalization are all built in.
A full working example with all the basics:
// Assumption: useConnectionStatus and the hook classes are exported from the same package
import {
  createEdgeStream,
  EdgeStreamProvider,
  useMessages,
  useConnectionStatus,
  LogHook,
  PublishToSubscribersHook
} from 'edge-stream-js';
import React from 'react';
import ReactDOM from 'react-dom/client'; // createRoot is exported from react-dom/client (React 18+)
// 1. Create stream
const stream = createEdgeStream({ logLevel: 'info' })
  .register({
    type: 'server',
    url: 'https://api.example.com/realtime'
  })
    .transport({ type: 'signalr' })
    .protocol('json')
    .incoming()
      .add(new LogHook())
      .add(new PublishToSubscribersHook())
    .done()
  .done()
  .build();
// 2. Wrap app with provider
ReactDOM.createRoot(document.getElementById('root')).render(
  <EdgeStreamProvider stream={stream}>
    <App />
  </EdgeStreamProvider>
);
// 3. Use in component
function App() {
  const messages = useMessages('server', 'notification.*');
  const status = useConnectionStatus('server');
  if (status.connecting) return <div>Connecting...</div>;
  if (status.error) return <div>Error: {status.error.message}</div>;
  return (
    <div>
      <h1>Messages ({messages.length})</h1>
      {messages.map(msg => (
        <div key={msg.meta.id}>
          <p>{JSON.stringify(msg.body)}</p>
        </div>
      ))}
    </div>
  );
}
// SignalR (recommended for .NET backends)
.transport({
  type: 'signalr',
  reconnect: { enabled: true, maxAttempts: 5, backoffMs: 1000 }
})
// HTTP Polling (reliable fallback)
.transport({
  type: 'http-polling',
  intervalMs: 1000,
  timeoutMs: 5000
})
// Server-Sent Events (efficient for server push)
.transport({
  type: 'sse',
  reconnect: { enabled: true, maxAttempts: Infinity }
})
// Raw WebSocket (maximum control)
.transport({
  type: 'websocket',
  reconnect: { enabled: false }
})
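The `reconnect` options above name a maximum attempt count and a base `backoffMs`, but this guide does not say how the delay grows between attempts. As an illustration only, the sketch below assumes the delay doubles after each failed attempt and is capped. `reconnectDelays`, `maxDelayMs`, and `limit` are hypothetical names, not part of EdgeStream.

```typescript
// Hypothetical helper for illustration; not part of EdgeStream's API.
interface ReconnectOptions {
  enabled: boolean;
  maxAttempts: number; // may be Infinity, as in the SSE example
  backoffMs: number;
}

// Returns the delay in milliseconds to wait before each retry,
// assuming exponential doubling capped at maxDelayMs.
function reconnectDelays(
  options: ReconnectOptions,
  maxDelayMs = 30000,
  limit = 10 // bound the preview so Infinity attempts still yields a finite list
): number[] {
  if (!options.enabled) return [];
  const attempts = Math.min(options.maxAttempts, limit);
  const delays: number[] = [];
  for (let attempt = 0; attempt < attempts; attempt++) {
    delays.push(Math.min(options.backoffMs * 2 ** attempt, maxDelayMs));
  }
  return delays;
}

const signalrDelays = reconnectDelays({ enabled: true, maxAttempts: 5, backoffMs: 1000 });
console.log(signalrDelays); // [1000, 2000, 4000, 8000, 16000]
```

If the library actually uses a constant delay, the schedule would simply repeat `backoffMs`; check the transport's documentation before relying on either behavior.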
// Simple JSON (default, no overhead)
.protocol('json')
// DIDComm v2 (DIF specification, encrypted)
.protocol('didcomm-v2')
// CloudEvents (CNCF standard, structured)
.protocol('cloudevents')
// Custom protocol
.protocol('custom', new MyCustomParser())
.incoming()
  // Logging
  .add(new LogHook({ label: 'API' }))
  // Security
  .add(new DecryptHook({ privateKeyId: 'key-1' }))
  .add(new VerifySignatureHook({ trustedIssuers: [...] }))
  // Validation
  .add(new ValidationHook({ schema: jsonSchema }))
  // Filtering
  .add(new FilterHook({
    predicate: (envelope) => envelope.body.type !== 'heartbeat'
  }))
  // Acknowledgment
  .add(new AcknowledgeHook({ method: 'receipt' }))
  // Interactive (for approval workflows)
  .add(new RenderToFormHook({ formRegistry }))
  // Terminal hook (required)
  .add(new PublishToSubscribersHook())
.done()
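The chain above reads as an ordered list of hooks that ends in a required terminal hook. A minimal sketch of how such a chain could be executed follows, assuming each hook resolves to `{ continue: boolean }` (the shape returned by the custom hook example in this guide). `Envelope`, `Hook`, and `runIncoming` are hypothetical stand-ins, not EdgeStream's real interfaces.

```typescript
// Hypothetical shapes for illustration; EdgeStream's real interfaces may differ.
interface Envelope {
  topic: string;
  body: Record<string, unknown>;
}
interface HookResult {
  continue: boolean;
}
interface Hook {
  name: string;
  execute(envelope: Envelope): Promise<HookResult>;
}

// Runs hooks in order and stops at the first one that returns continue: false.
// Resolves to the names of the hooks that actually ran.
async function runIncoming(hooks: Hook[], envelope: Envelope): Promise<string[]> {
  const ran: string[] = [];
  for (const hook of hooks) {
    ran.push(hook.name);
    const result = await hook.execute(envelope);
    if (!result.continue) break;
  }
  return ran;
}

// A filter that drops heartbeats, followed by a stand-in terminal hook.
const dropHeartbeats: Hook = {
  name: 'filter',
  execute: async (envelope) => ({ continue: envelope.body.type !== 'heartbeat' }),
};
const delivered: Envelope[] = [];
const publish: Hook = {
  name: 'publish',
  execute: async (envelope) => {
    delivered.push(envelope);
    return { continue: true };
  },
};

runIncoming([dropHeartbeats, publish], { topic: 'user.create', body: { type: 'event' } })
  .then((ran) => console.log(ran)); // ['filter', 'publish']
```

Under this reading, a message rejected by an earlier hook never reaches the terminal hook, which is why filtering belongs before `PublishToSubscribersHook` in the chain.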
const stream = createEdgeStream()
  // Business Activity Server
  .register({ type: 'bas', url: 'https://bas.example.com/hub' })
    .transport({ type: 'signalr' })
    .protocol('json')
    .incoming()
      .add(new LogHook())
      .add(new PublishToSubscribersHook())
    .done()
  .done()
  // Chat Server
  .register({ type: 'chat', url: 'https://chat.example.com/events' })
    .transport({ type: 'sse' })
    .protocol('json')
    .incoming()
      .add(new FilterHook({
        predicate: m => m.body.type !== 'typing'
      }))
      .add(new PublishToSubscribersHook())
    .done()
  .done()
  .build();
// Subscribe to different topics per server
stream.subscribe('bas', 'workflow.*', (envelope) => { ... });
stream.subscribe('chat', 'message.*', (envelope) => { ... });
// Listen for pipeline pause events
stream.on('pipeline:paused', ({ pauseToken, request }) => {
  // Show approval dialog
  showApprovalDialog(request.prompt)
    .then(decision => {
      // Resume pipeline with user decision
      pauseToken.resume({ approved: decision });
    });
});
// Component renders approval form
function ApprovalForm({ request, onSubmit }) {
  return (
    <dialog open>
      <h2>Approval Required</h2>
      <p>{request.prompt}</p>
      <button onClick={() => onSubmit(true)}>Approve</button>
      <button onClick={() => onSubmit(false)}>Reject</button>
    </dialog>
  );
}
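The pause event above hands the listener a `pauseToken` whose `resume` call carries the user's decision back into the pipeline. One way such a token could be built is a promise whose resolver is held outside it. The sketch below is an assumption about the mechanism, not EdgeStream's implementation; `createPauseToken`, `awaitApproval`, and `Decision` are hypothetical names.

```typescript
// Hypothetical shapes for illustration only.
interface Decision {
  approved: boolean;
}
interface PauseToken {
  resume(decision: Decision): void;
  readonly resumed: Promise<Decision>;
}

// Creates a token backed by a promise that settles when resume() is called.
function createPauseToken(): PauseToken {
  let resolve!: (decision: Decision) => void;
  const resumed = new Promise<Decision>((res) => {
    resolve = res; // the executor runs synchronously, so resolve is set before return
  });
  return { resume: (decision) => resolve(decision), resumed };
}

// Pipeline side: wait for the decision, then deliver or drop the message.
async function awaitApproval(token: PauseToken): Promise<string> {
  const decision = await token.resumed;
  return decision.approved ? 'delivered' : 'dropped';
}

const token = createPauseToken();
const outcome = awaitApproval(token);
token.resume({ approved: true }); // UI side, e.g. from the Approve button
outcome.then((result) => console.log(result)); // 'delivered'
```

Calling `resume` a second time has no effect in this sketch, because a promise settles only once.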
// Create custom hook
class TransformHook implements IHook {
  readonly name = 'transform';
  readonly priority = 50;
  // Return type written structurally to match the value returned below
  async execute(context: IPipelineContext): Promise<{ continue: boolean }> {
    // Transform message body
    if (context.body.type === 'raw') {
      context.body = JSON.parse(context.body.data);
    }
    return { continue: true };
  }
}
// Add to pipeline
.incoming()
  .add(new LogHook())
  .add(new TransformHook()) // ← Your custom hook
  .add(new PublishToSubscribersHook())
.done()
For applications handling more than 100 messages per second, enable the optional scalability features:
const stream = createEdgeStream({
  logLevel: 'info',
  // NEW: Scalability configuration
  scalability: {
    // Queue: how many messages can wait before backpressure
    messageQueueSize: 1000,
    maxConcurrentHooksPerServer: 10,
    // Message cache: LRU bounds
    messageStoreMaxPerTopic: 100,
    messageStoreMaxTotal: 10000,
    messageStoreEvictionPolicy: 'lru',
    // React batching: group updates over time
    reactUpdateBatchIntervalMs: 100,
    reactUpdateBatchSize: 50,
    // Worker threads: offload CPU work
    enableWorkerThreads: true,
    // Half the logical cores, rounded down, but never fewer than one worker
    workerPoolSize: Math.max(1, Math.floor(navigator.hardwareConcurrency / 2))
  }
})
  .register({...})
  .build();
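The `messageStoreMaxPerTopic` and `messageStoreMaxTotal` settings describe a cache bounded both per topic and overall. The sketch below shows one way such bounds could interact, assuming that "LRU" means evicting from the topic that was written to least recently. `BoundedMessageStore` is a hypothetical class, not EdgeStream's message store.

```typescript
// Hypothetical sketch; not EdgeStream's implementation.
class BoundedMessageStore<T> {
  // Map iteration follows insertion order, so the first key is the
  // least recently written topic.
  private topics = new Map<string, T[]>();
  private total = 0;

  constructor(private maxPerTopic: number, private maxTotal: number) {
    if (maxPerTopic < 1 || maxTotal < 1) throw new RangeError('bounds must be at least 1');
  }

  add(topic: string, message: T): void {
    const list = this.topics.get(topic) ?? [];
    // Re-insert the topic so it becomes the most recently used.
    this.topics.delete(topic);
    this.topics.set(topic, list);

    list.push(message);
    this.total++;
    if (list.length > this.maxPerTopic) {
      list.shift(); // per-topic bound: drop the oldest message in this topic
      this.total--;
    }
    while (this.total > this.maxTotal) this.evictFromColdestTopic();
  }

  get(topic: string): readonly T[] {
    return this.topics.get(topic) ?? [];
  }

  size(): number {
    return this.total;
  }

  // Global bound: drop the oldest message of the least recently written topic.
  private evictFromColdestTopic(): void {
    const coldest = this.topics.keys().next().value;
    if (coldest === undefined) return;
    const list = this.topics.get(coldest)!;
    list.shift();
    this.total--;
    if (list.length === 0) this.topics.delete(coldest);
  }
}

const store = new BoundedMessageStore<string>(2, 3);
store.add('a', 'a1');
store.add('a', 'a2');
store.add('a', 'a3'); // per-topic bound drops a1
store.add('b', 'b1');
store.add('b', 'b2'); // total bound evicts a2 from the colder topic 'a'
console.log(store.get('a'), store.get('b'), store.size()); // ['a3'] ['b1', 'b2'] 3
```

Bounding memory this way trades history for stability: a busy topic cannot crowd out the whole cache, but quiet topics lose their oldest messages first.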
With scalability enabled:
function App() {
  const status = useConnectionStatus('server');
  // Monitor connection status
  if (status.error) {
    return (
      <ErrorBanner
        error={status.error}
        retrying={status.reconnecting}
      />
    );
  }
  // Show connecting state
  if (status.connecting) {
    return <LoadingSpinner />;
  }
  // Connected
  return <MainContent />;
}
// Backend sends all prices
// Frontend only cares about significant changes
.add(new FilterHook({
  predicate: (envelope) => {
    const priceChange = Math.abs(envelope.body.change);
    return priceChange > 1.0; // Only > 1% changes
  }
}))
// Or filter in component
function Dashboard() {
  const allMessages = useMessages('server', 'price.*');
  const importantOnly = allMessages
    .filter(m => Math.abs(m.body.change) > 1.0);
  return <PriceGrid prices={importantOnly} />;
}
// Wildcard subscriptions
stream.subscribe('bas', 'user.*', handleUserEvent);
stream.subscribe('bas', 'order.*', handleOrderEvent);
stream.subscribe('bas', 'notification.*', handleNotification);
// Or subscribe to all
stream.subscribe('bas', '*', handleAll);
// Component-level filtering
function UserPanel() {
  const userEvents = useMessages('bas', 'user.*');
  const userId = getCurrentUserId();
  // Filter to current user only
  const myEvents = userEvents.filter(
    e => e.body.userId === userId
  );
  return <UserMessages events={myEvents} />;
}
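Patterns such as `user.*` and `*` above imply a matching rule between a subscription pattern and a message topic, but this guide does not spell that rule out. The sketch below assumes that `*` matches exactly one dot-separated segment and that a lone `*` matches every topic. `matchesTopic` is a hypothetical helper, not an EdgeStream export.

```typescript
// Hypothetical matcher; EdgeStream's actual wildcard semantics may differ.
function matchesTopic(pattern: string, topic: string): boolean {
  if (pattern === '*') return true; // "subscribe to all"
  const patternParts = pattern.split('.');
  const topicParts = topic.split('.');
  if (patternParts.length !== topicParts.length) return false;
  // Each segment must match literally unless the pattern segment is a wildcard.
  return patternParts.every((part, i) => part === '*' || part === topicParts[i]);
}

console.log(matchesTopic('user.*', 'user.create')); // true
console.log(matchesTopic('user.*', 'order.create')); // false
console.log(matchesTopic('*', 'notification.email.sent')); // true
```

Under this assumption `user.*` would not match a deeper topic such as `user.profile.update`, so it is worth confirming the real behavior before designing a topic hierarchy around it.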
Yes! The core library is framework-agnostic. React hooks are optional. You can use the vanilla API anywhere: Node.js, vanilla JS, Vue, Svelte, etc.
Extend it! Create a custom transport by implementing the `ITransport` interface. Roughly 50 lines of code for most transport types.
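The real `ITransport` contract is not shown in this guide, so the sketch below uses a hypothetical stand-in interface, `TransportLike`, purely to illustrate the size and shape of a small transport. The method names (`connect`, `disconnect`, `send`, `onMessage`) are assumptions; check the library's typings for the actual interface.

```typescript
// Hypothetical stand-in for ITransport; the real contract may differ.
interface TransportLike {
  connect(): void;
  disconnect(): void;
  send(data: string): void;
  onMessage(handler: (data: string) => void): void;
}

// Loopback transport: everything sent is delivered straight back to the
// registered handlers. Useful as a test double.
class LoopbackTransport implements TransportLike {
  private connected = false;
  private handlers: Array<(data: string) => void> = [];

  connect(): void {
    this.connected = true;
  }

  disconnect(): void {
    this.connected = false;
  }

  onMessage(handler: (data: string) => void): void {
    this.handlers.push(handler);
  }

  send(data: string): void {
    if (!this.connected) throw new Error('transport is not connected');
    for (const handler of this.handlers) handler(data);
  }
}

const transport = new LoopbackTransport();
const received: string[] = [];
transport.onMessage((data) => received.push(data));
transport.connect();
transport.send(JSON.stringify({ topic: 'user.create' }));
console.log(received); // ['{"topic":"user.create"}']
```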
No breaking changes. Phase 10 adds optional scalability features, and existing code keeps working. Enable the `scalability` config to use workers, batching, and the other features.
Absolutely. One server can send JSON, another CloudEvents, and a third a custom format. `ProtocolParserRegistry` auto-detects the protocol per message, so your application code stays the same.
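How the registry decides is not described here. As a rough illustration of per-message detection, the sketch below tells a structured-mode CloudEvents message apart from plain JSON by checking for the four attributes the CloudEvents specification requires (`specversion`, `id`, `source`, `type`). `detectProtocol` is a hypothetical function and covers only these two formats.

```typescript
type DetectedProtocol = 'cloudevents' | 'json';

// Hypothetical detector; ProtocolParserRegistry may use different rules.
function detectProtocol(raw: string): DetectedProtocol {
  const message: unknown = JSON.parse(raw);
  if (typeof message !== 'object' || message === null) return 'json';
  const fields = message as Record<string, unknown>;
  // CloudEvents requires these four context attributes on every event.
  const required = ['specversion', 'id', 'source', 'type'];
  const isCloudEvent = required.every((key) => typeof fields[key] === 'string');
  return isCloudEvent ? 'cloudevents' : 'json';
}

const cloudEvent = '{"specversion":"1.0","id":"1","source":"/orders","type":"order.created"}';
const plain = '{"topic":"user.create","body":{}}';
console.log(detectProtocol(cloudEvent)); // 'cloudevents'
console.log(detectProtocol(plain)); // 'json'
```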
No, but it is recommended. EdgeStream is written in TypeScript and fully typed. You can use plain JavaScript, but you lose the benefits of type safety.
FIFO within a topic. Messages published to `user.create`, then `user.update` arrive in that order. Topics are independent.
Yes! Use `stream.send('serverId', 'topic', body)`. The outgoing pipeline applies encryption, signing, and retry logic.
Open source and free! MIT licensed. Use in any project, commercial or personal.
Start with the npm package or clone the GitHub repo