bezant_server/events/connector.rs

//! Long-lived WebSocket connector + REST-side handle.
//!
//! The connector is an actor that owns a single [`bezant::WsClient`] and
//! drives it forever:
//!
//! 1. Connect (with exponential-backoff retry).
//! 2. Subscribe to `orders` + `pnl` (always, on every connect).
//! 3. Loop: dispatch frames into per-topic rings, accept
//!    subscribe/unsubscribe commands for market data, watch a heartbeat
//!    timeout to detect a stalled socket.
//! 4. On any disconnect: sleep with backoff, GOTO 1. Each new cycle
//!    starts by bumping `reset_epoch` and pushing a synthetic `gap`
//!    event into every active ring, so consumers can detect the reset.
//!
//! [`EventsHandle`] is what axum handlers get. Cloneable, cheap, exposes
//! reads against the rings and a command channel into the actor task.
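//!
//! A rough end-to-end sketch of the intended wiring (marked `ignore`
//! because it needs a live CPAPI gateway behind `bezant::Client`):
//!
//! ```ignore
//! let client = bezant::Client::new("https://localhost:5000/v1/api")?;
//! let handle = spawn_connector(client, ConnectorCfg::default());
//!
//! // Later, from an axum handler:
//! handle.ensure_market_data(265_598).await?;
//! if let Some(page) = handle.read_topic("marketdata:265598", 0, 100).await {
//!     // serialize `page` for the poller
//! }
//! ```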

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

use bezant::{MarketDataFields, WsClient, WsMessage};
use serde_json::json;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::{sleep, timeout};
use tracing::{debug, info, warn};

use super::persistence::{EventLog, RetentionPolicy};
use super::ring::{ReadResult, TopicRing};
use super::{EventsStatus, GapReason, ObservedEvent};

/// Configurable knobs for the connector.
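///
/// A minimal construction sketch; struct-update syntax keeps the
/// remaining knobs at their defaults:
///
/// ```ignore
/// let cfg = ConnectorCfg {
///     heartbeat_timeout: Duration::from_secs(30),
///     backoff_max: Duration::from_secs(120),
///     ..ConnectorCfg::default()
/// };
/// ```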
#[derive(Clone, Debug)]
pub struct ConnectorCfg {
    /// Capacity of the `orders` ring.
    pub orders_capacity: usize,
    /// Capacity of the `pnl` ring.
    pub pnl_capacity: usize,
    /// Per-conid market-data ring capacity.
    pub marketdata_capacity: usize,
    /// Min reconnect backoff.
    pub backoff_min: Duration,
    /// Max reconnect backoff.
    pub backoff_max: Duration,
    /// If no frame arrives in this long, the connector tears the socket
    /// down and reconnects (assumes the socket is dead, not idle).
    pub heartbeat_timeout: Duration,
    /// Idle market-data subs auto-unsubscribe after this long with no
    /// `ensure_market_data` call (P2 — currently unused, plumbed for
    /// forward compatibility).
    pub marketdata_idle_unsubscribe: Duration,
    /// Optional sqlite-backed historical event log. When set, every
    /// pushed event is also appended here so `/events/{topic}/history`
    /// can serve reads beyond ring capacity.
    pub event_log: Option<Arc<EventLog>>,
    /// Retention policy used by the periodic prune task.
    pub retention: RetentionPolicy,
    /// How often to run the prune task. Defaults to once per hour.
    pub prune_every: Duration,
}

impl Default for ConnectorCfg {
    fn default() -> Self {
        Self {
            orders_capacity: 1_000,
            pnl_capacity: 5_000,
            marketdata_capacity: 2_000,
            backoff_min: Duration::from_secs(1),
            backoff_max: Duration::from_secs(60),
            heartbeat_timeout: Duration::from_secs(90),
            marketdata_idle_unsubscribe: Duration::from_secs(300),
            event_log: None,
            retention: RetentionPolicy::default(),
            prune_every: Duration::from_secs(3_600),
        }
    }
}

/// Commands the connector accepts from outside.
#[derive(Debug)]
enum ConnectorCmd {
    EnsureMarketData {
        conid: i64,
        reply: oneshot::Sender<Result<(), String>>,
    },
}

/// Cloneable handle the axum handlers use. Reads come straight off the
/// shared ring map; writes go through the command channel.
#[derive(Clone)]
pub struct EventsHandle {
    rings: Arc<RwLock<HashMap<String, TopicRing>>>,
    status: Arc<RwLock<StatusState>>,
    cmd_tx: mpsc::Sender<ConnectorCmd>,
    started_at: Instant,
    event_log: Option<Arc<EventLog>>,
}

impl EventsHandle {
    /// Borrow the optional sqlite event log so `/events/{topic}/history`
    /// route handlers can query historical events.
    #[must_use]
    pub fn event_log(&self) -> Option<&Arc<EventLog>> {
        self.event_log.as_ref()
    }
}

#[derive(Debug, Default)]
struct StatusState {
    connected: bool,
    last_message_at: Option<String>,
    reconnect_count: u64,
    reset_epoch: u64,
    topics_subscribed: BTreeSet<String>,
}

impl EventsHandle {
    /// Read events from a topic. Returns [`None`] if the topic isn't
    /// known (no events ever arrived for it). For `marketdata:<conid>`,
    /// callers should call [`Self::ensure_market_data`] first to register
    /// interest.
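    ///
    /// A polling sketch (assuming `since = 0` means "from the oldest
    /// retained event"):
    ///
    /// ```ignore
    /// if let Some(page) = handle.read_topic("orders", 0, 100).await {
    ///     // hand `page` to the response serializer
    /// }
    /// ```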
    pub async fn read_topic(&self, topic: &str, since: u64, limit: usize) -> Option<ReadResult> {
        let rings = self.rings.read().await;
        rings.get(topic).map(|r| r.read_since(since, limit))
    }

    /// Snapshot of status. `uptime_seconds` is computed at call time
    /// from the connector's start instant.
    pub async fn status(&self) -> EventsStatus {
        let s = self.status.read().await;
        let buffer_sizes: BTreeMap<String, usize> = self
            .rings
            .read()
            .await
            .iter()
            .map(|(k, v)| (k.clone(), v.len()))
            .collect();
        EventsStatus {
            connected: s.connected,
            last_message_at: s.last_message_at.clone(),
            reconnect_count: s.reconnect_count,
            uptime_seconds: self.started_at.elapsed().as_secs(),
            reset_epoch: s.reset_epoch,
            topics_subscribed: s.topics_subscribed.iter().cloned().collect(),
            buffer_sizes,
        }
    }

    /// Ensure the upstream WS is subscribed to market data for `conid`.
    /// Idempotent; multiple callers can request the same conid and only
    /// one upstream subscribe is sent. Returns `Err` if the connector
    /// task is dead or if the subscribe send failed.
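    ///
    /// A handler-side sketch (`internal_error` stands in for whatever
    /// error mapping the route uses):
    ///
    /// ```ignore
    /// handle.ensure_market_data(conid).await.map_err(internal_error)?;
    /// let topic = format!("marketdata:{conid}");
    /// let page = handle.read_topic(&topic, since, limit).await;
    /// ```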
    pub async fn ensure_market_data(&self, conid: i64) -> Result<(), String> {
        let (tx, rx) = oneshot::channel();
        self.cmd_tx
            .send(ConnectorCmd::EnsureMarketData { conid, reply: tx })
            .await
            .map_err(|_| "connector task is not running".to_string())?;
        rx.await
            .map_err(|_| "connector task dropped reply channel".to_string())?
    }
}

impl EventsHandle {
    /// Build a handle that's not backed by a live connector — used by
    /// integration tests to populate rings synthetically and verify the
    /// HTTP surface without standing up a CPAPI WS mock.
    ///
    /// Returns the handle plus a `TestSink` that lets the test push
    /// pre-decoded events into named topics.
    #[doc(hidden)]
    #[must_use]
    pub fn for_test() -> (Self, TestSink) {
        Self::for_test_with_log(None)
    }

    /// Same as [`Self::for_test`] but with an attached event log so
    /// tests can exercise the `/history` route.
    #[doc(hidden)]
    #[must_use]
    pub fn for_test_with_log(event_log: Option<Arc<EventLog>>) -> (Self, TestSink) {
        let rings: Arc<RwLock<HashMap<String, TopicRing>>> =
            Arc::new(RwLock::new(HashMap::new()));
        let status = Arc::new(RwLock::new(StatusState {
            connected: true,
            reset_epoch: 1,
            ..Default::default()
        }));
        let (cmd_tx, _cmd_rx) = mpsc::channel::<ConnectorCmd>(1);
        let started_at = Instant::now();
        let handle = Self {
            rings: rings.clone(),
            status: status.clone(),
            cmd_tx,
            started_at,
            event_log: event_log.clone(),
        };
        let sink = TestSink { rings, status, event_log };
        (handle, sink)
    }
}

/// Sink-side counterpart of [`EventsHandle::for_test`]. Lets integration
/// tests push events into a topic ring, mark the connection
/// disconnected, and bump the reset epoch — without involving a real
/// WebSocket. Public solely so `tests/events_routes.rs` can drive the
/// routes; not part of the production API surface.
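///
/// Typical test-side usage, as a sketch:
///
/// ```ignore
/// let (handle, sink) = EventsHandle::for_test();
/// sink.push("orders", serde_json::json!({"orderId": 1})).await;
/// assert!(handle.read_topic("orders", 0, 10).await.is_some());
/// ```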
#[doc(hidden)]
#[derive(Clone)]
pub struct TestSink {
    rings: Arc<RwLock<HashMap<String, TopicRing>>>,
    status: Arc<RwLock<StatusState>>,
    event_log: Option<Arc<EventLog>>,
}

#[doc(hidden)]
impl TestSink {
    /// Push `payload` into the named topic's ring. Creates the ring on
    /// first call. Reports the assigned cursor. If a log is attached,
    /// also appends to sqlite.
    pub async fn push(&self, topic: &str, payload: serde_json::Value) -> u64 {
        let cap = match topic {
            "orders" => 1_000,
            "pnl" => 5_000,
            t if t.starts_with("marketdata:") => 2_000,
            _ => 256,
        };
        let epoch = self.status.read().await.reset_epoch;
        let received_at = now_iso();
        let mut rings = self.rings.write().await;
        let ring = rings
            .entry(topic.to_string())
            .or_insert_with(|| TopicRing::new(topic, cap, epoch));
        let cursor = ring.push(payload.clone(), received_at.clone());
        drop(rings);
        if let Some(log) = &self.event_log {
            let _ = log.append(&ObservedEvent {
                cursor,
                topic: topic.to_string(),
                received_at: received_at.clone(),
                reset_epoch: epoch,
                payload,
            });
        }
        self.status
            .write()
            .await
            .topics_subscribed
            .insert(topic.to_string());
        cursor
    }

    /// Force a specific reset_epoch — used to test cursor-expired flows.
    pub async fn set_reset_epoch(&self, epoch: u64) {
        self.status.write().await.reset_epoch = epoch;
    }

    /// Force the `connected` flag — useful for `_status` shape testing.
    pub async fn set_connected(&self, connected: bool) {
        self.status.write().await.connected = connected;
    }
}

/// Spawn the connector task and return an [`EventsHandle`] for the
/// axum side. The task takes ownership of the [`bezant::Client`] (cheap
/// to move; it's already `Arc`-wrapped internally) and runs until the
/// binary exits.
///
/// If `cfg.event_log` is `Some`, also spawns a periodic prune task that
/// trims the sqlite store to retention policy every `cfg.prune_every`.
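///
/// A minimal call-site sketch (the router wiring is illustrative, not
/// prescribed by this module):
///
/// ```ignore
/// let client = bezant::Client::new("https://localhost:5000/v1/api")?;
/// let handle = spawn_connector(client, ConnectorCfg::default());
/// let app = axum::Router::new().with_state(handle);
/// ```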
pub fn spawn_connector(client: bezant::Client, cfg: ConnectorCfg) -> EventsHandle {
    let rings: Arc<RwLock<HashMap<String, TopicRing>>> = Arc::new(RwLock::new(HashMap::new()));
    let status = Arc::new(RwLock::new(StatusState::default()));
    let (cmd_tx, cmd_rx) = mpsc::channel::<ConnectorCmd>(64);
    let started_at = Instant::now();
    let event_log = cfg.event_log.clone();

    if let Some(log) = event_log.clone() {
        let prune_every = cfg.prune_every;
        let policy = cfg.retention.clone();
        tokio::spawn(async move {
            loop {
                sleep(prune_every).await;
                let log = log.clone();
                let policy = policy.clone();
                let dropped = tokio::task::spawn_blocking(move || log.prune(&policy)).await;
                match dropped {
                    Ok(Ok(n)) if n > 0 => info!(rows = n, "events sqlite: prune dropped rows"),
                    Ok(Err(e)) => warn!(error = %e, "events sqlite: prune failed"),
                    _ => {}
                }
            }
        });
    }

    let actor = ConnectorActor {
        client,
        cfg,
        rings: rings.clone(),
        status: status.clone(),
        cmd_rx,
        active_marketdata_subs: BTreeSet::new(),
    };

    tokio::spawn(actor.run());

    EventsHandle {
        rings,
        status,
        cmd_tx,
        started_at,
        event_log,
    }
}

struct ConnectorActor {
    client: bezant::Client,
    cfg: ConnectorCfg,
    rings: Arc<RwLock<HashMap<String, TopicRing>>>,
    status: Arc<RwLock<StatusState>>,
    cmd_rx: mpsc::Receiver<ConnectorCmd>,
    active_marketdata_subs: BTreeSet<i64>,
}

impl ConnectorActor {
    async fn run(mut self) {
        info!("events connector starting");
        let mut backoff = self.cfg.backoff_min;
        loop {
            // Bump epoch + emit gap markers BEFORE attempting to connect,
            // so any events arriving during this run are tagged with the
            // correct epoch from the very first frame.
            self.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect)
                .await;

            match self.connect_and_run().await {
                Ok(()) => {
                    info!("events connector: ws closed cleanly, reconnecting");
                    backoff = self.cfg.backoff_min;
                }
                Err(e) => {
                    warn!(error = %e, "events connector: ws failed, will retry");
                    self.set_disconnected().await;
                }
            }

            sleep(backoff).await;
            backoff = (backoff * 2).min(self.cfg.backoff_max);
        }
    }

    /// One full connect cycle. Returns `Ok(())` on clean close, `Err`
    /// otherwise. Caller handles backoff + reconnect.
    async fn connect_and_run(&mut self) -> Result<(), bezant::Error> {
        let mut ws = WsClient::connect(&self.client).await?;

        // CPAPI WS quirk: subscribes sent *before* the server's initial
        // `system+success` "ready" frame are silently discarded. Pump
        // frames until that frame arrives (or 5s timeout) so our
        // `sor`/`spl` subscribes actually take effect — without this,
        // we get heartbeats forever and no order/PnL events.
        let ready = timeout(Duration::from_secs(5), pump_until_ready(&mut ws)).await;
        match ready {
            Ok(Ok(())) => debug!("events connector: server ready signal received"),
            Ok(Err(e)) => return Err(e),
            Err(_) => warn!(
                "events connector: didn't see server-ready frame within 5s; \
                 subscribing anyway (CPAPI may silently drop these)"
            ),
        }

        ws.subscribe_orders().await?;
        ws.subscribe_pnl().await?;
        // Re-establish any market data subs that were active before the
        // disconnect.
        for conid in self.active_marketdata_subs.clone() {
            if let Err(e) = ws
                .subscribe_market_data(conid, &MarketDataFields::default_l1())
                .await
            {
                warn!(conid, error = %e, "events connector: re-subscribe market data failed");
            }
        }

        self.set_connected().await;
        info!("events connector: connected, orders + pnl subscribed");

        let result = self.dispatch_loop(&mut ws).await;
        self.set_disconnected().await;
        result
    }

    /// The hot loop: read frames, dispatch into rings, handle commands,
    /// detect heartbeat timeout. Returns when the socket closes or any
    /// fatal error occurs.
    async fn dispatch_loop(&mut self, ws: &mut WsClient) -> Result<(), bezant::Error> {
        loop {
            tokio::select! {
                // Frame from the upstream WS — dispatch or detect close.
                msg = timeout(self.cfg.heartbeat_timeout, ws.next_message()) => {
                    match msg {
                        Err(_elapsed) => {
                            warn!(
                                timeout_secs = self.cfg.heartbeat_timeout.as_secs(),
                                "events connector: heartbeat timeout, killing socket"
                            );
                            return Err(bezant::Error::WsProtocol(
                                "heartbeat timeout".into(),
                            ));
                        }
                        Ok(Ok(None)) => {
                            info!("events connector: upstream ws closed");
                            return Ok(());
                        }
                        Ok(Ok(Some(frame))) => {
                            self.handle_frame(frame).await;
                        }
                        Ok(Err(e)) => {
                            return Err(e);
                        }
                    }
                }
                // Command from the REST side (lazy market data subs).
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(ws, cmd).await;
                }
            }
        }
    }

    /// Decode + push a single WS frame into the appropriate ring.
    async fn handle_frame(&mut self, frame: WsMessage) {
        let now = now_iso();
        // Diagnostic: trace topic + first 200 chars of payload for every
        // frame so we can see CPAPI's actual topic strings for unrecognised
        // sor/spl variants. Cheap; gated to debug level so prod is silent.
        if let Some(v) = frame.as_value() {
            let snippet = serde_json::to_string(v).unwrap_or_default();
            let topic_str = v
                .get("topic")
                .and_then(|t| t.as_str())
                .unwrap_or("<no-topic>");
            debug!(
                variant = frame.topic(),
                topic = topic_str,
                payload = %truncate(&snippet, 200),
                "events connector: frame"
            );
        }
        match frame {
            WsMessage::Heartbeat | WsMessage::System(_) | WsMessage::Other(_) => {
                // Not interesting for downstream consumers; just record
                // last-message time so the connector status reflects liveness.
                self.touch_last_message(now).await;
            }
            WsMessage::Order(value) => {
                self.push_to_topic("orders", value, now).await;
            }
            WsMessage::Pnl(value) => {
                self.push_to_topic("pnl", value, now).await;
            }
            WsMessage::MarketData { conid, payload } => {
                let topic = format!("marketdata:{conid}");
                self.push_to_topic(&topic, payload, now).await;
            }
            WsMessage::Malformed { text, error } => {
                warn!(error = %error, sample = %truncate(&text, 200), "events connector: malformed frame");
                self.touch_last_message(now).await;
            }
            // `WsMessage` is `#[non_exhaustive]`; future enum variants
            // land here and are logged at debug level so they aren't
            // silently dropped.
            other => {
                debug!(topic = other.topic(), "events connector: unhandled ws message variant");
                self.touch_last_message(now).await;
            }
        }
    }

    async fn handle_command(&mut self, ws: &mut WsClient, cmd: ConnectorCmd) {
        match cmd {
            ConnectorCmd::EnsureMarketData { conid, reply } => {
                if self.active_marketdata_subs.insert(conid) {
                    debug!(conid, "events connector: subscribing market data");
                    let result = ws
                        .subscribe_market_data(conid, &MarketDataFields::default_l1())
                        .await;
                    match result {
                        Ok(_) => {
                            self.add_topic_to_status(format!("marketdata:{conid}")).await;
                            let _ = reply.send(Ok(()));
                        }
                        Err(e) => {
                            // Roll back the optimistic insert — next caller
                            // can retry.
                            self.active_marketdata_subs.remove(&conid);
                            let _ = reply.send(Err(format!("subscribe failed: {e}")));
                        }
                    }
                } else {
                    // Already subscribed.
                    let _ = reply.send(Ok(()));
                }
            }
        }
    }

    async fn push_to_topic(&self, topic: &str, payload: serde_json::Value, received_at: String) {
        let cap = match topic {
            "orders" => self.cfg.orders_capacity,
            "pnl" => self.cfg.pnl_capacity,
            t if t.starts_with("marketdata:") => self.cfg.marketdata_capacity,
            _ => 1_000,
        };
        let epoch = self.status.read().await.reset_epoch;

        let mut rings = self.rings.write().await;
        let ring = rings
            .entry(topic.to_string())
            .or_insert_with(|| TopicRing::new(topic, cap, epoch));
        let cursor = ring.push(payload.clone(), received_at.clone());
        drop(rings);

        // Mirror to sqlite if configured. Best-effort — log on failure;
        // the in-memory ring remains the canonical fast-path read.
        if let Some(log) = &self.cfg.event_log {
            let log = log.clone();
            let evt = ObservedEvent {
                cursor,
                topic: topic.to_string(),
                received_at: received_at.clone(),
                reset_epoch: epoch,
                payload,
            };
            // sqlite writes are blocking; offload to a worker so we
            // don't block the connector loop on I/O.
            tokio::task::spawn_blocking(move || {
                if let Err(e) = log.append(&evt) {
                    warn!(error = %e, "events sqlite: append failed");
                }
            });
        }

        // Update last_message_at + ensure topic shows in status.
        let mut s = self.status.write().await;
        s.last_message_at = Some(received_at);
        s.topics_subscribed.insert(topic.to_string());
    }

    async fn touch_last_message(&self, now: String) {
        self.status.write().await.last_message_at = Some(now);
    }

    async fn add_topic_to_status(&self, topic: String) {
        self.status.write().await.topics_subscribed.insert(topic);
    }

    async fn set_connected(&self) {
        let mut s = self.status.write().await;
        s.connected = true;
        // Counts every successful connect, including the first, so
        // `reconnect_count` is 1 after initial startup.
        s.reconnect_count = s.reconnect_count.saturating_add(1);
        s.topics_subscribed.insert("orders".into());
        s.topics_subscribed.insert("pnl".into());
    }

    async fn set_disconnected(&self) {
        self.status.write().await.connected = false;
    }

    /// Increment `reset_epoch` and inject a synthetic gap event into
    /// every existing ring + a new "gap" topic ring so consumers
    /// polling any topic see the reset.
    async fn bump_epoch_with_gap(&self, reason: GapReason) {
        let mut s = self.status.write().await;
        // First boot: no previous events, nothing to gap-mark.
        let is_first_boot = s.reset_epoch == 0 && s.last_message_at.is_none();
        s.reset_epoch = s.reset_epoch.saturating_add(1);
        let new_epoch = s.reset_epoch;
        drop(s);

        if is_first_boot {
            return;
        }

        // Inject a gap event into every active topic so cursor-advancing
        // consumers see it on their next poll.
        let now = now_iso();
        let payload = json!({
            "reason": match reason {
                GapReason::ReconnectedAfterDisconnect => "reconnected_after_disconnect",
                GapReason::ProcessRestart => "process_restart",
            },
            "new_reset_epoch": new_epoch,
        });

        let mut rings = self.rings.write().await;
        let topics: Vec<String> = rings.keys().cloned().collect();
        for topic in topics {
            // Each ring keeps the reset_epoch it was created with; we
            // deliberately don't retro-bump it. Consumers learn the new
            // epoch from the gap payload's `new_reset_epoch` field.
            if let Some(r) = rings.get_mut(&topic) {
                r.push(payload.clone(), now.clone());
            }
        }
        // Always ensure a 'gap' topic exists so consumers that don't
        // poll any other topic still learn about resets.
        rings
            .entry("gap".to_string())
            .or_insert_with(|| TopicRing::new("gap", 256, new_epoch))
            .push(payload, now);
    }
}

/// Pump WS frames until we see a `system` topic frame containing
/// `success` (CPAPI's username-ack — see the IBKR campus WS lesson).
/// That frame indicates the server has finished session bootstrap and
/// will accept and broadcast on `sor`/`spl` subscribes. Frames seen
/// before the ready signal are discarded — they're just connection
/// metadata (`act`, `sts`) we don't need to surface to consumers.
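/// The ready frame looks roughly like
/// `{"topic": "system", "success": "<username>", ...}`; only the
/// presence of the `success` key matters here.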
async fn pump_until_ready(ws: &mut WsClient) -> Result<(), bezant::Error> {
    while let Some(msg) = ws.next_message().await? {
        if let WsMessage::System(value) = &msg {
            if value.get("success").is_some() {
                return Ok(());
            }
        }
        // `act`, `sts`, anything else — ignore. We're only gating on
        // the `success` ack which CPAPI sends once per session.
    }
    // Stream closed before we saw ready.
    Err(bezant::Error::WsProtocol(
        "ws closed before server-ready frame".into(),
    ))
}

fn now_iso() -> String {
    use std::time::SystemTime;
    let now: chrono::DateTime<chrono::Utc> = SystemTime::now().into();
    now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
}

fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        // Back off to the nearest char boundary so the slice can't
        // panic partway through a multi-byte UTF-8 character.
        let mut end = max;
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        format!("{}…", &s[..end])
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn handle_frame_dispatches_to_correct_topic() {
        // Build an actor with a closed cmd channel — we won't drive run().
        let (_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let mut actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };

        actor
            .handle_frame(WsMessage::Order(json!({"orderId": 1})))
            .await;
        actor.handle_frame(WsMessage::Pnl(json!({"upnl": 1.0}))).await;
        actor
            .handle_frame(WsMessage::MarketData {
                conid: 265598,
                payload: json!({"31": "150.25"}),
            })
            .await;

        let rings = actor.rings.read().await;
        assert_eq!(rings.get("orders").unwrap().len(), 1);
        assert_eq!(rings.get("pnl").unwrap().len(), 1);
        assert_eq!(rings.get("marketdata:265598").unwrap().len(), 1);
    }

    #[tokio::test]
    async fn heartbeat_and_system_frames_dont_create_topics() {
        let (_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let mut actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };

        actor.handle_frame(WsMessage::Heartbeat).await;
        actor.handle_frame(WsMessage::System(json!({"msg": "ready"}))).await;

        let rings = actor.rings.read().await;
        assert!(rings.is_empty());

        // But last_message_at IS updated.
        let s = actor.status.read().await;
        assert!(s.last_message_at.is_some());
    }

    #[tokio::test]
    async fn bump_epoch_skips_gap_on_first_boot() {
        let (_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };

        actor.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect).await;

        // No gap event injected on first boot.
        let rings = actor.rings.read().await;
        assert!(rings.is_empty());
        // But reset_epoch did bump.
        assert_eq!(actor.status.read().await.reset_epoch, 1);
    }

    #[tokio::test]
    async fn bump_epoch_injects_gap_into_existing_topics() {
        let (_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let mut actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };

        // Seed an order event so the actor knows about the orders topic.
        actor.handle_frame(WsMessage::Order(json!({"id": 1}))).await;
        // Now bump epoch as if we'd reconnected.
        actor.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect).await;

        let rings = actor.rings.read().await;
        let orders_ring = rings.get("orders").unwrap();
        // Original event + gap marker = 2.
        assert_eq!(orders_ring.len(), 2);
        // 'gap' topic also got a marker.
        assert_eq!(rings.get("gap").unwrap().len(), 1);
    }

    #[tokio::test]
    async fn ensure_market_data_dedups_repeat_calls() {
        // We can't easily test the WS sub call without a mock socket,
        // but we can verify the dedup logic at the actor level by
        // hand-mutating the active_marketdata_subs set.
        let (_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let mut actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };

        assert!(actor.active_marketdata_subs.insert(265_598));
        // Re-insert returns false (already present).
        assert!(!actor.active_marketdata_subs.insert(265_598));
        assert_eq!(actor.active_marketdata_subs.len(), 1);
    }
}