Workflow Orchestration Architecture

EdgeStream enables server-driven workflow automation in which business logic orchestrates user interfaces in real time. Its six-layer architecture handles bidirectional communication between servers, UIs, and external systems.

graph TB
  A["Transport<br/>SignalR/SSE"] -->|Raw| B["Parser<br/>JSON/DIDComm"]
  B -->|Envelope| C["Pipeline<br/>Hooks"]
  C -->|Context| D["Subscriptions"]
  D -->|Data| E["Stores"]
  E -->|Updates| F["React"]

Core Layers Explained

1. Transport Layer

Handles real-time communication with remote servers via multiple protocols.

SignalR

Bidirectional communication with automatic transport fallback (WebSockets, then Server-Sent Events, then long polling). Best for persistent connections.

HTTP Polling

Client-side polling with exponential backoff. Reliable for any environment.

Server-Sent Events

Unidirectional server push. Efficient for broadcast scenarios.

WebSocket

Raw WebSocket for maximum control and minimal overhead.
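The HTTP polling transport above is described as using exponential backoff. A minimal sketch of how such a delay schedule could be computed follows; the helper name `backoffDelay` and its `baseMs` and `maxMs` parameters are illustrative assumptions, not part of EdgeStream's API.

```typescript
// Hypothetical helper: delay before the next poll after `attempt` consecutive
// empty or failed polls. The delay doubles each time and is capped at maxMs.
function backoffDelay(attempt: number, baseMs = 500, maxMs = 30000): number {
  const exponential = baseMs * 2 ** Math.max(0, attempt);
  return Math.min(exponential, maxMs);
}

// Delays for attempts 0, 1, 2, 3 and 10.
const schedule = [0, 1, 2, 3, 10].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [500, 1000, 2000, 4000, 30000]
```

Pollers of this kind commonly add random jitter to each delay so that many clients do not retry at the same instant.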

// Transport is abstracted - same API for all transports
interface ITransport {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  send(data: string | ArrayBuffer): Promise<void>;
  onMessage(handler: (raw: string | ArrayBuffer) => void): () => void;
  readonly status: TransportStatus;
}
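Because every transport shares this interface, a test double is easy to write. The sketch below is one possible in-memory implementation; `InMemoryTransport`, its `sent` array, its `receive()` method, and the `TransportStatus` values are assumptions made for illustration, and the function returned by `onMessage` is assumed to unsubscribe the handler.

```typescript
// Assumed status values; the source does not define TransportStatus.
type TransportStatus = 'disconnected' | 'connecting' | 'connected' | 'error';
type Raw = string | ArrayBuffer;

interface ITransport {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  send(data: Raw): Promise<void>;
  onMessage(handler: (raw: Raw) => void): () => void;
  readonly status: TransportStatus;
}

// Hypothetical test double: records outgoing data and lets a test inject
// incoming data without any network.
class InMemoryTransport implements ITransport {
  private handlers = new Set<(raw: Raw) => void>();
  private currentStatus: TransportStatus = 'disconnected';
  readonly sent: Raw[] = [];

  get status(): TransportStatus {
    return this.currentStatus;
  }

  async connect(): Promise<void> {
    this.currentStatus = 'connected';
  }

  async disconnect(): Promise<void> {
    this.currentStatus = 'disconnected';
  }

  async send(data: Raw): Promise<void> {
    if (this.currentStatus !== 'connected') {
      throw new Error('transport is not connected');
    }
    this.sent.push(data);
  }

  onMessage(handler: (raw: Raw) => void): () => void {
    this.handlers.add(handler);
    // Assumption: the returned function removes the handler again.
    return () => {
      this.handlers.delete(handler);
    };
  }

  // Test-only: simulate a message arriving from the server.
  receive(raw: Raw): void {
    this.handlers.forEach((handler) => handler(raw));
  }
}
```

A unit test can pass an instance wherever an `ITransport` is expected, call `receive()` to simulate server traffic, and inspect `sent` to check what the code under test tried to send.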

2. Protocol Parser Layer

Normalizes different message formats into a unified internal representation.

JSON

Simple, universal format. No overhead.

DIDComm v2

Decentralized Identity Foundation (DIF) specification for secure messaging built on W3C Decentralized Identifiers (DIDs). Includes encryption & signatures.

CloudEvents

CNCF standard for event interchange. Great for cloud-native apps.

Custom

Extend with your own protocol parser. Full control.

// Parsers are auto-detected via canHandle()
interface IProtocolParser {
  readonly name: string;
  parse(raw: string | ArrayBuffer): ParsedMessage;
  serialize(body: unknown): string | ArrayBuffer;
  canHandle?(raw: string | ArrayBuffer): boolean;
}
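The auto-detection described in the comment above can be pictured as a registry that asks each parser in turn whether it can handle the raw payload. This is a minimal sketch under stated assumptions: `ParserRegistry` is a stand-in, not the library's own registry, and the `ParsedMessage` shape and both example parsers are hypothetical.

```typescript
type Raw = string | ArrayBuffer;

// Assumed minimal shape; the source does not define ParsedMessage.
interface ParsedMessage {
  topic: string;
  body: unknown;
}

interface IProtocolParser {
  readonly name: string;
  parse(raw: Raw): ParsedMessage;
  serialize(body: unknown): Raw;
  canHandle?(raw: Raw): boolean;
}

// Hypothetical JSON parser: claims string payloads that start with "{".
const jsonParser: IProtocolParser = {
  name: 'json',
  canHandle: (raw) => typeof raw === 'string' && raw.trim().charAt(0) === '{',
  parse(raw) {
    if (typeof raw !== 'string') throw new Error('json parser expects a string');
    const obj = JSON.parse(raw) as { topic?: string; body?: unknown };
    return { topic: obj.topic ?? 'unknown', body: obj.body };
  },
  serialize: (body) => JSON.stringify(body),
};

// Hypothetical fallback parser: accepts any string as plain text.
const textParser: IProtocolParser = {
  name: 'text',
  canHandle: (raw) => typeof raw === 'string',
  parse: (raw) => ({ topic: 'text', body: raw }),
  serialize: (body) => String(body),
};

class ParserRegistry {
  private parsers: IProtocolParser[] = [];

  register(parser: IProtocolParser): this {
    this.parsers.push(parser);
    return this;
  }

  // The first registered parser whose canHandle() returns true wins.
  // Parsers without canHandle() are never auto-selected in this sketch.
  detect(raw: Raw): IProtocolParser {
    const match = this.parsers.find((p) => p.canHandle?.(raw) === true);
    if (!match) throw new Error('no parser can handle this message');
    return match;
  }
}

const registry = new ParserRegistry().register(jsonParser).register(textParser);
console.log(registry.detect('{"topic":"form.show","body":{}}').name); // "json"
console.log(registry.detect('plain text').name); // "text"
```

Registration order matters in this design: specific parsers go first and broad fallbacks last.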

3. Pipeline Engine

Composable hook architecture where each hook transforms the message.

graph LR
  A["Msg"] -->|10| B["Log"]
  B -->|20| C["Validate"]
  C -->|30| D["Decrypt"]
  D -->|40| E["Verify"]
  E -->|50| F["Filter"]
  F -->|60| G["Ack"]
  G -->|70| H["Interactive"]
  H -->|200| I["Publish"]

// Hooks execute in priority order
interface IHook {
  readonly name: string;
  readonly priority: number; // lower = runs first
  execute(context: IPipelineContext): Promise<HookResult>;
}

// Context is mutable - hooks transform the message
interface IPipelineContext {
  body: unknown; // hooks modify this
  metadata: Record<string, unknown>; // inter-hook communication
  abort(reason?: string): void;
  pause(request: unknown): IPipelinePause; // for interactive hooks
}
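To make the priority ordering concrete, here is a minimal sketch of a runner that sorts hooks by priority and stops when one aborts. It uses simplified stand-ins (`Hook`, `PipelineContext`, and a two-value `HookResult`) rather than the interfaces above, and the `aborted` flag is an assumption about how `abort()` might be observed by the runner.

```typescript
// Assumed result type; the source does not define HookResult.
type HookResult = 'continue' | 'stop';

interface PipelineContext {
  body: unknown;
  metadata: Record<string, unknown>;
  aborted: boolean; // assumption: abort() sets a flag the runner checks
  abort(reason?: string): void;
}

interface Hook {
  readonly name: string;
  readonly priority: number; // lower = runs first
  execute(context: PipelineContext): Promise<HookResult>;
}

function createContext(body: unknown): PipelineContext {
  const context: PipelineContext = {
    body,
    metadata: {},
    aborted: false,
    abort(reason) {
      context.aborted = true;
      context.metadata['abortReason'] = reason;
    },
  };
  return context;
}

// Returns a sorted copy so the caller's array is left untouched.
function orderHooks(hooks: Hook[]): Hook[] {
  return [...hooks].sort((a, b) => a.priority - b.priority);
}

// Runs hooks in ascending priority; stops when a hook aborts or returns 'stop'.
async function runPipeline(hooks: Hook[], body: unknown): Promise<PipelineContext> {
  const context = createContext(body);
  for (const hook of orderHooks(hooks)) {
    const result = await hook.execute(context);
    if (context.aborted || result === 'stop') break;
  }
  return context;
}

// Demo hooks: each records its name; "validate" aborts on a null body.
function makeHook(name: string, priority: number, trace: string[]): Hook {
  return {
    name,
    priority,
    execute: async (context) => {
      trace.push(name);
      if (name === 'validate' && context.body === null) context.abort('empty body');
      return 'continue';
    },
  };
}

const demoTrace: string[] = [];
const demoHooks = [
  makeHook('publish', 200, demoTrace),
  makeHook('validate', 20, demoTrace),
  makeHook('log', 10, demoTrace),
];
runPipeline(demoHooks, { text: 'hi' }).then(() => {
  console.log(demoTrace.join(' -> ')); // log -> validate -> publish
});
```

Registration order is irrelevant here; only the `priority` values decide the sequence, which is what lets independently written hooks be combined.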

4. Subscription Manager

Topic-based pub/sub routing with wildcard support.

// Subscribe to specific topics or use wildcards
const sub = stream.subscribe('bas', 'form.*', (envelope) => {
  console.log(envelope.body);
});

// Wildcard patterns supported
stream.subscribe('chat', '*', callback);        // all chat messages
stream.subscribe('bas', 'approval.*', callback); // workflow approvals
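The wildcard routing shown above could be implemented along the following lines. This is a sketch, not the Subscription Manager itself: `topicMatches` and `TopicRouter` are hypothetical names, and it assumes `*` matches any run of characters including dots, so that `'*'` alone matches every topic as the comments above imply.

```typescript
// Assumption: "*" matches any run of characters, dots included.
// "form.*" therefore matches "form.show" and "form.field.changed".
function topicMatches(pattern: string, topic: string): boolean {
  const escaped = pattern
    .split('*')
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&')) // escape regex syntax
    .join('.*');
  return new RegExp(`^${escaped}$`).test(topic);
}

type TopicCallback = (topic: string, body: unknown) => void;

// Hypothetical router for a single server's topics.
class TopicRouter {
  private subs: { pattern: string; callback: TopicCallback }[] = [];

  // Returns a function that removes this subscription.
  subscribe(pattern: string, callback: TopicCallback): () => void {
    const entry = { pattern, callback };
    this.subs.push(entry);
    return () => {
      this.subs = this.subs.filter((s) => s !== entry);
    };
  }

  // Invokes every matching callback and returns how many were notified.
  publish(topic: string, body: unknown): number {
    const matched = this.subs.filter((s) => topicMatches(s.pattern, topic));
    matched.forEach((s) => s.callback(topic, body));
    return matched.length;
  }
}

const router = new TopicRouter();
router.subscribe('approval.*', (topic) => console.log('approval handler got', topic));
router.subscribe('*', (topic) => console.log('catch-all got', topic));
console.log(router.publish('approval.requested', {})); // 2
console.log(router.publish('form.show', {})); // 1
```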

5. State Management (Zustand)

Reactive state stores for React components.

connectionStore

Per-server transport status (connected, connecting, error). Persisted to localStorage.

messageStore

Message history by topic (LRU-bounded). Ephemeral, cleared on reload.

streamStore

Global stream status and lifecycle events.
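The LRU bound on messageStore can be illustrated independently of Zustand. The sketch below is a generic least-recently-used cache built on `Map` insertion order; `LruCache` is a hypothetical class, and how the real store keys and evicts messages is not specified in this document.

```typescript
// A Map iterates in insertion order, so the first key is always the least
// recently used entry as long as reads and writes re-insert the key.
class LruCache<K, V> {
  private entries = new Map<K, V>();
  private readonly capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
    this.capacity = capacity;
  }

  get(key: K): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    this.entries.delete(key); // re-insert to mark as most recently used
    this.entries.set(key, value);
    return value;
  }

  set(key: K, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      const oldest = this.entries.keys().next().value as K;
      this.entries.delete(oldest); // evict the least recently used entry
    }
  }

  has(key: K): boolean {
    return this.entries.has(key);
  }

  get size(): number {
    return this.entries.size;
  }
}

const history = new LruCache<string, { topic: string; body: unknown }>(2);
history.set('m1', { topic: 'message.new', body: 'first' });
history.set('m2', { topic: 'message.new', body: 'second' });
history.get('m1'); // touching m1 makes m2 the oldest entry
history.set('m3', { topic: 'message.new', body: 'third' });
console.log(history.has('m1'), history.has('m2'), history.has('m3')); // true false true
```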

6. React Bindings

Optional React hooks for component integration.

// React hooks access Zustand stores
function ChatComponent() {
  const messages = useMessages('chat', 'message.*');
  const status = useConnectionStatus('chat');
  const send = useEdgeStreamSend();

  if (status.connecting) return <Loading />;
  if (status.error) return <Error message={status.error} />;

  return (
    <>
      {messages.map(msg => <Message key={msg.id} envelope={msg} />)}
      <Input onSend={(text) => send('chat', 'message.send', { text })} />
    </>
  );
}

Workflow Orchestration Pattern

EdgeStream enables three key orchestration patterns that drive business automation:

Pattern 1: Workflows Drive Your UI

Server-side business logic determines what the user sees and when. As a workflow progresses, EdgeStream sends messages to update the UI: show a form, hide it, display status, enable/disable buttons, etc.

// Server orchestrates user experience
workflow.onApprovalStep = () => {
  send('ui', 'show-approval-form', {
    formId: 'approval',
    data: { amount: 5000 }
  });
}

workflow.onApprovalReceived = (decision) => {
  if (decision.approved) {
    send('ui', 'show-success', { message: 'Approved!' });
    orchestrate('payment-processor', { /* ... */ });
  }
}

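// React component listens (client side)
import { useEffect, useState } from 'react';

function App() {
  const [currentForm, setCurrentForm] = useState<string | null>(null);
  const messages = useMessages('server', '*');

  useEffect(() => {
    // The last matching message wins, so the newest form request is shown
    messages.forEach(msg => {
      if (msg.body.action === 'show-approval-form') {
        setCurrentForm(msg.body.formId);
      }
    });
  }, [messages]);

  // Placeholder markup: render whichever form the server last asked for
  return currentForm ? <p>Form requested: {currentForm}</p> : null;
}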

Pattern 2: Server Queries Your UI

Workflows pause automatically to request information from the user. EdgeStream sends a message asking for input, waits for response, then continues the workflow with the provided data.

// Server pauses workflow and asks for input
const userInput = await stream.queryUI('payment', {
  type: 'form',
  prompt: 'Select payment method',
  schema: paymentMethodSchema
});

// Client-side pause handler
stream.on('pipeline:paused', ({ pauseToken, request }) => {
  if (request.prompt === 'Select payment method') {
    showPaymentModal()
      .then(selection => {
        pauseToken.resume({ method: selection });
        // Server continues with selection
      });
  }
});

// Workflow continues with user's choice
workflow.onPaymentMethodSelected = (method) => {
  if (method === 'wallet') {
    orchestrate('web3-service', { action: 'connect-wallet' });
  }
}

Pattern 3: Multi-System Integration

Workflows coordinate across disconnected systems: Web3 wallets, payment processors, banks, and APIs. EdgeStream handles orchestration, message correlation, and response handling.

// Orchestrate across multiple services
async function executeComplexWorkflow() {
  // 1. Collect user data from UI
  const userData = await stream.queryUI('form', {
    type: 'form',
    prompt: 'Enter recipient address'
  });

  // 2. Connect to Web3 wallet
  const walletResponse = await stream.send('web3', 'connect-wallet', {
    chainId: 1,
    connectorId: 'metamask'
  });

  // 3. Process payment
  const paymentResult = await stream.send('payment', 'execute-transfer', {
    to: userData.address,
    amount: userData.amount,
    wallet: walletResponse.address
  });

  // 4. Notify user of completion
  await stream.send('ui', 'show-confirmation', {
    txHash: paymentResult.transactionHash,
    status: 'success'
  });
}

// All messages are correlated and auditable
// workflow.executionId links all messages in sequence
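One way to picture correlation by execution ID is an audit log that groups messages under the workflow run that produced them. The sketch below is illustrative only: `ExecutionLog`, `AuditEntry`, and their fields are hypothetical, and the document does not describe how EdgeStream stores or exposes this information.

```typescript
// Hypothetical audit record; field names are illustrative.
interface AuditEntry {
  executionId: string;
  sequence: number; // 1-based position within the execution
  server: string;
  topic: string;
}

class ExecutionLog {
  private entries = new Map<string, AuditEntry[]>();

  // Appends a message to the trail of its workflow execution.
  record(executionId: string, server: string, topic: string): AuditEntry {
    const trail = this.entries.get(executionId) ?? [];
    const entry: AuditEntry = { executionId, sequence: trail.length + 1, server, topic };
    trail.push(entry);
    this.entries.set(executionId, trail);
    return entry;
  }

  // Ordered "server/topic" steps for one workflow execution.
  trail(executionId: string): string[] {
    const trail = this.entries.get(executionId) ?? [];
    return trail.map((e) => `${e.sequence}. ${e.server}/${e.topic}`);
  }
}

const log = new ExecutionLog();
log.record('exec-1', 'form', 'query');
log.record('exec-2', 'ui', 'show-approval-form'); // a different execution, interleaved
log.record('exec-1', 'web3', 'connect-wallet');
log.record('exec-1', 'payment', 'execute-transfer');
console.log(log.trail('exec-1'));
// ['1. form/query', '2. web3/connect-wallet', '3. payment/execute-transfer']
```

Keying on the execution ID keeps the trail of one workflow readable even when messages from several workflows arrive interleaved.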

Message Flow (Incoming)

Step-by-step journey of a message from transport to subscriber:

1. Transport Receives
   Message arrives via SignalR/SSE/WebSocket/HTTP.
   raw: ArrayBuffer | string

2. Protocol Detection
   ProtocolParserRegistry tries each parser's canHandle() method.
   Selected: IProtocolParser

3. Envelope Created
   Parser normalizes to IEnvelope with meta, body, attributes.
   IEnvelope<TBody>

4. Pipeline Context
   IPipelineContext created (mutable body, metadata bag).
   context.body = raw data

5. Hooks Execute
   IncomingPipeline runs each hook in priority order.
   [LogHook → ValidationHook → DecryptHook → ...]

6. Publish to Subscribers
   PublishToSubscribersHook routes by topic.
   subscriptionManager.publish(topic, envelope)

7. Store Updates
   messageStore adds to LRU cache, Zustand subscribers notified.
   messageStore.addMessage()

8. React Re-render
   useMessages() hook triggers component update.
   <Component> receives new messages

Interactive Hooks (Pause & Resume)

For workflows requiring user input (forms, approvals, confirmations):

sequenceDiagram
  participant Transport
  participant Pipeline
  participant Hook as Interactive Hook
  participant UI as React UI
  participant Sub as Subscriber
  Transport->>Pipeline: Message received
  Pipeline->>Hook: execute()
  Hook->>Pipeline: pause(request)
  Pipeline->>UI: EventBus: pipeline:paused
  UI->>UI: Render dialog/form
  UI->>Hook: pauseToken.resume(userInput)
  Hook->>Pipeline: continue
  Pipeline->>Sub: PublishHook routes
  Sub->>Sub: Callback invoked

Example: Approval workflow with forms

// Setup pipeline with interactive hook
.incoming()
  .add(new LogHook())
  .add(new ValidationHook())
  .add(new RenderToFormHook({ formRegistry }))  // ← Interactive
  .add(new PublishToSubscribersHook())
.done()

// In React component, listen for pause event
stream.on('pipeline:paused', ({ pauseToken, request }) => {
  // Show approval form to user
  showApprovalForm(request).then(input => {
    pauseToken.resume(input);
    // Pipeline continues with user's decision
  });
});
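Pause and resume can be modelled as a Promise whose resolver is handed to the UI. The following sketch shows that idea in isolation; `createPauseToken`, `askUser`, the `cancel()` method, and the listener array are assumptions for illustration and do not reproduce EdgeStream's event bus or its `IPipelinePause` type.

```typescript
// Hypothetical pause token: wraps a Promise that settles when the UI answers.
interface PauseToken<T> {
  readonly promise: Promise<T>;
  resume(value: T): void;
  cancel(reason: string): void;
}

function createPauseToken<T>(): PauseToken<T> {
  let resolve!: (value: T) => void;
  let reject!: (error: Error) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return {
    promise,
    resume: (value) => resolve(value),
    cancel: (reason) => reject(new Error(reason)),
  };
}

type PausedListener = (event: { pauseToken: PauseToken<unknown>; request: unknown }) => void;

// Stand-in for the 'pipeline:paused' event subscription.
const pausedListeners: PausedListener[] = [];

// Interactive hook side: announce the pause, then wait for the UI's answer.
async function askUser(request: unknown): Promise<unknown> {
  const pauseToken = createPauseToken<unknown>();
  pausedListeners.forEach((listener) => listener({ pauseToken, request }));
  return pauseToken.promise;
}

// UI side: this demo answers immediately; a real UI would render a form first.
pausedListeners.push(({ pauseToken, request }) => {
  pauseToken.resume({ approved: true, request });
});

askUser({ prompt: 'Approve transfer?' }).then((answer) => {
  console.log('pipeline resumed with', answer);
});
```

While the Promise is pending nothing downstream runs, so later hooks only ever see a message after the user has answered.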

Scalability Architecture (Phase 10)

Optional advanced features for 100+ msg/sec throughput:

graph TB A["๐Ÿš€ Transport
100+/sec"] -->|enqueue| B["๐Ÿšฆ Queue
Backpressure"] B -->|dequeue| C["โš™๏ธ Pipeline"] C -->|crypto| D["๐Ÿ”ง Workers"] D -->|result| E["๐Ÿ’พ LRU Cache"] E -->|store| F["๐Ÿ“Š Store"] F -->|batch| G["๐Ÿ“ฆ Batch"] G -->|React| H["โš›๏ธ UI"]

Bounded Queue

Prevents memory overflow. When queue fills, transport is signaled to slow down (backpressure).

Worker Thread Pool

CPU-intensive operations (decryption, signature verification) offloaded to worker threads. Main thread stays responsive.

LRU Message Cache

Memory-bounded cache with automatic eviction. Prevents out-of-memory failures under sustained load: 100 messages/sec adds up to 360,000 messages/hour.

React Batching

Groups updates over a time window. With a 100 ms window, for example, 100 messages/sec → 10 batch updates/sec (10x fewer renders).
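The bounded queue and batching ideas above can be sketched together. Both classes are hypothetical: `BoundedQueue` signals backpressure by returning `false` from `enqueue()`, and `Batcher` delivers everything collected during one time window in a single callback. How EdgeStream signals backpressure to a transport is not described here.

```typescript
// Hypothetical bounded FIFO queue: enqueue() returns false when full so the
// caller can slow down instead of buffering without limit.
class BoundedQueue<T> {
  private items: T[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  enqueue(item: T): boolean {
    if (this.items.length >= this.capacity) return false; // backpressure signal
    this.items.push(item);
    return true;
  }

  // Removes and returns up to `max` items in arrival order.
  drain(max: number): T[] {
    return this.items.splice(0, max);
  }

  get size(): number {
    return this.items.length;
  }
}

// Hypothetical batcher: N items added within one window cause one onBatch call.
class Batcher<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly windowMs: number;
  private readonly onBatch: (batch: T[]) => void;

  constructor(windowMs: number, onBatch: (batch: T[]) => void) {
    this.windowMs = windowMs;
    this.onBatch = onBatch;
  }

  add(item: T): void {
    this.pending.push(item);
    if (this.timer === undefined) {
      this.timer = setTimeout(() => this.flush(), this.windowMs);
    }
  }

  // Delivers pending items now and cancels the scheduled delivery.
  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.onBatch(batch);
  }
}

const queue = new BoundedQueue<string>(2);
const accepted = ['a', 'b', 'c'].map((m) => queue.enqueue(m));
console.log(accepted); // [true, true, false]

const batches: string[][] = [];
const batcher = new Batcher<string>(100, (batch) => { batches.push(batch); });
queue.drain(10).forEach((m) => batcher.add(m));
batcher.flush(); // force delivery instead of waiting for the 100 ms timer
console.log(batches); // [['a', 'b']]
```

A store update, and therefore a React render, would happen once per `onBatch` call rather than once per message.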

Design Principles

Single Responsibility

Each file has exactly one reason to change. Transports don't know about hooks. Parsers don't know about pipelines.

Composable

Build complex behaviors by chaining simple, reusable hooks. Mix and match for your use case.

Framework Agnostic

Pure TypeScript core works anywhere. Optional React bindings are just convenience.

Observable

Full event bus for lifecycle events. Hook into pipeline:paused, connection:status, etc.

Memory Safe

LRU cache prevents memory leaks at scale. Bounded queues prevent runaway growth.

Testable

All dependencies injected. Mock transports and parsers for unit testing.