Flux

WebSockets

WebSocket subscriptions and fanout

Implemented package:

packages/websocket

The package provides:

  • WebSocketHub for subscriptions and fanout.
  • createWebSocketServer() for a Bun-backed /ws endpoint.

Endpoint

GET /ws

Subscribe

Public channel:

{
  "op": "subscribe",
  "channel": "trades",
  "market": "BTC-PERP"
}

Private channel:

{
  "op": "subscribe",
  "channel": "positions",
  "token": "jwt"
}

Unsubscribe

{
  "op": "unsubscribe",
  "channel": "orderbook",
  "market": "BTC-PERP"
}

Channels

trades:{market}
orderbook:{market}
mark_price:{market}
funding:{market}
positions:{userId}

positions is private and requires successful authentication. The server uses the authenticated user id; clients cannot subscribe to another user's position topic by passing a user id.

Messages

Subscribed:

{
  "type": "subscribed",
  "channel": "trades",
  "topic": "trades:BTC-PERP"
}

Update:

{
  "type": "update",
  "channel": "trades",
  "topic": "trades:BTC-PERP",
  "sequence": 123,
  "data": {
    "tradeId": "trade-1"
  }
}

Snapshot:

{
  "type": "snapshot",
  "topic": "orderbook:BTC-PERP",
  "sequence": 100,
  "data": {
    "bids": [],
    "asks": []
  }
}

Resync:

{
  "type": "resync",
  "topic": "orderbook:BTC-PERP",
  "reason": "sequence gap"
}

Error:

{
  "type": "error",
  "reason": "bad token"
}

Orderbook Flow

  1. Client subscribes to orderbook:{market}.
  2. Server sends a current snapshot.
  3. Server sends sequence-numbered deltas.
  4. If a sequence gap is detected, server sends resync.
  5. Client discards local book and resubscribes or requests a fresh snapshot.

Redis Integration

The hub remains transport-agnostic. Production runtime code now writes engine.events.{market} and price.updated to Redis Streams; a websocket fanout process can read those streams and call hub.publish(...) without coupling subscription state to Redis client code.

On this page