1use std::path::{Path, PathBuf};
34use std::sync::Mutex;
35
36use rusqlite::{params, Connection, OptionalExtension};
37
38use super::ObservedEvent;
39
40pub struct EventLog {
44 conn: Mutex<Connection>,
45 path: PathBuf,
47}
48
49impl std::fmt::Debug for EventLog {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("EventLog").field("path", &self.path).finish()
52 }
53}
54
/// Retention windows, in days, keyed by event topic.
///
/// [`RetentionPolicy::days_for`] decides which field applies to a given topic.
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Window for every topic that has no dedicated field below.
    pub default_days: i64,
    /// Window for the `orders` topic.
    pub orders_days: i64,
    /// Window for the `pnl` topic.
    pub pnl_days: i64,
    /// Window for topics whose name starts with `marketdata:`.
    pub marketdata_days: i64,
    /// Window for the `gap` topic.
    pub gap_days: i64,
}
69
70impl Default for RetentionPolicy {
71 fn default() -> Self {
72 Self {
73 default_days: 30,
74 orders_days: 90,
75 pnl_days: 90,
76 marketdata_days: 14,
77 gap_days: 365,
78 }
79 }
80}
81
82impl RetentionPolicy {
83 #[must_use]
85 pub fn days_for(&self, topic: &str) -> i64 {
86 if topic == "orders" {
87 self.orders_days
88 } else if topic == "pnl" {
89 self.pnl_days
90 } else if topic == "gap" {
91 self.gap_days
92 } else if topic.starts_with("marketdata:") {
93 self.marketdata_days
94 } else {
95 self.default_days
96 }
97 }
98}
99
100impl EventLog {
101 pub fn open(path: impl AsRef<Path>) -> rusqlite::Result<Self> {
104 let conn = Connection::open(path.as_ref())?;
105 conn.pragma_update(None, "journal_mode", "WAL")?;
106 conn.pragma_update(None, "synchronous", "NORMAL")?;
107 conn.execute_batch(
108 r"
109 CREATE TABLE IF NOT EXISTS events (
110 id INTEGER PRIMARY KEY AUTOINCREMENT,
111 cursor INTEGER NOT NULL,
112 topic TEXT NOT NULL,
113 received_at TEXT NOT NULL,
114 reset_epoch INTEGER NOT NULL,
115 payload TEXT NOT NULL
116 );
117 CREATE INDEX IF NOT EXISTS events_topic_received_at
118 ON events(topic, received_at);
119 ",
120 )?;
121 Ok(Self {
122 conn: Mutex::new(conn),
123 path: path.as_ref().to_path_buf(),
124 })
125 }
126
127 pub fn open_in_memory() -> rusqlite::Result<Self> {
129 let conn = Connection::open_in_memory()?;
130 conn.execute_batch(
131 r"
132 CREATE TABLE events (
133 id INTEGER PRIMARY KEY AUTOINCREMENT,
134 cursor INTEGER NOT NULL,
135 topic TEXT NOT NULL,
136 received_at TEXT NOT NULL,
137 reset_epoch INTEGER NOT NULL,
138 payload TEXT NOT NULL
139 );
140 CREATE INDEX events_topic_received_at
141 ON events(topic, received_at);
142 ",
143 )?;
144 Ok(Self {
145 conn: Mutex::new(conn),
146 path: PathBuf::from(":memory:"),
147 })
148 }
149
150 #[must_use]
152 pub fn path(&self) -> &Path {
153 &self.path
154 }
155
156 pub fn append(&self, event: &ObservedEvent) -> rusqlite::Result<i64> {
158 let payload_str = serde_json::to_string(&event.payload)
159 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
160 let conn = self.conn.lock().unwrap();
161 conn.execute(
162 "INSERT INTO events (cursor, topic, received_at, reset_epoch, payload) \
163 VALUES (?1, ?2, ?3, ?4, ?5)",
164 params![
165 event.cursor as i64,
166 event.topic,
167 event.received_at,
168 event.reset_epoch as i64,
169 payload_str,
170 ],
171 )?;
172 Ok(conn.last_insert_rowid())
173 }
174
175 pub fn query_since(
177 &self,
178 topic: &str,
179 since_ts: &str,
180 limit: usize,
181 ) -> rusqlite::Result<Vec<ObservedEvent>> {
182 let conn = self.conn.lock().unwrap();
183 let mut stmt = conn.prepare(
184 "SELECT cursor, topic, received_at, reset_epoch, payload \
185 FROM events WHERE topic = ?1 AND received_at > ?2 \
186 ORDER BY received_at ASC LIMIT ?3",
187 )?;
188 let rows = stmt.query_map(
189 params![topic, since_ts, limit as i64],
190 |row| {
191 let cursor: i64 = row.get(0)?;
192 let topic: String = row.get(1)?;
193 let received_at: String = row.get(2)?;
194 let reset_epoch: i64 = row.get(3)?;
195 let payload_str: String = row.get(4)?;
196 let payload: serde_json::Value = serde_json::from_str(&payload_str)
197 .unwrap_or(serde_json::Value::Null);
198 Ok(ObservedEvent {
199 cursor: cursor as u64,
200 topic,
201 received_at,
202 reset_epoch: reset_epoch as u64,
203 payload,
204 })
205 },
206 )?;
207 let mut out = Vec::new();
208 for r in rows {
209 out.push(r?);
210 }
211 Ok(out)
212 }
213
214 pub fn prune(&self, policy: &RetentionPolicy) -> rusqlite::Result<usize> {
217 let conn = self.conn.lock().unwrap();
218 let now: chrono::DateTime<chrono::Utc> = std::time::SystemTime::now().into();
219 let mut total = 0usize;
220 let topics: Vec<String> = {
221 let mut stmt = conn.prepare("SELECT DISTINCT topic FROM events")?;
222 let mapped = stmt.query_map([], |r| r.get::<_, String>(0))?;
223 let mut v = Vec::new();
224 for m in mapped {
225 v.push(m?);
226 }
227 v
228 };
229 for topic in topics {
230 let days = policy.days_for(&topic);
231 let cutoff = now - chrono::Duration::days(days);
232 let cutoff_str = cutoff.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
233 let n = conn.execute(
234 "DELETE FROM events WHERE topic = ?1 AND received_at < ?2",
235 params![topic, cutoff_str],
236 )?;
237 total += n;
238 }
239 Ok(total)
240 }
241
242 pub fn count(&self) -> rusqlite::Result<usize> {
244 let conn = self.conn.lock().unwrap();
245 let n: i64 = conn
246 .query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))
247 .optional()?
248 .unwrap_or(0);
249 Ok(n as usize)
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Earlier than every timestamp used below, so querying from it returns a
    /// topic's whole history.
    const EPOCH: &str = "1970-01-01T00:00:00Z";

    /// Builds an event whose payload simply echoes its cursor.
    fn evt(cursor: u64, topic: &str, ts: &str) -> ObservedEvent {
        ObservedEvent {
            cursor,
            topic: topic.to_owned(),
            received_at: ts.to_owned(),
            reset_epoch: 1,
            payload: json!({ "id": cursor }),
        }
    }

    /// Fresh in-memory log pre-filled with `(cursor, topic, timestamp)` rows.
    fn log_with(rows: &[(u64, &str, &str)]) -> EventLog {
        let log = EventLog::open_in_memory().unwrap();
        for &(cursor, topic, ts) in rows {
            log.append(&evt(cursor, topic, ts)).unwrap();
        }
        log
    }

    /// Timestamp `days` days before now, formatted the way `prune` formats
    /// its cutoff.
    fn days_ago(days: i64) -> String {
        (chrono::Utc::now() - chrono::Duration::days(days))
            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
    }

    #[test]
    fn append_and_query_round_trip() {
        let log = log_with(&[
            (1, "orders", "2026-05-06T13:30:00Z"),
            (2, "orders", "2026-05-06T13:31:00Z"),
            (3, "pnl", "2026-05-06T13:32:00Z"),
        ]);

        let orders = log.query_since("orders", EPOCH, 100).unwrap();
        assert_eq!(orders.len(), 2);
        assert_eq!(orders[0].cursor, 1);
        assert_eq!(orders[1].cursor, 2);

        let pnl = log.query_since("pnl", EPOCH, 100).unwrap();
        assert_eq!(pnl.len(), 1);
        assert_eq!(pnl[0].topic, "pnl");
    }

    #[test]
    fn query_filters_by_since_ts() {
        let log = log_with(&[
            (1, "orders", "2026-05-06T13:30:00Z"),
            (2, "orders", "2026-05-06T13:31:00Z"),
            (3, "orders", "2026-05-06T13:32:00Z"),
        ]);

        // The cutoff falls between the first and second event.
        let recent = log
            .query_since("orders", "2026-05-06T13:30:30Z", 100)
            .unwrap();
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].cursor, 2);
    }

    #[test]
    fn query_respects_limit() {
        let log = EventLog::open_in_memory().unwrap();
        for i in 1..=10 {
            let ts = format!("2026-05-06T13:{:02}:00Z", 30 + i);
            log.append(&evt(i, "orders", &ts)).unwrap();
        }

        let page = log.query_since("orders", EPOCH, 3).unwrap();
        assert_eq!(page.len(), 3);
        assert_eq!(page[0].cursor, 1);
    }

    #[test]
    fn prune_drops_old_events_per_topic_policy() {
        let ancient = days_ago(120);
        let mid = days_ago(20);
        let recent = days_ago(1);
        let log = log_with(&[
            (1, "orders", ancient.as_str()),
            (2, "orders", mid.as_str()),
            (3, "orders", recent.as_str()),
            (10, "marketdata:1", mid.as_str()),
            (11, "marketdata:1", recent.as_str()),
        ]);

        // Defaults: orders keep 90 days, market data keeps 14. That removes
        // the 120-day-old order and the 20-day-old market-data event.
        let dropped = log.prune(&RetentionPolicy::default()).unwrap();
        assert_eq!(dropped, 2);

        assert_eq!(log.count().unwrap(), 3);
    }

    #[test]
    fn retention_policy_picks_per_topic_days() {
        let policy = RetentionPolicy::default();
        let expected = [
            ("orders", 90),
            ("pnl", 90),
            ("gap", 365),
            ("marketdata:265598", 14),
            ("unknown", 30),
        ];
        for (topic, days) in expected {
            assert_eq!(policy.days_for(topic), days);
        }
    }
}