Skip to main content

bezant_server/events/
persistence.rs

1//! Optional sqlite persistence for captured events.
2//!
3//! The ring buffer in [`super::ring::TopicRing`] is the primary read
4//! path — fast, in-memory, bounded. Sqlite is the **historical** store:
5//! every event we push to a ring is also appended to a single
6//! `events` table so the `GET /events/{topic}/history?since_ts=…`
7//! endpoint can reach back beyond ring capacity (and beyond container
8//! lifetime).
9//!
10//! Schema:
11//!
12//! ```sql
13//! CREATE TABLE events (
14//!   id           INTEGER PRIMARY KEY AUTOINCREMENT,
15//!   cursor       INTEGER NOT NULL,
16//!   topic        TEXT    NOT NULL,
17//!   received_at  TEXT    NOT NULL,    -- ISO 8601
18//!   reset_epoch  INTEGER NOT NULL,
19//!   payload      TEXT    NOT NULL     -- JSON
20//! );
21//! CREATE INDEX events_topic_received_at ON events(topic, received_at);
22//! ```
23//!
24//! Retention is configurable per topic:
25//! - `orders`     — 90 days (default)
26//! - `pnl`        — 90 days
27//! - `marketdata:*` — 14 days (high volume; bigger window doesn't pay)
28//! - `gap`        — 365 days (low volume, useful forever)
29//!
//! [`EventLog::prune`] is intended to be called from a
//! once-an-hour task; nothing in the read path waits on it.
32
33use std::path::{Path, PathBuf};
34use std::sync::Mutex;
35
36use rusqlite::{params, Connection, OptionalExtension};
37
38use super::ObservedEvent;
39
40/// Sqlite-backed historical event store. Wraps a [`Connection`] in a
41/// [`Mutex`] so it can be shared across the connector + axum tasks
42/// without unsafe juggling. Reads are fast; writes serialise.
43pub struct EventLog {
44    conn: Mutex<Connection>,
45    /// Path the connection was opened against (for diagnostics).
46    path: PathBuf,
47}
48
49impl std::fmt::Debug for EventLog {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("EventLog").field("path", &self.path).finish()
52    }
53}
54
/// Per-topic retention policy, expressed in whole days.
///
/// `orders`, `pnl`, `gap` and the `marketdata:*` family each have a
/// dedicated field; every other topic falls back to `default_days`
/// (30 when built via [`RetentionPolicy::default`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RetentionPolicy {
    /// Retention for any topic that has no dedicated field below.
    pub default_days: i64,
    /// Retention for the `orders` topic.
    pub orders_days: i64,
    /// Retention for the `pnl` topic.
    pub pnl_days: i64,
    /// Retention for every topic whose name starts with `marketdata:`.
    pub marketdata_days: i64,
    /// Retention for the `gap` topic.
    pub gap_days: i64,
}

impl Default for RetentionPolicy {
    /// Stock policy: 30 days by default, 90 for `orders` and `pnl`,
    /// 14 for `marketdata:*`, 365 for `gap`.
    fn default() -> Self {
        Self {
            default_days: 30,
            orders_days: 90,
            pnl_days: 90,
            marketdata_days: 14,
            gap_days: 365,
        }
    }
}

impl RetentionPolicy {
    /// Number of days events on `topic` are kept.
    ///
    /// Exact names (`orders`, `pnl`, `gap`) are matched first, then the
    /// `marketdata:` prefix; anything else gets `default_days`. Note
    /// that the bare name `marketdata` (no colon) is *not* part of the
    /// `marketdata:*` family and therefore uses the default.
    #[must_use]
    pub fn days_for(&self, topic: &str) -> i64 {
        match topic {
            "orders" => self.orders_days,
            "pnl" => self.pnl_days,
            "gap" => self.gap_days,
            other if other.starts_with("marketdata:") => self.marketdata_days,
            _ => self.default_days,
        }
    }
}
99
100impl EventLog {
101    /// Open or create a sqlite database at `path`. Runs the schema
102    /// migration on first call (idempotent).
103    pub fn open(path: impl AsRef<Path>) -> rusqlite::Result<Self> {
104        let conn = Connection::open(path.as_ref())?;
105        conn.pragma_update(None, "journal_mode", "WAL")?;
106        conn.pragma_update(None, "synchronous", "NORMAL")?;
107        conn.execute_batch(
108            r"
109            CREATE TABLE IF NOT EXISTS events (
110                id           INTEGER PRIMARY KEY AUTOINCREMENT,
111                cursor       INTEGER NOT NULL,
112                topic        TEXT    NOT NULL,
113                received_at  TEXT    NOT NULL,
114                reset_epoch  INTEGER NOT NULL,
115                payload      TEXT    NOT NULL
116            );
117            CREATE INDEX IF NOT EXISTS events_topic_received_at
118                ON events(topic, received_at);
119            ",
120        )?;
121        Ok(Self {
122            conn: Mutex::new(conn),
123            path: path.as_ref().to_path_buf(),
124        })
125    }
126
127    /// Open an in-memory database — used by tests.
128    pub fn open_in_memory() -> rusqlite::Result<Self> {
129        let conn = Connection::open_in_memory()?;
130        conn.execute_batch(
131            r"
132            CREATE TABLE events (
133                id           INTEGER PRIMARY KEY AUTOINCREMENT,
134                cursor       INTEGER NOT NULL,
135                topic        TEXT    NOT NULL,
136                received_at  TEXT    NOT NULL,
137                reset_epoch  INTEGER NOT NULL,
138                payload      TEXT    NOT NULL
139            );
140            CREATE INDEX events_topic_received_at
141                ON events(topic, received_at);
142            ",
143        )?;
144        Ok(Self {
145            conn: Mutex::new(conn),
146            path: PathBuf::from(":memory:"),
147        })
148    }
149
150    /// Path the log was opened against.
151    #[must_use]
152    pub fn path(&self) -> &Path {
153        &self.path
154    }
155
156    /// Append an event. Returns the row id.
157    pub fn append(&self, event: &ObservedEvent) -> rusqlite::Result<i64> {
158        let payload_str = serde_json::to_string(&event.payload)
159            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
160        let conn = self.conn.lock().unwrap();
161        conn.execute(
162            "INSERT INTO events (cursor, topic, received_at, reset_epoch, payload) \
163             VALUES (?1, ?2, ?3, ?4, ?5)",
164            params![
165                event.cursor as i64,
166                event.topic,
167                event.received_at,
168                event.reset_epoch as i64,
169                payload_str,
170            ],
171        )?;
172        Ok(conn.last_insert_rowid())
173    }
174
175    /// Read events for a topic since `since_ts` (ISO 8601). Newest-last.
176    pub fn query_since(
177        &self,
178        topic: &str,
179        since_ts: &str,
180        limit: usize,
181    ) -> rusqlite::Result<Vec<ObservedEvent>> {
182        let conn = self.conn.lock().unwrap();
183        let mut stmt = conn.prepare(
184            "SELECT cursor, topic, received_at, reset_epoch, payload \
185             FROM events WHERE topic = ?1 AND received_at > ?2 \
186             ORDER BY received_at ASC LIMIT ?3",
187        )?;
188        let rows = stmt.query_map(
189            params![topic, since_ts, limit as i64],
190            |row| {
191                let cursor: i64 = row.get(0)?;
192                let topic: String = row.get(1)?;
193                let received_at: String = row.get(2)?;
194                let reset_epoch: i64 = row.get(3)?;
195                let payload_str: String = row.get(4)?;
196                let payload: serde_json::Value = serde_json::from_str(&payload_str)
197                    .unwrap_or(serde_json::Value::Null);
198                Ok(ObservedEvent {
199                    cursor: cursor as u64,
200                    topic,
201                    received_at,
202                    reset_epoch: reset_epoch as u64,
203                    payload,
204                })
205            },
206        )?;
207        let mut out = Vec::new();
208        for r in rows {
209            out.push(r?);
210        }
211        Ok(out)
212    }
213
214    /// Drop events older than the topic's retention cutoff. Returns
215    /// total rows deleted across all topics.
216    pub fn prune(&self, policy: &RetentionPolicy) -> rusqlite::Result<usize> {
217        let conn = self.conn.lock().unwrap();
218        let now: chrono::DateTime<chrono::Utc> = std::time::SystemTime::now().into();
219        let mut total = 0usize;
220        let topics: Vec<String> = {
221            let mut stmt = conn.prepare("SELECT DISTINCT topic FROM events")?;
222            let mapped = stmt.query_map([], |r| r.get::<_, String>(0))?;
223            let mut v = Vec::new();
224            for m in mapped {
225                v.push(m?);
226            }
227            v
228        };
229        for topic in topics {
230            let days = policy.days_for(&topic);
231            let cutoff = now - chrono::Duration::days(days);
232            let cutoff_str = cutoff.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
233            let n = conn.execute(
234                "DELETE FROM events WHERE topic = ?1 AND received_at < ?2",
235                params![topic, cutoff_str],
236            )?;
237            total += n;
238        }
239        Ok(total)
240    }
241
242    /// Total row count. Used by tests + diagnostics.
243    pub fn count(&self) -> rusqlite::Result<usize> {
244        let conn = self.conn.lock().unwrap();
245        let n: i64 = conn
246            .query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))
247            .optional()?
248            .unwrap_or(0);
249        Ok(n as usize)
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Lower bound that precedes every timestamp used in these tests.
    const EPOCH: &str = "1970-01-01T00:00:00Z";

    /// Build an event with the given cursor, topic and timestamp; the
    /// payload simply echoes the cursor and `reset_epoch` is fixed at 1.
    fn evt(cursor: u64, topic: &str, ts: &str) -> ObservedEvent {
        ObservedEvent {
            cursor,
            topic: topic.to_string(),
            received_at: ts.to_string(),
            reset_epoch: 1,
            payload: json!({"id": cursor}),
        }
    }

    /// RFC 3339 timestamp (milliseconds, `Z` suffix) `days` days before now.
    fn days_ago(days: i64) -> String {
        (chrono::Utc::now() - chrono::Duration::days(days))
            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
    }

    /// Cursors of `events`, in the order they were returned.
    fn cursors(events: &[ObservedEvent]) -> Vec<u64> {
        events.iter().map(|e| e.cursor).collect()
    }

    #[test]
    fn append_and_query_round_trip() {
        let log = EventLog::open_in_memory().unwrap();
        for (cursor, topic, ts) in [
            (1, "orders", "2026-05-06T13:30:00Z"),
            (2, "orders", "2026-05-06T13:31:00Z"),
            (3, "pnl", "2026-05-06T13:32:00Z"),
        ] {
            log.append(&evt(cursor, topic, ts)).unwrap();
        }

        let orders = log.query_since("orders", EPOCH, 100).unwrap();
        assert_eq!(cursors(&orders), [1, 2]);

        let pnl = log.query_since("pnl", EPOCH, 100).unwrap();
        assert_eq!(pnl.len(), 1);
        assert_eq!(pnl[0].topic, "pnl");
    }

    #[test]
    fn query_filters_by_since_ts() {
        let log = EventLog::open_in_memory().unwrap();
        for (cursor, ts) in [
            (1, "2026-05-06T13:30:00Z"),
            (2, "2026-05-06T13:31:00Z"),
            (3, "2026-05-06T13:32:00Z"),
        ] {
            log.append(&evt(cursor, "orders", ts)).unwrap();
        }

        // The bound falls between the first and second event.
        let recent = log.query_since("orders", "2026-05-06T13:30:30Z", 100).unwrap();
        assert_eq!(recent.len(), 2);
        assert_eq!(recent.first().map(|e| e.cursor), Some(2));
    }

    #[test]
    fn query_respects_limit() {
        let log = EventLog::open_in_memory().unwrap();
        for i in 1..=10 {
            let ts = format!("2026-05-06T13:{:02}:00Z", 30 + i);
            log.append(&evt(i, "orders", &ts)).unwrap();
        }

        let result = log.query_since("orders", EPOCH, 3).unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].cursor, 1);
    }

    #[test]
    fn prune_drops_old_events_per_topic_policy() {
        let log = EventLog::open_in_memory().unwrap();
        // 120 days exceeds the 90-day `orders` window; 20 days exceeds
        // the 14-day `marketdata:*` window but not the `orders` one.
        let ancient = days_ago(120);
        let mid = days_ago(20);
        let recent = days_ago(1);

        log.append(&evt(1, "orders", &ancient)).unwrap();
        log.append(&evt(2, "orders", &mid)).unwrap();
        log.append(&evt(3, "orders", &recent)).unwrap();
        log.append(&evt(10, "marketdata:1", &mid)).unwrap();
        log.append(&evt(11, "marketdata:1", &recent)).unwrap();

        let dropped = log.prune(&RetentionPolicy::default()).unwrap();
        // One 120-day-old `orders` row plus one 20-day-old marketdata row.
        assert_eq!(dropped, 2);

        assert_eq!(log.count().unwrap(), 3);
    }

    #[test]
    fn retention_policy_picks_per_topic_days() {
        let policy = RetentionPolicy::default();
        for (topic, expected) in [
            ("orders", 90),
            ("pnl", 90),
            ("gap", 365),
            ("marketdata:265598", 14),
            ("unknown", 30),
        ] {
            assert_eq!(policy.days_for(topic), expected);
        }
    }
}