Quick Start (5 minutes)

Get EdgeStream up and running with minimal setup.

1. Install

Add EdgeStream to your project:

npm install edge-stream-js

2. Create Stream

Initialize with your server:

const stream = createEdgeStream()
  .register({ type: 'bas', url: 'wss://server.example.com' })
  .build();

3. Start & Subscribe

Listen for messages:

await stream.start();

stream.subscribe('bas', 'user.*', (envelope) => {
  console.log(envelope.body);
});

4. Use in React

Wrap your app:

<EdgeStreamProvider stream={stream}>
  <App />
</EdgeStreamProvider>
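
5. Connect Components

Access messages in components:

function Messages() {
  const msgs = useMessages('bas', '*');
  // Msg is your own component; the `message` prop name is illustrative.
  return msgs.map(m => <Msg key={m.meta.id} message={m} />);
}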

6. Done!

Messages are now flowing in real time. High-throughput handling, multiple transports, and protocol normalization are all built in.

Complete Minimal Example

A full working example with all the basics:

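// Assumes the hooks and useConnectionStatus are exported from the package root.
import {
  createEdgeStream,
  EdgeStreamProvider,
  useMessages,
  useConnectionStatus,
  LogHook,
  PublishToSubscribersHook
} from 'edge-stream-js';
import React from 'react';
// createRoot lives in 'react-dom/client' (React 18+)
import ReactDOM from 'react-dom/client';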

// 1. Create stream
const stream = createEdgeStream({ logLevel: 'info' })
  .register({
    type: 'server',
    url: 'https://api.example.com/realtime'
  })
    .transport({ type: 'signalr' })
    .protocol('json')
    .incoming()
      .add(new LogHook())
      .add(new PublishToSubscribersHook())
    .done()
  .done()
  .build();

// 2. Wrap app with provider
ReactDOM.createRoot(document.getElementById('root')).render(
  <EdgeStreamProvider stream={stream}>
    <App />
  </EdgeStreamProvider>
);

// 3. Use in component
function App() {
  const messages = useMessages('server', 'notification.*');
  const status = useConnectionStatus('server');

  if (status.connecting) return <div>Connecting...</div>;
  if (status.error) return <div>Error: {status.error.message}</div>;

  return (
    <div>
      <h1>Messages ({messages.length})</h1>
      {messages.map(msg => (
        <div key={msg.meta.id}>
          <p>{JSON.stringify(msg.body)}</p>
        </div>
      ))}
    </div>
  );
}

Configuration Guide

Transport Options

// SignalR (recommended for .NET backends)
.transport({
  type: 'signalr',
  reconnect: { enabled: true, maxAttempts: 5, backoffMs: 1000 }
})

// HTTP Polling (reliable fallback)
.transport({
  type: 'http-polling',
  intervalMs: 1000,
  timeoutMs: 5000
})

// Server-Sent Events (efficient for server push)
.transport({
  type: 'sse',
  reconnect: { enabled: true, maxAttempts: Infinity }
})

// Raw WebSocket (maximum control)
.transport({
  type: 'websocket',
  reconnect: { enabled: false }
})
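
The reconnect options above name a base delay and an attempt limit but do not say how the delay grows between attempts. The sketch below shows one common choice, exponential backoff with a cap. The `ReconnectOptions` type, the `reconnectDelay` function, the doubling rule, and the 30-second cap are all assumptions for illustration, not EdgeStream's documented behavior.

```typescript
// Hypothetical shape mirroring the `reconnect` option shown above.
interface ReconnectOptions {
  enabled: boolean;
  maxAttempts: number; // may be Infinity
  backoffMs: number; // delay before the first retry
}

// Assumption: the delay doubles on every attempt and is capped at maxDelayMs.
// Returns null when no further attempt should be made.
function reconnectDelay(
  attempt: number, // 1-based attempt counter
  options: ReconnectOptions,
  maxDelayMs = 30000
): number | null {
  if (!options.enabled || attempt > options.maxAttempts) return null;
  const delay = options.backoffMs * Math.pow(2, attempt - 1);
  return Math.min(delay, maxDelayMs);
}

const reconnect: ReconnectOptions = { enabled: true, maxAttempts: 5, backoffMs: 1000 };
const schedule = [1, 2, 3, 4, 5, 6].map((attempt) => reconnectDelay(attempt, reconnect));
console.log(schedule); // [1000, 2000, 4000, 8000, 16000, null]
```

With `maxAttempts: Infinity`, as in the SSE example, the cap is what keeps the delay bounded.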

Protocol Options

// Simple JSON (default, no overhead)
.protocol('json')

// DIDComm v2 (DIF specification, encrypted)
.protocol('didcomm-v2')

// CloudEvents (CNCF standard, structured)
.protocol('cloudevents')

// Custom protocol
.protocol('custom', new MyCustomParser())
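
To make "protocol normalization" concrete, here is a minimal sketch of turning a CloudEvents JSON message into the `{ meta, body }` envelope shape used elsewhere in this guide. The four required attributes (`specversion`, `id`, `source`, `type`) come from the CloudEvents 1.0 specification. The `Envelope` type, the `fromCloudEvent` function, and the mapping of `type` to a topic are assumptions, not EdgeStream's actual parser interface.

```typescript
// Hypothetical envelope; only `meta.id` and `body` appear in this guide.
interface Envelope {
  meta: { id: string; topic: string };
  body: unknown;
}

// Context attributes that CloudEvents 1.0 requires on every event.
const REQUIRED_ATTRIBUTES = ['specversion', 'id', 'source', 'type'] as const;

function fromCloudEvent(raw: string): Envelope {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Not a CloudEvent: expected a JSON object');
  }
  const event = parsed as Record<string, unknown>;
  for (const name of REQUIRED_ATTRIBUTES) {
    if (typeof event[name] !== 'string') {
      throw new Error(`Not a CloudEvent: missing "${name}"`);
    }
  }
  return {
    // Assumption: the CloudEvents `type` doubles as the subscription topic.
    meta: { id: event['id'] as string, topic: event['type'] as string },
    body: event['data'],
  };
}

const envelope = fromCloudEvent(
  JSON.stringify({
    specversion: '1.0',
    id: 'evt-1',
    source: '/orders',
    type: 'order.created',
    data: { total: 42 },
  })
);
console.log(envelope.meta.topic); // order.created
```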

Pipeline Hooks

.incoming()
  // Logging
  .add(new LogHook({ label: 'API' }))

  // Security
  .add(new DecryptHook({ privateKeyId: 'key-1' }))
  .add(new VerifySignatureHook({ trustedIssuers: [...] }))

  // Validation
  .add(new ValidationHook({ schema: jsonSchema }))

  // Filtering
  .add(new FilterHook({
    predicate: (envelope) => envelope.body.type !== 'heartbeat'
  }))

  // Acknowledgment
  .add(new AcknowledgeHook({ method: 'receipt' }))

  // Interactive (for approval workflows)
  .add(new RenderToFormHook({ formRegistry }))

  // Terminal hook (required)
  .add(new PublishToSubscribersHook())
.done()
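
The chain above implies that hooks run in the order they are added and that a hook such as `FilterHook` can stop a message before it reaches the terminal hook. The sketch below models that idea in isolation. The `Hook` interface and `runPipeline` function are hypothetical, and the hooks are synchronous to keep the example short (the custom hook later in this guide uses an async `execute`).

```typescript
interface Envelope {
  body: { type: string };
}

interface HookResult {
  continue: boolean;
}

// Hypothetical, simplified hook shape for illustration only.
interface Hook {
  name: string;
  execute(envelope: Envelope): HookResult;
}

// Runs hooks in order and stops at the first one that returns continue: false.
// Returns the names of the hooks that actually ran.
function runPipeline(hooks: Hook[], envelope: Envelope): string[] {
  const executed: string[] = [];
  for (const hook of hooks) {
    executed.push(hook.name);
    if (!hook.execute(envelope).continue) break;
  }
  return executed;
}

const delivered: Envelope[] = [];

const hooks: Hook[] = [
  // Drops heartbeats, like the FilterHook predicate above.
  { name: 'filter', execute: (envelope) => ({ continue: envelope.body.type !== 'heartbeat' }) },
  // Stands in for the terminal publish step.
  {
    name: 'publish',
    execute: (envelope) => {
      delivered.push(envelope);
      return { continue: true };
    },
  },
];

console.log(runPipeline(hooks, { body: { type: 'heartbeat' } })); // ['filter']
console.log(runPipeline(hooks, { body: { type: 'order.created' } })); // ['filter', 'publish']
```

Because the terminal hook sits last, anything filtered out earlier never reaches subscribers.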

Advanced Patterns

Multiple Servers

const stream = createEdgeStream()
  // Business Activity Server
  .register({ type: 'bas', url: 'https://bas.example.com/hub' })
    .transport({ type: 'signalr' })
    .protocol('json')
    .incoming()
      .add(new LogHook())
      .add(new PublishToSubscribersHook())
    .done()
  .done()

  // Chat Server
  .register({ type: 'chat', url: 'https://chat.example.com/events' })
    .transport({ type: 'sse' })
    .protocol('json')
    .incoming()
      .add(new FilterHook({
        predicate: m => m.body.type !== 'typing'
      }))
      .add(new PublishToSubscribersHook())
    .done()
  .done()

  .build();

// Subscribe to different topics per server
stream.subscribe('bas', 'workflow.*', (envelope) => { ... });
stream.subscribe('chat', 'message.*', (envelope) => { ... });

Approval Workflows

// Listen for pipeline pause events
stream.on('pipeline:paused', ({ pauseToken, request }) => {
  // Show approval dialog
  showApprovalDialog(request.prompt)
    .then(decision => {
      // Resume pipeline with user decision
      pauseToken.resume({ approved: decision });
    });
});

// Component renders approval form
function ApprovalForm({ request, onSubmit }) {
  return (
    <dialog open>
      <h2>Approval Required</h2>
      <p>{request.prompt}</p>
      <button onClick={() => onSubmit(true)}>Approve</button>
      <button onClick={() => onSubmit(false)}>Reject</button>
    </dialog>
  );
}
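
One way to picture the pause token is as a stored continuation that the UI calls once the user decides. The sketch below is a guess at the general mechanism, not EdgeStream's implementation. The `PauseToken` class here is a hypothetical stand-in that also ignores repeated calls, so a double click cannot resume the pipeline twice.

```typescript
interface Decision {
  approved: boolean;
}

// Hypothetical stand-in for the token delivered with 'pipeline:paused'.
class PauseToken {
  private settled = false;
  private readonly onResume: (decision: Decision) => void;

  constructor(onResume: (decision: Decision) => void) {
    this.onResume = onResume;
  }

  // Continues the paused pipeline exactly once.
  resume(decision: Decision): void {
    if (this.settled) return;
    this.settled = true;
    this.onResume(decision);
  }
}

const outcomes: string[] = [];
const token = new PauseToken((decision) => {
  outcomes.push(decision.approved ? 'delivered' : 'dropped');
});

// This callback has the shape ApprovalForm expects for its onSubmit prop.
const onSubmit = (approved: boolean): void => token.resume({ approved });

onSubmit(true);
onSubmit(false); // ignored: the token was already resumed
console.log(outcomes); // ['delivered']
```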

Custom Hook

// Create custom hook
class TransformHook implements IHook {
  readonly name = 'transform';
  readonly priority = 50;

  async execute(context: IPipelineContext): Promise<{ continue: boolean }> {
    // Transform message body
    if (context.body.type === 'raw') {
      context.body = JSON.parse(context.body.data);
    }
    return { continue: true };
  }
}

// Add to pipeline
.incoming()
  .add(new LogHook())
  .add(new TransformHook())  // ← Your custom hook
  .add(new PublishToSubscribersHook())
.done()

Scalability (Optional Phase 10)

For applications handling 100+ msg/sec, enable optional scalability features:

const stream = createEdgeStream({
  logLevel: 'info',
  // NEW: Scalability configuration
  scalability: {
    // Queue: how many messages can wait before backpressure
    messageQueueSize: 1000,
    maxConcurrentHooksPerServer: 10,

    // Message cache: LRU bounds
    messageStoreMaxPerTopic: 100,
    messageStoreMaxTotal: 10000,
    messageStoreEvictionPolicy: 'lru',

    // React batching: group updates over time
    reactUpdateBatchIntervalMs: 100,
    reactUpdateBatchSize: 50,

    // Worker threads: offload CPU work
    enableWorkerThreads: true,
    // Round down and keep at least one worker
    workerPoolSize: Math.max(1, Math.floor(navigator.hardwareConcurrency / 2))
  }
})
  .register({...})
  .build();
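
The `messageStoreEvictionPolicy: 'lru'` setting refers to least-recently-used eviction. How EdgeStream applies it per topic is not shown here, so the sketch below only illustrates the general technique with a small generic cache. `LruCache` is a hypothetical name, and the example relies on the fact that a JavaScript `Map` iterates in insertion order.

```typescript
class LruCache<K, V> {
  private readonly entries = new Map<K, V>();
  private readonly capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) throw new RangeError('capacity must be at least 1');
    this.capacity = capacity;
  }

  get(key: K): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    // Re-insert so the key becomes the most recently used entry.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: K, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value as K;
      this.entries.delete(oldest);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}

const cache = new LruCache<string, string>(2);
cache.set('a', 'first');
cache.set('b', 'second');
cache.get('a'); // 'a' is now the most recently used key
cache.set('c', 'third'); // over capacity: evicts 'b'
console.log(cache.get('b')); // undefined
```

A bound like `messageStoreMaxTotal` plays the role of `capacity` here: memory stays flat no matter how long the stream runs.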

Common Patterns

Error Handling

function App() {
  const status = useConnectionStatus('server');

  // Monitor connection status
  if (status.error) {
    return (
      <ErrorBanner
        error={status.error}
        retrying={status.reconnecting}
      />
    );
  }

  // Show connecting state
  if (status.connecting) {
    return <LoadingSpinner />;
  }

  // Connected
  return <MainContent />;
}

Message Filtering

// Backend sends all prices
// Frontend only cares about significant changes
.add(new FilterHook({
  predicate: (envelope) => {
    const priceChange = Math.abs(envelope.body.change);
    return priceChange > 1.0;  // Only > 1% changes
  }
}))

// Or filter in component
function Dashboard() {
  const allMessages = useMessages('server', 'price.*');
  const importantOnly = allMessages
    .filter(m => Math.abs(m.body.change) > 1.0);

  return <PriceGrid prices={importantOnly} />;
}

Topic-Based Routing

// Wildcard subscriptions
stream.subscribe('bas', 'user.*', handleUserEvent);
stream.subscribe('bas', 'order.*', handleOrderEvent);
stream.subscribe('bas', 'notification.*', handleNotification);

// Or subscribe to all
stream.subscribe('bas', '*', handleAll);

// Component-level filtering
function UserPanel() {
  const userEvents = useMessages('bas', 'user.*');
  const userId = getCurrentUserId();

  // Filter to current user only
  const myEvents = userEvents.filter(
    e => e.body.userId === userId
  );

  return <UserMessages events={myEvents} />;
}
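
The exact wildcard rules are not spelled out in this guide. As a sketch, the matcher below assumes that a bare `*` matches every topic and that `*` inside a dotted pattern matches exactly one segment. `matchesTopic` is a hypothetical helper, and EdgeStream's real matching may differ (for example, `user.*` might also match deeper topics).

```typescript
// Assumed semantics: '*' alone matches everything; otherwise each '*'
// matches exactly one dot-separated segment.
function matchesTopic(pattern: string, topic: string): boolean {
  if (pattern === '*') return true;
  const patternParts = pattern.split('.');
  const topicParts = topic.split('.');
  if (patternParts.length !== topicParts.length) return false;
  return patternParts.every(
    (segment, index) => segment === '*' || segment === topicParts[index]
  );
}

console.log(matchesTopic('user.*', 'user.created')); // true
console.log(matchesTopic('user.*', 'order.created')); // false
console.log(matchesTopic('*', 'notification.email.sent')); // true
```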

Frequently Asked Questions

Can I use EdgeStream without React?

Yes. The core library is framework-agnostic, and the React hooks are optional. The core API works in Node.js, plain JavaScript, Vue, Svelte, and other environments.

What if my server doesn't support any of the 4 transports?

Create a custom transport by implementing the `ITransport` interface. Most transports take roughly 50 lines of code.

How do I upgrade from Phase 1 to Phase 10?

There are no breaking changes. Phase 10 only adds optional scalability features, so existing code keeps working. Add the `scalability` config to enable worker threads, batching, and the other features.

Can I use multiple protocols from different servers?

Yes. One server can send JSON, another CloudEvents, and a third a custom format. `ProtocolParserRegistry` detects the protocol per message, so your application code stays the same.

Is TypeScript required?

No, but it is recommended. EdgeStream is written in TypeScript and fully typed. Plain JavaScript works, but you lose the type-safety benefits.

What about message ordering?

FIFO within a topic. Messages published to `user.create`, then `user.update` arrive in that order. Topics are independent.

Can I send messages back to the server?

Yes. Use `stream.send('serverId', 'topic', body)`. The outgoing pipeline applies encryption, signing, and retry logic.

How much does EdgeStream cost?

EdgeStream is free and open source under the MIT license. Use it in any project, commercial or personal.

Ready to Build?

Start with the npm package or clone the GitHub repo
