1use std::collections::VecDeque;
17
18use serde_json::Value;
19
20use super::ObservedEvent;
21
22#[derive(Debug)]
26pub struct TopicRing {
27 inner: VecDeque<ObservedEvent>,
28 capacity: usize,
29 next_cursor: u64,
30 head_cursor: u64,
31 reset_epoch: u64,
32 topic: String,
33}
34
35#[derive(Debug, Clone)]
37pub enum ReadResult {
38 Ok {
41 events: Vec<ObservedEvent>,
44 next_cursor: u64,
46 },
47 CursorExpired {
50 head_cursor: u64,
53 reset_epoch: u64,
55 },
56}
57
58impl TopicRing {
59 #[must_use]
62 pub fn new(topic: impl Into<String>, capacity: usize, reset_epoch: u64) -> Self {
63 Self {
64 inner: VecDeque::with_capacity(capacity.max(1)),
65 capacity: capacity.max(1),
66 next_cursor: 1,
67 head_cursor: 1,
68 reset_epoch,
69 topic: topic.into(),
70 }
71 }
72
73 #[must_use]
75 pub fn topic(&self) -> &str {
76 &self.topic
77 }
78
79 #[must_use]
81 pub fn reset_epoch(&self) -> u64 {
82 self.reset_epoch
83 }
84
85 #[must_use]
87 pub fn len(&self) -> usize {
88 self.inner.len()
89 }
90
91 #[must_use]
93 pub fn is_empty(&self) -> bool {
94 self.inner.is_empty()
95 }
96
97 #[must_use]
99 pub fn head_cursor(&self) -> u64 {
100 self.head_cursor
101 }
102
103 #[must_use]
105 pub fn next_cursor(&self) -> u64 {
106 self.next_cursor
107 }
108
109 pub fn push(&mut self, payload: Value, received_at: String) -> u64 {
113 let cursor = self.next_cursor;
114 self.next_cursor = self.next_cursor.saturating_add(1);
115
116 let event = ObservedEvent {
117 cursor,
118 topic: self.topic.clone(),
119 received_at,
120 reset_epoch: self.reset_epoch,
121 payload,
122 };
123
124 if self.inner.len() == self.capacity {
125 let evicted = self.inner.pop_front();
128 if let Some(_) = evicted {
129 self.head_cursor = self
132 .inner
133 .front()
134 .map(|e| e.cursor)
135 .unwrap_or(cursor);
136 }
137 }
138
139 self.inner.push_back(event);
140 if self.inner.len() == 1 {
142 self.head_cursor = cursor;
143 }
144
145 cursor
146 }
147
148 #[must_use]
155 pub fn read_since(&self, since: u64, limit: usize) -> ReadResult {
156 if since >= self.next_cursor.saturating_sub(1) {
158 return ReadResult::Ok {
159 events: Vec::new(),
160 next_cursor: self.next_cursor.saturating_sub(1),
161 };
162 }
163
164 if since + 1 < self.head_cursor {
167 return ReadResult::CursorExpired {
168 head_cursor: self.head_cursor,
169 reset_epoch: self.reset_epoch,
170 };
171 }
172
173 let events: Vec<ObservedEvent> = self
174 .inner
175 .iter()
176 .filter(|e| e.cursor > since)
177 .take(limit.max(1))
178 .cloned()
179 .collect();
180
181 let next_cursor = events
182 .last()
183 .map(|e| e.cursor)
184 .unwrap_or_else(|| self.next_cursor.saturating_sub(1));
185
186 ReadResult::Ok { events, next_cursor }
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193 use serde_json::json;
194
195 fn ts() -> String {
196 "2026-05-06T00:00:00Z".to_string()
197 }
198
199 #[test]
200 fn push_assigns_monotonic_cursors() {
201 let mut ring = TopicRing::new("orders", 10, 0);
202 let c1 = ring.push(json!({"n": 1}), ts());
203 let c2 = ring.push(json!({"n": 2}), ts());
204 let c3 = ring.push(json!({"n": 3}), ts());
205 assert_eq!(c1, 1);
206 assert_eq!(c2, 2);
207 assert_eq!(c3, 3);
208 }
209
210 #[test]
211 fn read_since_zero_returns_all_buffered() {
212 let mut ring = TopicRing::new("orders", 10, 0);
213 ring.push(json!({"n": 1}), ts());
214 ring.push(json!({"n": 2}), ts());
215 match ring.read_since(0, 100) {
216 ReadResult::Ok { events, next_cursor } => {
217 assert_eq!(events.len(), 2);
218 assert_eq!(next_cursor, 2);
219 assert_eq!(events[0].payload, json!({"n": 1}));
220 assert_eq!(events[1].payload, json!({"n": 2}));
221 }
222 other => panic!("expected Ok, got {other:?}"),
223 }
224 }
225
226 #[test]
227 fn read_since_returns_only_new_events() {
228 let mut ring = TopicRing::new("orders", 10, 0);
229 ring.push(json!({"n": 1}), ts());
230 ring.push(json!({"n": 2}), ts());
231 ring.push(json!({"n": 3}), ts());
232 match ring.read_since(1, 100) {
233 ReadResult::Ok { events, next_cursor } => {
234 assert_eq!(events.len(), 2);
235 assert_eq!(events[0].cursor, 2);
236 assert_eq!(events[1].cursor, 3);
237 assert_eq!(next_cursor, 3);
238 }
239 other => panic!("expected Ok, got {other:?}"),
240 }
241 }
242
243 #[test]
244 fn read_since_caught_up_returns_empty() {
245 let mut ring = TopicRing::new("orders", 10, 0);
246 ring.push(json!({"n": 1}), ts());
247 ring.push(json!({"n": 2}), ts());
248 match ring.read_since(2, 100) {
249 ReadResult::Ok { events, next_cursor } => {
250 assert!(events.is_empty());
251 assert_eq!(next_cursor, 2);
252 }
253 other => panic!("expected empty Ok, got {other:?}"),
254 }
255 }
256
257 #[test]
258 fn read_since_respects_limit() {
259 let mut ring = TopicRing::new("orders", 100, 0);
260 for i in 0..50 {
261 ring.push(json!({ "n": i }), ts());
262 }
263 match ring.read_since(0, 10) {
264 ReadResult::Ok { events, next_cursor } => {
265 assert_eq!(events.len(), 10);
266 assert_eq!(next_cursor, 10);
267 }
268 other => panic!("expected Ok, got {other:?}"),
269 }
270 }
271
272 #[test]
273 fn capacity_evicts_oldest_first() {
274 let mut ring = TopicRing::new("orders", 3, 0);
275 for i in 1..=5 {
276 ring.push(json!({"n": i}), ts());
277 }
278 assert_eq!(ring.len(), 3);
280 assert_eq!(ring.head_cursor(), 3);
281 match ring.read_since(0, 100) {
282 ReadResult::CursorExpired { head_cursor, .. } => {
283 assert_eq!(head_cursor, 3);
284 }
285 other => panic!("expected CursorExpired, got {other:?}"),
286 }
287 }
288
289 #[test]
290 fn read_after_overflow_at_head_succeeds() {
291 let mut ring = TopicRing::new("orders", 3, 0);
292 for i in 1..=5 {
293 ring.push(json!({"n": i}), ts());
294 }
295 match ring.read_since(2, 100) {
297 ReadResult::Ok { events, next_cursor } => {
298 assert_eq!(events.len(), 3);
299 assert_eq!(events[0].cursor, 3);
300 assert_eq!(next_cursor, 5);
301 }
302 other => panic!("expected Ok, got {other:?}"),
303 }
304 }
305
306 #[test]
307 fn read_at_or_past_overflow_returns_expired() {
308 let mut ring = TopicRing::new("orders", 3, 0);
309 for i in 1..=5 {
310 ring.push(json!({"n": i}), ts());
311 }
312 assert!(matches!(
314 ring.read_since(0, 100),
315 ReadResult::CursorExpired { .. }
316 ));
317 assert!(matches!(
318 ring.read_since(1, 100),
319 ReadResult::CursorExpired { .. }
320 ));
321 }
322
323 #[test]
324 fn reset_epoch_propagates_to_read_result() {
325 let mut ring = TopicRing::new("orders", 2, 42);
326 for i in 1..=4 {
327 ring.push(json!({"n": i}), ts());
328 }
329 match ring.read_since(0, 100) {
330 ReadResult::CursorExpired { reset_epoch, .. } => {
331 assert_eq!(reset_epoch, 42);
332 }
333 other => panic!("expected CursorExpired, got {other:?}"),
334 }
335 }
336
337 #[test]
338 fn reset_epoch_propagates_to_pushed_events() {
339 let mut ring = TopicRing::new("orders", 4, 7);
340 ring.push(json!({"n": 1}), ts());
341 match ring.read_since(0, 10) {
342 ReadResult::Ok { events, .. } => {
343 assert_eq!(events[0].reset_epoch, 7);
344 }
345 other => panic!("expected Ok, got {other:?}"),
346 }
347 }
348
349 #[test]
350 fn empty_ring_read_since_zero_is_ok_empty() {
351 let ring = TopicRing::new("orders", 10, 0);
352 match ring.read_since(0, 100) {
353 ReadResult::Ok { events, next_cursor } => {
354 assert!(events.is_empty());
355 assert_eq!(next_cursor, 0);
356 }
357 other => panic!("expected Ok, got {other:?}"),
358 }
359 }
360
361 #[test]
362 fn topic_name_appears_on_pushed_events() {
363 let mut ring = TopicRing::new("marketdata:265598", 4, 0);
364 ring.push(json!({"31": "150.25"}), ts());
365 match ring.read_since(0, 10) {
366 ReadResult::Ok { events, .. } => {
367 assert_eq!(events[0].topic, "marketdata:265598");
368 }
369 other => panic!("expected Ok, got {other:?}"),
370 }
371 }
372}