Skip to main content

bezant_server/events/
ring.rs

//! Per-topic ring buffer for captured events.
//!
//! Each [`TopicRing`] is bounded in capacity. When a push would exceed
//! capacity, the oldest event is evicted (FIFO). Cursors are
//! server-assigned and monotonically increasing within
//! `(topic, reset_epoch)` — they never collide as long as `reset_epoch`
//! is unique per process run, and they never reset within a single run
//! (so client cursor comparisons stay cheap).
//!
//! The ring is read by cursor: `read_since(cursor, limit)` returns events
//! whose cursor is strictly greater than `cursor`, up to `limit`. If the
//! caller's cursor is older than the buffer's current head (the smallest
//! cursor still buffered), so that at least one unseen event has already
//! been evicted, the read returns [`ReadResult::CursorExpired`] so the
//! client can resync to head and emit a synthetic gap event.
15
16use std::collections::VecDeque;
17
18use serde_json::Value;
19
20use super::ObservedEvent;
21
22/// FIFO ring of [`ObservedEvent`] keyed by cursor. Single owner; not
23/// `Send + Sync` on its own — wrap in `Arc<RwLock<…>>` for cross-task
24/// access.
25#[derive(Debug)]
26pub struct TopicRing {
27    inner: VecDeque<ObservedEvent>,
28    capacity: usize,
29    next_cursor: u64,
30    head_cursor: u64,
31    reset_epoch: u64,
32    topic: String,
33}
34
35/// Outcome of [`TopicRing::read_since`].
36#[derive(Debug, Clone)]
37pub enum ReadResult {
38    /// Zero or more events newer than the requested cursor. `next_cursor`
39    /// is the cursor the caller should send on their next poll.
40    Ok {
41        /// Events strictly newer than the requested cursor, in insertion
42        /// order, capped at the requested limit.
43        events: Vec<ObservedEvent>,
44        /// Cursor to send on the next poll.
45        next_cursor: u64,
46    },
47    /// The caller's cursor is older than the oldest event still buffered.
48    /// They must reset to `head_cursor - 1` (or `0`) and emit a gap.
49    CursorExpired {
50        /// Smallest cursor currently buffered. Use `head_cursor - 1` (or
51        /// `0` if `head_cursor == 0`) as the new `since=` parameter.
52        head_cursor: u64,
53        /// Current `reset_epoch` so the caller can detect resets.
54        reset_epoch: u64,
55    },
56}
57
58impl TopicRing {
59    /// Construct a ring with the given capacity for a specific topic and
60    /// reset epoch.
61    #[must_use]
62    pub fn new(topic: impl Into<String>, capacity: usize, reset_epoch: u64) -> Self {
63        Self {
64            inner: VecDeque::with_capacity(capacity.max(1)),
65            capacity: capacity.max(1),
66            next_cursor: 1,
67            head_cursor: 1,
68            reset_epoch,
69            topic: topic.into(),
70        }
71    }
72
73    /// Topic name this ring captures (e.g. `"orders"`).
74    #[must_use]
75    pub fn topic(&self) -> &str {
76        &self.topic
77    }
78
79    /// Current `reset_epoch`.
80    #[must_use]
81    pub fn reset_epoch(&self) -> u64 {
82        self.reset_epoch
83    }
84
85    /// Number of events currently buffered.
86    #[must_use]
87    pub fn len(&self) -> usize {
88        self.inner.len()
89    }
90
91    /// Whether the ring contains no events.
92    #[must_use]
93    pub fn is_empty(&self) -> bool {
94        self.inner.is_empty()
95    }
96
97    /// Smallest cursor currently buffered. Equal to `next_cursor` when empty.
98    #[must_use]
99    pub fn head_cursor(&self) -> u64 {
100        self.head_cursor
101    }
102
103    /// Cursor that the next push will use. Useful in tests.
104    #[must_use]
105    pub fn next_cursor(&self) -> u64 {
106        self.next_cursor
107    }
108
109    /// Push a new event with the given payload + RFC 3339 receive
110    /// timestamp. Returns the assigned cursor. If the ring is at
111    /// capacity, the oldest event is evicted (and `head_cursor` advances).
112    pub fn push(&mut self, payload: Value, received_at: String) -> u64 {
113        let cursor = self.next_cursor;
114        self.next_cursor = self.next_cursor.saturating_add(1);
115
116        let event = ObservedEvent {
117            cursor,
118            topic: self.topic.clone(),
119            received_at,
120            reset_epoch: self.reset_epoch,
121            payload,
122        };
123
124        if self.inner.len() == self.capacity {
125            // Evict oldest. head_cursor moves to whatever's now the
126            // oldest still-buffered event's cursor.
127            let evicted = self.inner.pop_front();
128            if let Some(_) = evicted {
129                // After eviction the new oldest is whatever's at front
130                // (or, if empty, the cursor we're about to push).
131                self.head_cursor = self
132                    .inner
133                    .front()
134                    .map(|e| e.cursor)
135                    .unwrap_or(cursor);
136            }
137        }
138
139        self.inner.push_back(event);
140        // If this was the very first event in a fresh ring, anchor head.
141        if self.inner.len() == 1 {
142            self.head_cursor = cursor;
143        }
144
145        cursor
146    }
147
148    /// Read events whose cursor is strictly greater than `since`, capped
149    /// at `limit`. `since == 0` means "everything currently buffered".
150    ///
151    /// Returns [`ReadResult::CursorExpired`] when `since < head_cursor - 1`
152    /// AND the ring has evicted at least one event the caller hasn't seen
153    /// — i.e. there's a real gap, not just "you've read everything".
154    #[must_use]
155    pub fn read_since(&self, since: u64, limit: usize) -> ReadResult {
156        // Caller is fully caught up — no gap, no expiry, just empty.
157        if since >= self.next_cursor.saturating_sub(1) {
158            return ReadResult::Ok {
159                events: Vec::new(),
160                next_cursor: self.next_cursor.saturating_sub(1),
161            };
162        }
163
164        // Caller's cursor is older than our oldest buffered event AND we
165        // have evicted at least one event past their cursor.
166        if since + 1 < self.head_cursor {
167            return ReadResult::CursorExpired {
168                head_cursor: self.head_cursor,
169                reset_epoch: self.reset_epoch,
170            };
171        }
172
173        let events: Vec<ObservedEvent> = self
174            .inner
175            .iter()
176            .filter(|e| e.cursor > since)
177            .take(limit.max(1))
178            .cloned()
179            .collect();
180
181        let next_cursor = events
182            .last()
183            .map(|e| e.cursor)
184            .unwrap_or_else(|| self.next_cursor.saturating_sub(1));
185
186        ReadResult::Ok { events, next_cursor }
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Fixed receive timestamp shared by every pushed event.
    fn ts() -> String {
        String::from("2026-05-06T00:00:00Z")
    }

    /// Build an `"orders"` ring and push one `{"n": <value>}` payload per
    /// item of `ns`, in order.
    fn ring_with(
        capacity: usize,
        reset_epoch: u64,
        ns: impl IntoIterator<Item = i32>,
    ) -> TopicRing {
        let mut ring = TopicRing::new("orders", capacity, reset_epoch);
        for n in ns {
            ring.push(json!({ "n": n }), ts());
        }
        ring
    }

    /// Unwrap a successful read into `(events, next_cursor)`.
    fn expect_ok(result: ReadResult) -> (Vec<ObservedEvent>, u64) {
        match result {
            ReadResult::Ok { events, next_cursor } => (events, next_cursor),
            other => panic!("expected Ok, got {other:?}"),
        }
    }

    /// Unwrap an expired read into `(head_cursor, reset_epoch)`.
    fn expect_expired(result: ReadResult) -> (u64, u64) {
        match result {
            ReadResult::CursorExpired {
                head_cursor,
                reset_epoch,
            } => (head_cursor, reset_epoch),
            other => panic!("expected CursorExpired, got {other:?}"),
        }
    }

    #[test]
    fn push_assigns_monotonic_cursors() {
        let mut ring = TopicRing::new("orders", 10, 0);
        let assigned: Vec<u64> = (1..=3)
            .map(|n: i32| ring.push(json!({ "n": n }), ts()))
            .collect();
        assert_eq!(assigned, vec![1, 2, 3]);
    }

    #[test]
    fn read_since_zero_returns_all_buffered() {
        let ring = ring_with(10, 0, 1..=2);
        let (events, next_cursor) = expect_ok(ring.read_since(0, 100));
        assert_eq!(events.len(), 2);
        assert_eq!(next_cursor, 2);
        assert_eq!(events[0].payload, json!({"n": 1}));
        assert_eq!(events[1].payload, json!({"n": 2}));
    }

    #[test]
    fn read_since_returns_only_new_events() {
        let ring = ring_with(10, 0, 1..=3);
        let (events, next_cursor) = expect_ok(ring.read_since(1, 100));
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].cursor, 2);
        assert_eq!(events[1].cursor, 3);
        assert_eq!(next_cursor, 3);
    }

    #[test]
    fn read_since_caught_up_returns_empty() {
        let ring = ring_with(10, 0, 1..=2);
        match ring.read_since(2, 100) {
            ReadResult::Ok { events, next_cursor } => {
                assert!(events.is_empty());
                assert_eq!(next_cursor, 2);
            }
            other => panic!("expected empty Ok, got {other:?}"),
        }
    }

    #[test]
    fn read_since_respects_limit() {
        let ring = ring_with(100, 0, 0..50);
        let (events, next_cursor) = expect_ok(ring.read_since(0, 10));
        assert_eq!(events.len(), 10);
        assert_eq!(next_cursor, 10);
    }

    #[test]
    fn capacity_evicts_oldest_first() {
        // Five pushes into a capacity-3 ring drop cursors 1 and 2,
        // leaving 3, 4 and 5 buffered.
        let ring = ring_with(3, 0, 1..=5);
        assert_eq!(ring.len(), 3);
        assert_eq!(ring.head_cursor(), 3);
        let (head_cursor, _) = expect_expired(ring.read_since(0, 100));
        assert_eq!(head_cursor, 3);
    }

    #[test]
    fn read_after_overflow_at_head_succeeds() {
        let ring = ring_with(3, 0, 1..=5);
        // Reading from head - 1 (cursor 2) works: cursor 3 is still held.
        let (events, next_cursor) = expect_ok(ring.read_since(2, 100));
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].cursor, 3);
        assert_eq!(next_cursor, 5);
    }

    #[test]
    fn read_at_or_past_overflow_returns_expired() {
        let ring = ring_with(3, 0, 1..=5);
        // Cursors 1 and 2 are gone, so reading from 0 or 1 hits a gap.
        for since in [0, 1] {
            assert!(matches!(
                ring.read_since(since, 100),
                ReadResult::CursorExpired { .. }
            ));
        }
    }

    #[test]
    fn reset_epoch_propagates_to_read_result() {
        let ring = ring_with(2, 42, 1..=4);
        let (_, reset_epoch) = expect_expired(ring.read_since(0, 100));
        assert_eq!(reset_epoch, 42);
    }

    #[test]
    fn reset_epoch_propagates_to_pushed_events() {
        let ring = ring_with(4, 7, 1..=1);
        let (events, _) = expect_ok(ring.read_since(0, 10));
        assert_eq!(events[0].reset_epoch, 7);
    }

    #[test]
    fn empty_ring_read_since_zero_is_ok_empty() {
        let ring = TopicRing::new("orders", 10, 0);
        let (events, next_cursor) = expect_ok(ring.read_since(0, 100));
        assert!(events.is_empty());
        assert_eq!(next_cursor, 0);
    }

    #[test]
    fn topic_name_appears_on_pushed_events() {
        let mut ring = TopicRing::new("marketdata:265598", 4, 0);
        ring.push(json!({"31": "150.25"}), ts());
        let (events, _) = expect_ok(ring.read_since(0, 10));
        assert_eq!(events[0].topic, "marketdata:265598");
    }
}