Skip to main content

bezant_server/events/
mod.rs

1//! Event capture + REST surface for streaming CPAPI topics.
2//!
3//! `bezant-server` runs an internal [`bezant::WsClient`] that subscribes
4//! to order, PnL, and (lazily) market-data feeds. Decoded frames go into
5//! per-topic ring buffers behind cursor-based REST endpoints
6//! (`/events/{topic}?since=<cursor>`), so polling consumers can read every
7//! event the socket has seen since their last visit — no events lost
8//! between client polls, no need to keep a long-lived WS open from the
9//! consumer side.
10//!
11//! See the architecture sketch:
12//!
13//! ```text
14//! CPGateway WS  ─► bezant_core::WsClient  ─► Connector (tokio task)
15//!                                              │
16//!                                              ▼
17//!                                       per-topic TopicRing
18//!                                              │
19//!                                              ▼
20//!                                  GET /events/{topic}?since=<cursor>
21//! ```
22
23pub mod connector;
24pub mod persistence;
25pub mod ring;
26
27use std::collections::BTreeMap;
28
29use serde::Serialize;
30
31pub use connector::{spawn_connector, ConnectorCfg, EventsHandle, TestSink};
32pub use persistence::{EventLog, RetentionPolicy};
33pub use ring::{ReadResult, TopicRing};
34
35/// One captured event. Wire-shape returned by `/events/{topic}` endpoints.
36#[derive(Clone, Debug, Serialize)]
37pub struct ObservedEvent {
38    /// Server-assigned monotonic cursor within `(topic, reset_epoch)`.
39    /// Use it as the `since=` parameter on the next poll.
40    pub cursor: u64,
41    /// Topic name — `"orders"`, `"pnl"`, `"marketdata:265598"`, `"gap"`.
42    pub topic: String,
43    /// RFC 3339 timestamp at which the connector pushed this into the ring.
44    pub received_at: String,
45    /// Increments every time the underlying WS reconnects or the server
46    /// restarts. Clients use it to detect "the cursor space reset under me".
47    pub reset_epoch: u64,
48    /// The decoded JSON frame. Shape depends on the topic — see the
49    /// generated `bezant-client` TS types for the typed views.
50    pub payload: serde_json::Value,
51}
52
53/// Snapshot of the connector's state, returned by `GET /events/_status`.
54#[derive(Clone, Debug, Default, Serialize)]
55pub struct EventsStatus {
56    /// `true` if the underlying WS is currently connected.
57    pub connected: bool,
58    /// RFC 3339 timestamp of the last frame the connector received, if any.
59    pub last_message_at: Option<String>,
60    /// How many times the connector has reconnected since the process
61    /// started. Bumps on every successful reconnect, not on each retry.
62    pub reconnect_count: u64,
63    /// Wall-clock seconds since the connector task spawned.
64    pub uptime_seconds: u64,
65    /// Current `reset_epoch` — bumps on each reconnect or process restart.
66    pub reset_epoch: u64,
67    /// Topics currently subscribed at the upstream WS. Always includes
68    /// `"orders"` and `"pnl"`; market data topics appear when a client
69    /// has polled `/events/marketdata?conid=…` recently.
70    pub topics_subscribed: Vec<String>,
71    /// Per-topic ring buffer occupancy. Useful for "are we close to
72    /// wraparound?" capacity planning.
73    pub buffer_sizes: BTreeMap<String, usize>,
74}
75
76/// Reason a synthetic [`ObservedEvent`] of topic `"gap"` was injected.
77/// Surfaced to the client so they know they missed something.
78#[derive(Clone, Copy, Debug, Serialize)]
79#[serde(rename_all = "snake_case")]
80pub enum GapReason {
81    /// The connector reconnected — anything that happened while
82    /// disconnected is permanently gone from the upstream feed.
83    ReconnectedAfterDisconnect,
84    /// Process restarted — ring buffers were freshly initialised.
85    ProcessRestart,
86}