1use std::collections::{BTreeMap, BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bezant::{MarketDataFields, WsClient, WsMessage};
22use serde_json::json;
23use tokio::sync::{mpsc, oneshot, RwLock};
24use tokio::time::{sleep, timeout};
25use tracing::{debug, info, warn};
26
27use super::persistence::{EventLog, RetentionPolicy};
28use super::ring::{ReadResult, TopicRing};
29use super::{EventsStatus, GapReason, ObservedEvent};
30
31#[derive(Clone, Debug)]
33pub struct ConnectorCfg {
34 pub orders_capacity: usize,
36 pub pnl_capacity: usize,
38 pub marketdata_capacity: usize,
40 pub backoff_min: Duration,
42 pub backoff_max: Duration,
44 pub heartbeat_timeout: Duration,
47 pub marketdata_idle_unsubscribe: Duration,
51 pub event_log: Option<Arc<EventLog>>,
55 pub retention: RetentionPolicy,
57 pub prune_every: Duration,
59}
60
61impl Default for ConnectorCfg {
62 fn default() -> Self {
63 Self {
64 orders_capacity: 1_000,
65 pnl_capacity: 5_000,
66 marketdata_capacity: 2_000,
67 backoff_min: Duration::from_secs(1),
68 backoff_max: Duration::from_secs(60),
69 heartbeat_timeout: Duration::from_secs(90),
70 marketdata_idle_unsubscribe: Duration::from_secs(300),
71 event_log: None,
72 retention: RetentionPolicy::default(),
73 prune_every: Duration::from_secs(3_600),
74 }
75 }
76}
77
78#[derive(Debug)]
80enum ConnectorCmd {
81 EnsureMarketData {
82 conid: i64,
83 reply: oneshot::Sender<Result<(), String>>,
84 },
85}
86
87#[derive(Clone)]
90pub struct EventsHandle {
91 rings: Arc<RwLock<HashMap<String, TopicRing>>>,
92 status: Arc<RwLock<StatusState>>,
93 cmd_tx: mpsc::Sender<ConnectorCmd>,
94 started_at: Instant,
95 event_log: Option<Arc<EventLog>>,
96}
97
98impl EventsHandle {
99 #[must_use]
102 pub fn event_log(&self) -> Option<&Arc<EventLog>> {
103 self.event_log.as_ref()
104 }
105}
106
/// Mutable connection status shared between the connector task and handles.
#[derive(Default, Debug)]
struct StatusState {
    /// `true` between a successful connect+subscribe and the next disconnect.
    connected: bool,
    /// Timestamp string (as produced by `now_iso`) of the last handled frame.
    last_message_at: Option<String>,
    /// Number of times the connector has marked itself connected.
    reconnect_count: u64,
    /// Epoch counter, incremented before every connection attempt.
    reset_epoch: u64,
    /// Topics the connector has subscribed to or received events for.
    topics_subscribed: BTreeSet<String>,
}
115
116impl EventsHandle {
117 pub async fn read_topic(&self, topic: &str, since: u64, limit: usize) -> Option<ReadResult> {
122 let rings = self.rings.read().await;
123 rings.get(topic).map(|r| r.read_since(since, limit))
124 }
125
126 pub async fn status(&self) -> EventsStatus {
129 let s = self.status.read().await;
130 let buffer_sizes: BTreeMap<String, usize> = self
131 .rings
132 .read()
133 .await
134 .iter()
135 .map(|(k, v)| (k.clone(), v.len()))
136 .collect();
137 EventsStatus {
138 connected: s.connected,
139 last_message_at: s.last_message_at.clone(),
140 reconnect_count: s.reconnect_count,
141 uptime_seconds: self.started_at.elapsed().as_secs(),
142 reset_epoch: s.reset_epoch,
143 topics_subscribed: s.topics_subscribed.iter().cloned().collect(),
144 buffer_sizes,
145 }
146 }
147
148 pub async fn ensure_market_data(&self, conid: i64) -> Result<(), String> {
153 let (tx, rx) = oneshot::channel();
154 self.cmd_tx
155 .send(ConnectorCmd::EnsureMarketData { conid, reply: tx })
156 .await
157 .map_err(|_| "connector task is not running".to_string())?;
158 rx.await
159 .map_err(|_| "connector task dropped reply channel".to_string())?
160 }
161}
162
163impl EventsHandle {
164 #[doc(hidden)]
171 #[must_use]
172 pub fn for_test() -> (Self, TestSink) {
173 Self::for_test_with_log(None)
174 }
175
176 #[doc(hidden)]
179 #[must_use]
180 pub fn for_test_with_log(event_log: Option<Arc<EventLog>>) -> (Self, TestSink) {
181 let rings: Arc<RwLock<HashMap<String, TopicRing>>> =
182 Arc::new(RwLock::new(HashMap::new()));
183 let status = Arc::new(RwLock::new(StatusState {
184 connected: true,
185 reset_epoch: 1,
186 ..Default::default()
187 }));
188 let (cmd_tx, _cmd_rx) = mpsc::channel::<ConnectorCmd>(1);
189 let started_at = Instant::now();
190 let handle = Self {
191 rings: rings.clone(),
192 status: status.clone(),
193 cmd_tx,
194 started_at,
195 event_log: event_log.clone(),
196 };
197 let sink = TestSink { rings, status, event_log };
198 (handle, sink)
199 }
200}
201
202#[doc(hidden)]
208#[derive(Clone)]
209pub struct TestSink {
210 rings: Arc<RwLock<HashMap<String, TopicRing>>>,
211 status: Arc<RwLock<StatusState>>,
212 event_log: Option<Arc<EventLog>>,
213}
214
215#[doc(hidden)]
216impl TestSink {
217 pub async fn push(&self, topic: &str, payload: serde_json::Value) -> u64 {
221 let cap = match topic {
222 "orders" => 1_000,
223 "pnl" => 5_000,
224 t if t.starts_with("marketdata:") => 2_000,
225 _ => 256,
226 };
227 let epoch = self.status.read().await.reset_epoch;
228 let received_at = now_iso();
229 let mut rings = self.rings.write().await;
230 let ring = rings
231 .entry(topic.to_string())
232 .or_insert_with(|| TopicRing::new(topic, cap, epoch));
233 let cursor = ring.push(payload.clone(), received_at.clone());
234 drop(rings);
235 if let Some(log) = &self.event_log {
236 let _ = log.append(&ObservedEvent {
237 cursor,
238 topic: topic.to_string(),
239 received_at: received_at.clone(),
240 reset_epoch: epoch,
241 payload,
242 });
243 }
244 self.status
245 .write()
246 .await
247 .topics_subscribed
248 .insert(topic.to_string());
249 cursor
250 }
251
252 pub async fn set_reset_epoch(&self, epoch: u64) {
254 self.status.write().await.reset_epoch = epoch;
255 }
256
257 pub async fn set_connected(&self, connected: bool) {
259 self.status.write().await.connected = connected;
260 }
261}
262
263pub fn spawn_connector(client: bezant::Client, cfg: ConnectorCfg) -> EventsHandle {
270 let rings: Arc<RwLock<HashMap<String, TopicRing>>> = Arc::new(RwLock::new(HashMap::new()));
271 let status = Arc::new(RwLock::new(StatusState::default()));
272 let (cmd_tx, cmd_rx) = mpsc::channel::<ConnectorCmd>(64);
273 let started_at = Instant::now();
274 let event_log = cfg.event_log.clone();
275
276 if let Some(log) = event_log.clone() {
277 let prune_every = cfg.prune_every;
278 let policy = cfg.retention.clone();
279 tokio::spawn(async move {
280 loop {
281 sleep(prune_every).await;
282 let log = log.clone();
283 let policy = policy.clone();
284 let dropped = tokio::task::spawn_blocking(move || log.prune(&policy)).await;
285 match dropped {
286 Ok(Ok(n)) if n > 0 => info!(rows = n, "events sqlite: prune dropped rows"),
287 Ok(Err(e)) => warn!(error = %e, "events sqlite: prune failed"),
288 _ => {}
289 }
290 }
291 });
292 }
293
294 let actor = ConnectorActor {
295 client,
296 cfg,
297 rings: rings.clone(),
298 status: status.clone(),
299 cmd_rx,
300 active_marketdata_subs: BTreeSet::new(),
301 };
302
303 tokio::spawn(actor.run());
304
305 EventsHandle {
306 rings,
307 status,
308 cmd_tx,
309 started_at,
310 event_log,
311 }
312}
313
314struct ConnectorActor {
315 client: bezant::Client,
316 cfg: ConnectorCfg,
317 rings: Arc<RwLock<HashMap<String, TopicRing>>>,
318 status: Arc<RwLock<StatusState>>,
319 cmd_rx: mpsc::Receiver<ConnectorCmd>,
320 active_marketdata_subs: BTreeSet<i64>,
321}
322
323impl ConnectorActor {
324 async fn run(mut self) {
325 info!("events connector starting");
326 let mut backoff = self.cfg.backoff_min;
327 loop {
328 self.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect)
332 .await;
333
334 match self.connect_and_run().await {
335 Ok(()) => {
336 info!("events connector: ws closed cleanly, reconnecting");
337 backoff = self.cfg.backoff_min;
338 }
339 Err(e) => {
340 warn!(error = %e, "events connector: ws failed, will retry");
341 self.set_disconnected().await;
342 }
343 }
344
345 sleep(backoff).await;
346 backoff = (backoff * 2).min(self.cfg.backoff_max);
347 }
348 }
349
350 async fn connect_and_run(&mut self) -> Result<(), bezant::Error> {
353 let mut ws = WsClient::connect(&self.client).await?;
354
355 let ready = timeout(Duration::from_secs(5), pump_until_ready(&mut ws)).await;
361 match ready {
362 Ok(Ok(())) => debug!("events connector: server ready signal received"),
363 Ok(Err(e)) => return Err(e),
364 Err(_) => warn!(
365 "events connector: didn't see server-ready frame within 5s; \
366 subscribing anyway (CPAPI may silently drop these)"
367 ),
368 }
369
370 ws.subscribe_orders().await?;
371 ws.subscribe_pnl().await?;
372 for conid in self.active_marketdata_subs.clone() {
375 if let Err(e) = ws
376 .subscribe_market_data(conid, &MarketDataFields::default_l1())
377 .await
378 {
379 warn!(conid, error = %e, "events connector: re-subscribe market data failed");
380 }
381 }
382
383 self.set_connected().await;
384 info!("events connector: connected, orders + pnl subscribed");
385
386 let result = self.dispatch_loop(&mut ws).await;
387 self.set_disconnected().await;
388 result
389 }
390
391 async fn dispatch_loop(&mut self, ws: &mut WsClient) -> Result<(), bezant::Error> {
395 loop {
396 tokio::select! {
397 msg = timeout(self.cfg.heartbeat_timeout, ws.next_message()) => {
399 match msg {
400 Err(_elapsed) => {
401 warn!(
402 timeout_secs = self.cfg.heartbeat_timeout.as_secs(),
403 "events connector: heartbeat timeout, killing socket"
404 );
405 return Err(bezant::Error::WsProtocol(
406 "heartbeat timeout".into(),
407 ));
408 }
409 Ok(Ok(None)) => {
410 info!("events connector: upstream ws closed");
411 return Ok(());
412 }
413 Ok(Ok(Some(frame))) => {
414 self.handle_frame(frame).await;
415 }
416 Ok(Err(e)) => {
417 return Err(e);
418 }
419 }
420 }
421 Some(cmd) = self.cmd_rx.recv() => {
423 self.handle_command(ws, cmd).await;
424 }
425 }
426 }
427 }
428
429 async fn handle_frame(&mut self, frame: WsMessage) {
431 let now = now_iso();
432 if let Some(v) = frame.as_value() {
436 let snippet = serde_json::to_string(v).unwrap_or_default();
437 let topic_str = v
438 .get("topic")
439 .and_then(|t| t.as_str())
440 .unwrap_or("<no-topic>");
441 debug!(
442 variant = frame.topic(),
443 topic = topic_str,
444 payload = %truncate(&snippet, 200),
445 "events connector: frame"
446 );
447 }
448 match frame {
449 WsMessage::Heartbeat | WsMessage::System(_) | WsMessage::Other(_) => {
450 self.touch_last_message(now).await;
453 }
454 WsMessage::Order(value) => {
455 self.push_to_topic("orders", value, now).await;
456 }
457 WsMessage::Pnl(value) => {
458 self.push_to_topic("pnl", value, now).await;
459 }
460 WsMessage::MarketData { conid, payload } => {
461 let topic = format!("marketdata:{conid}");
462 self.push_to_topic(&topic, payload, now).await;
463 }
464 WsMessage::Malformed { text, error } => {
465 warn!(error = %error, sample = %truncate(&text, 200), "events connector: malformed frame");
466 self.touch_last_message(now).await;
467 }
468 other => {
472 debug!(topic = other.topic(), "events connector: unhandled ws message variant");
473 self.touch_last_message(now).await;
474 }
475 }
476 }
477
478 async fn handle_command(&mut self, ws: &mut WsClient, cmd: ConnectorCmd) {
479 match cmd {
480 ConnectorCmd::EnsureMarketData { conid, reply } => {
481 if self.active_marketdata_subs.insert(conid) {
482 debug!(conid, "events connector: subscribing market data");
483 let result = ws
484 .subscribe_market_data(conid, &MarketDataFields::default_l1())
485 .await;
486 match result {
487 Ok(_) => {
488 self.add_topic_to_status(format!("marketdata:{conid}")).await;
489 let _ = reply.send(Ok(()));
490 }
491 Err(e) => {
492 self.active_marketdata_subs.remove(&conid);
495 let _ = reply.send(Err(format!("subscribe failed: {e}")));
496 }
497 }
498 } else {
499 let _ = reply.send(Ok(()));
501 }
502 }
503 }
504 }
505
506 async fn push_to_topic(&self, topic: &str, payload: serde_json::Value, received_at: String) {
507 let cap = match topic {
508 "orders" => self.cfg.orders_capacity,
509 "pnl" => self.cfg.pnl_capacity,
510 t if t.starts_with("marketdata:") => self.cfg.marketdata_capacity,
511 _ => 1_000,
512 };
513 let epoch = self.status.read().await.reset_epoch;
514
515 let mut rings = self.rings.write().await;
516 let ring = rings
517 .entry(topic.to_string())
518 .or_insert_with(|| TopicRing::new(topic, cap, epoch));
519 let cursor = ring.push(payload.clone(), received_at.clone());
520 drop(rings);
521
522 if let Some(log) = &self.cfg.event_log {
525 let log = log.clone();
526 let evt = ObservedEvent {
527 cursor,
528 topic: topic.to_string(),
529 received_at: received_at.clone(),
530 reset_epoch: epoch,
531 payload,
532 };
533 tokio::task::spawn_blocking(move || log.append(&evt));
536 }
537
538 let mut s = self.status.write().await;
540 s.last_message_at = Some(received_at);
541 s.topics_subscribed.insert(topic.to_string());
542 }
543
544 async fn touch_last_message(&self, now: String) {
545 self.status.write().await.last_message_at = Some(now);
546 }
547
548 async fn add_topic_to_status(&self, topic: String) {
549 self.status.write().await.topics_subscribed.insert(topic);
550 }
551
552 async fn set_connected(&self) {
553 let mut s = self.status.write().await;
554 s.connected = true;
555 s.reconnect_count = s.reconnect_count.saturating_add(1);
556 s.topics_subscribed.insert("orders".into());
557 s.topics_subscribed.insert("pnl".into());
558 }
559
560 async fn set_disconnected(&self) {
561 self.status.write().await.connected = false;
562 }
563
564 async fn bump_epoch_with_gap(&self, reason: GapReason) {
568 let mut s = self.status.write().await;
569 let is_first_boot = s.reset_epoch == 0 && s.last_message_at.is_none();
571 s.reset_epoch = s.reset_epoch.saturating_add(1);
572 let new_epoch = s.reset_epoch;
573 drop(s);
574
575 if is_first_boot {
576 return;
577 }
578
579 let now = now_iso();
582 let payload = json!({
583 "reason": match reason {
584 GapReason::ReconnectedAfterDisconnect => "reconnected_after_disconnect",
585 GapReason::ProcessRestart => "process_restart",
586 },
587 "new_reset_epoch": new_epoch,
588 });
589
590 let mut rings = self.rings.write().await;
591 let topics: Vec<String> = rings.keys().cloned().collect();
592 for topic in topics {
593 if let Some(r) = rings.get_mut(&topic) {
598 r.push(payload.clone(), now.clone());
599 }
600 }
601 rings
604 .entry("gap".to_string())
605 .or_insert_with(|| TopicRing::new("gap", 256, new_epoch))
606 .push(payload, now);
607 }
608}
609
610async fn pump_until_ready(ws: &mut WsClient) -> Result<(), bezant::Error> {
617 while let Some(msg) = ws.next_message().await? {
618 if let WsMessage::System(value) = &msg {
619 if value.get("success").is_some() {
620 return Ok(());
621 }
622 }
623 }
626 Err(bezant::Error::WsProtocol(
628 "ws closed before server-ready frame".into(),
629 ))
630}
631
632fn now_iso() -> String {
633 use std::time::SystemTime;
634 let now: chrono::DateTime<chrono::Utc> = SystemTime::now().into();
635 now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
636}
637
/// Returns `s` unchanged when it is at most `max` bytes long; otherwise
/// returns a prefix of at most `max` bytes followed by `…`.
///
/// `max` is a byte budget. If it falls inside a multi-byte UTF-8 character
/// the cut moves back to the previous character boundary, so this never
/// panics on non-ASCII input.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
645
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an actor with default config and empty shared state.
    ///
    /// The command sender is returned so callers can keep the channel open
    /// for the duration of the test.
    fn test_actor() -> (ConnectorActor, mpsc::Sender<ConnectorCmd>) {
        let (cmd_tx, cmd_rx) = mpsc::channel(1);
        let client = bezant::Client::new("https://localhost:5000/v1/api").unwrap();
        let actor = ConnectorActor {
            client,
            cfg: ConnectorCfg::default(),
            rings: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(StatusState::default())),
            cmd_rx,
            active_marketdata_subs: BTreeSet::new(),
        };
        (actor, cmd_tx)
    }

    /// Order, pnl and market-data frames each land in their own ring.
    #[tokio::test]
    async fn handle_frame_dispatches_to_correct_topic() {
        let (mut actor, _cmd_tx) = test_actor();

        actor
            .handle_frame(WsMessage::Order(json!({"orderId": 1})))
            .await;
        actor.handle_frame(WsMessage::Pnl(json!({"upnl": 1.0}))).await;
        actor
            .handle_frame(WsMessage::MarketData {
                conid: 265598,
                payload: json!({"31": "150.25"}),
            })
            .await;

        let rings = actor.rings.read().await;
        for topic in ["orders", "pnl", "marketdata:265598"] {
            assert_eq!(rings.get(topic).unwrap().len(), 1);
        }
    }

    /// Heartbeat and system frames refresh the timestamp but create no ring.
    #[tokio::test]
    async fn heartbeat_and_system_frames_dont_create_topics() {
        let (mut actor, _cmd_tx) = test_actor();

        actor.handle_frame(WsMessage::Heartbeat).await;
        actor.handle_frame(WsMessage::System(json!({"msg": "ready"}))).await;

        let rings = actor.rings.read().await;
        assert!(rings.is_empty());

        let status = actor.status.read().await;
        assert!(status.last_message_at.is_some());
    }

    /// The first epoch bump only advances the epoch; no gap is recorded.
    #[tokio::test]
    async fn bump_epoch_skips_gap_on_first_boot() {
        let (actor, _cmd_tx) = test_actor();

        actor.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect).await;

        let rings = actor.rings.read().await;
        assert!(rings.is_empty());
        assert_eq!(actor.status.read().await.reset_epoch, 1);
    }

    /// After traffic has been seen, a bump adds a marker to existing rings
    /// and to the dedicated `gap` ring.
    #[tokio::test]
    async fn bump_epoch_injects_gap_into_existing_topics() {
        let (mut actor, _cmd_tx) = test_actor();

        actor.handle_frame(WsMessage::Order(json!({"id": 1}))).await;
        actor.bump_epoch_with_gap(GapReason::ReconnectedAfterDisconnect).await;

        let rings = actor.rings.read().await;
        let orders_ring = rings.get("orders").unwrap();
        assert_eq!(orders_ring.len(), 2);
        assert_eq!(rings.get("gap").unwrap().len(), 1);
    }

    /// Inserting the same conid twice keeps a single tracked subscription.
    #[tokio::test]
    async fn ensure_market_data_dedups_repeat_calls() {
        let (mut actor, _cmd_tx) = test_actor();

        assert!(actor.active_marketdata_subs.insert(265_598));
        assert!(!actor.active_marketdata_subs.insert(265_598));
        assert_eq!(actor.active_marketdata_subs.len(), 1);
    }
}