bezant_server/events/mod.rs
1//! Event capture + REST surface for streaming CPAPI topics.
2//!
//! `bezant-server` runs an internal [`bezant_core::WsClient`] that subscribes
4//! to order, PnL, and (lazily) market-data feeds. Decoded frames go into
5//! per-topic ring buffers behind cursor-based REST endpoints
6//! (`/events/{topic}?since=<cursor>`), so polling consumers can read every
7//! event the socket has seen since their last visit — no events lost
8//! between client polls, no need to keep a long-lived WS open from the
9//! consumer side.
10//!
11//! See the architecture sketch:
12//!
13//! ```text
//! CPGateway WS ─► bezant_core::WsClient ─► Connector (tokio task)
//!                                                 │
//!                                                 ▼
//!                                         per-topic TopicRing
//!                                                 │
//!                                                 ▼
//!                               GET /events/{topic}?since=<cursor>
21//! ```
22
23pub mod connector;
24pub mod persistence;
25pub mod ring;
26
27use std::collections::BTreeMap;
28
29use serde::Serialize;
30
31pub use connector::{spawn_connector, ConnectorCfg, EventsHandle, TestSink};
32pub use persistence::{EventLog, RetentionPolicy};
33pub use ring::{ReadResult, TopicRing};
34
35/// One captured event. Wire-shape returned by `/events/{topic}` endpoints.
36#[derive(Clone, Debug, Serialize)]
37pub struct ObservedEvent {
38 /// Server-assigned monotonic cursor within `(topic, reset_epoch)`.
39 /// Use it as the `since=` parameter on the next poll.
40 pub cursor: u64,
41 /// Topic name — `"orders"`, `"pnl"`, `"marketdata:265598"`, `"gap"`.
42 pub topic: String,
43 /// RFC 3339 timestamp at which the connector pushed this into the ring.
44 pub received_at: String,
45 /// Increments every time the underlying WS reconnects or the server
46 /// restarts. Clients use it to detect "the cursor space reset under me".
47 pub reset_epoch: u64,
48 /// The decoded JSON frame. Shape depends on the topic — see the
49 /// generated `bezant-client` TS types for the typed views.
50 pub payload: serde_json::Value,
51}
52
53/// Snapshot of the connector's state, returned by `GET /events/_status`.
54#[derive(Clone, Debug, Default, Serialize)]
55pub struct EventsStatus {
56 /// `true` if the underlying WS is currently connected.
57 pub connected: bool,
58 /// RFC 3339 timestamp of the last frame the connector received, if any.
59 pub last_message_at: Option<String>,
60 /// How many times the connector has reconnected since the process
61 /// started. Bumps on every successful reconnect, not on each retry.
62 pub reconnect_count: u64,
63 /// Wall-clock seconds since the connector task spawned.
64 pub uptime_seconds: u64,
65 /// Current `reset_epoch` — bumps on each reconnect or process restart.
66 pub reset_epoch: u64,
67 /// Topics currently subscribed at the upstream WS. Always includes
68 /// `"orders"` and `"pnl"`; market data topics appear when a client
69 /// has polled `/events/marketdata?conid=…` recently.
70 pub topics_subscribed: Vec<String>,
71 /// Per-topic ring buffer occupancy. Useful for "are we close to
72 /// wraparound?" capacity planning.
73 pub buffer_sizes: BTreeMap<String, usize>,
74}
75
76/// Reason a synthetic [`ObservedEvent`] of topic `"gap"` was injected.
77/// Surfaced to the client so they know they missed something.
78#[derive(Clone, Copy, Debug, Serialize)]
79#[serde(rename_all = "snake_case")]
80pub enum GapReason {
81 /// The connector reconnected — anything that happened while
82 /// disconnected is permanently gone from the upstream feed.
83 ReconnectedAfterDisconnect,
84 /// Process restarted — ring buffers were freshly initialised.
85 ProcessRestart,
86}