Skip to main content

bezant_server/
routes.rs

1//! HTTP route handlers.
2//!
3//! The server mostly acts as an untyped pass-through: it takes the CPAPI
4//! path, hits the Gateway via the `bezant::Client`'s inner reqwest client,
5//! and forwards the JSON body back to the caller. This lets downstream
6//! apps in any language consume CPAPI over plain HTTP without touching
7//! the typed Rust layer.
8//!
9//! A handful of handlers (like `/health`) use the typed facade because
10//! they project the raw response into a narrower shape.
11
12use axum::body::Body;
13use axum::extract::{Path, Query, State};
14use axum::http::{header, HeaderMap, HeaderValue, Response, StatusCode};
15use axum::response::IntoResponse;
16use axum::routing::{delete, get};
17use axum::{Json, Router};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24/// Build the full axum router wired to `state`. Exposed for integration
25/// tests that want to drive the router without opening a TCP listener.
26pub fn router(state: AppState) -> Router {
27    Router::new()
28        .route("/health", get(health))
29        .route("/debug/jar", get(debug_jar))
30        .route("/debug/probe", get(debug_probe))
31        .route("/accounts", get(accounts))
32        .route("/accounts/{account_id}/summary", get(account_summary))
33        .route("/accounts/{account_id}/positions", get(account_positions))
34        .route("/accounts/{account_id}/ledger", get(account_ledger))
35        .route(
36            "/accounts/{account_id}/orders",
37            get(account_orders).post(submit_order),
38        )
39        .route(
40            "/accounts/{account_id}/orders/{order_id}",
41            delete(cancel_order),
42        )
43        .route("/contracts/search", get(contract_search))
44        .route("/market/snapshot", get(market_snapshot))
45        .route("/events/orders", get(events_orders))
46        .route("/events/pnl", get(events_pnl))
47        .route("/events/marketdata", get(events_marketdata))
48        .route("/events/gap", get(events_gap))
49        .route("/events/_status", get(events_status))
50        .route("/events/{topic}/history", get(events_history))
51        // Anything we haven't explicitly wrapped falls through to the
52        // Gateway verbatim. The big reason this matters: the CPGateway's
53        // interactive login flow (`/sso/Login`, static JS/CSS/img assets,
54        // `/v1/api/iserver/auth/ssodh/init`, …) has to run through
55        // *this* HTTP client so its cookie jar captures the session. Any
56        // future endpoints we add to bezant-server just take precedence
57        // over this catch-all via specific routes.
58        .fallback(passthrough_any)
59        .with_state(state)
60}
61
/// Headers we MUST NOT forward through a proxy hop. Subset of RFC 7230 §6.1
/// (`host`, `content-length`, `connection`, `keep-alive`, `proxy-*`, `te`,
/// `trailer`, `transfer-encoding`, `upgrade`) extended with:
///
/// * **`cookie`** — reqwest's shared jar is the single source of truth.
/// * **`authorization`** — CPGateway doesn't consume bearer/basic auth;
///   forwarding lets a caller probe whatever auth scheme the upstream
///   might (incorrectly) honour. Pure attack surface, drop it.
/// * **`x-forwarded-*` / `forwarded` / `x-real-ip`** — caller-controlled
///   client-IP claims. Forwarding lets a caller spoof their apparent
///   source IP to anything that logs/rate-limits/audits on those headers
///   downstream. Strip at the boundary; the proxy itself doesn't need
///   them.
///
/// This list holds exact names only. The open-ended `x-forwarded-*`
/// family is additionally matched by prefix (see [`STRIPPED_PREFIXES`]),
/// so the three `x-forwarded-…` entries below are kept purely for
/// readers who consult the list directly.
const HOP_BY_HOP: &[&str] = &[
    "host",
    "content-length",
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
    "cookie",
    "authorization",
    "x-forwarded-for",
    "x-forwarded-host",
    "x-forwarded-proto",
    "x-real-ip",
    "forwarded",
];

/// Header-name prefixes that are stripped wholesale.
///
/// The `x-forwarded-` family is not a closed set (`-port`, `-prefix`,
/// `-server`, `-ssl`, … all exist in the wild), so enumerating names
/// would leave the spoofing hole described on [`HOP_BY_HOP`] half open.
const STRIPPED_PREFIXES: &[&str] = &["x-forwarded-"];

/// Returns `true` when the header called `name` must not be forwarded.
///
/// Matching is ASCII-case-insensitive: either an exact hit in
/// [`HOP_BY_HOP`] or a prefix hit in [`STRIPPED_PREFIXES`].
fn is_hop_by_hop(name: &str) -> bool {
    HOP_BY_HOP.iter().any(|h| name.eq_ignore_ascii_case(h))
        || STRIPPED_PREFIXES
            .iter()
            .any(|prefix| starts_with_ignore_ascii_case(name, prefix))
}

/// ASCII-case-insensitive `starts_with` that doesn't allocate a
/// lowercased copy of `name`.
fn starts_with_ignore_ascii_case(name: &str, prefix: &str) -> bool {
    name.as_bytes()
        .get(..prefix.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(prefix.as_bytes()))
}
98
99// `skip_all` because `req` and `state` aren't `Display`; we emit
100// only the safe scalars (method + path) as span fields. Path is
101// already query-stripped — see the `path_only` derivation below.
102#[tracing::instrument(skip_all, fields(method, path))]
103async fn passthrough_any(
104    State(state): State<AppState>,
105    req: axum::extract::Request,
106) -> Result<Response<Body>, AppError> {
107    use axum::http::Method;
108    let method = req.method().clone();
109    let path_and_query = req
110        .uri()
111        .path_and_query()
112        .map(|pq| pq.as_str())
113        .unwrap_or("/")
114        .to_string();
115    // For logs we keep just the path — query strings frequently carry SSO
116    // tokens / session ids that we don't want fanned out into log shippers.
117    let path_only = req.uri().path().to_string();
118    // Record into the parent span so the rest of the handler's
119    // tracing events inherit them automatically.
120    tracing::Span::current().record("method", tracing::field::display(&method));
121    tracing::Span::current().record("path", tracing::field::display(&path_only));
122
123    // Compose the target URL: Gateway root + the incoming URI. We use
124    // the client's own derived root (scheme + host + trailing '/')
125    // instead of hand-trimming the base URL so a non-standard prefix
126    // doesn't silently break passthrough.
127    let gateway_root = state.client().gateway_root_url().as_str();
128    let target = format!("{}{}", gateway_root.trim_end_matches('/'), path_and_query);
129    let target_url: reqwest::Url = target
130        .parse()
131        .map_err(|e| bezant::Error::BadRequest(format!("target url: {e}")))?;
132
133    let headers = req.headers().clone();
134
135    // Replay any cookies the browser sent into the shared jar so typed
136    // API calls (`/health`, `/accounts`, …) see the same session that
137    // the interactive login established.
138    //
139    // The jar (`bezant::NameKeyedJar`) keys cookies purely by name, so
140    // inserting `JSESSIONID=NEW` always replaces `JSESSIONID=OLD` —
141    // duplicates can't accumulate even if the Gateway sets the same
142    // cookie at different paths in different responses. CPGateway
143    // rejects requests that arrive with two values for the same cookie
144    // name, so this single-source-of-truth model is required.
145    //
146    // **Trust model:** bezant-server is single-tenant. The shared jar
147    // is intentionally visible to *all* server-side typed callers.
148    // Don't deploy this proxy multi-tenant.
149    let jar = state.client().cookie_jar();
150    let mut pairs: Vec<&str> = Vec::new();
151    for cookie_header in headers.get_all(axum::http::header::COOKIE) {
152        if let Ok(raw) = cookie_header.to_str() {
153            for pair in raw.split(';') {
154                let trimmed = pair.trim();
155                if trimmed.is_empty() {
156                    continue;
157                }
158                // Drop edge-proxy auth cookies (Cloudflare Access /
159                // Cloudflare-style infrastructure cookies). They're set
160                // by the proxy layer for *its* session — not anything
161                // CPGateway / api.ibkr.com expects, and Akamai 401s the
162                // upstream call when an unrecognised `CF_Authorization=…`
163                // cookie shows up alongside the IBKR session cookies.
164                let name = trimmed.split('=').next().unwrap_or("").trim();
165                if is_edge_auth_cookie(name) {
166                    continue;
167                }
168                pairs.push(trimmed);
169            }
170        }
171    }
172    let injected = pairs.len();
173    if injected > 0 {
174        jar.set_pairs(&pairs);
175    }
176    // The Railway-deploy diagnostic phase is over (the residential-Pi
177    // pattern is settled). Cookie replay is a per-request event and
178    // belongs at debug level — no need to fan it out to log shippers.
179    tracing::debug!(
180        path = %path_only,
181        cookies = injected,
182        "passthrough cookie replay"
183    );
184
185    let body_bytes = axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024)
186        .await
187        .map_err(|e| bezant::Error::BadRequest(format!("read body: {e}")))?;
188
189    let method_reqwest = reqwest::Method::from_bytes(method.as_str().as_bytes())
190        .map_err(|e| bezant::Error::BadRequest(format!("method: {e}")))?;
191    // Origin/Referer policy is path-conditional:
192    //   * `/sso/*` (the interactive login flow) keeps the browser's
193    //     `Origin` verbatim — IBKR's 2FA polling validates it as part
194    //     of its session check, and rewriting it silently breaks the
195    //     `/sso/Authenticator` poll.
196    //   * `/v1/api/*` (CPAPI calls — the post-login surface) rewrites
197    //     `Origin` to the Gateway's own host so its CPAPI CSRF guard
198    //     accepts the call. Without this, post-login `/v1/api/*`
199    //     returns 401 when the proxy is on a different public host
200    //     than the Gateway thinks it's running on (Railway, fly.io,
201    //     ngrok, …).
202    let rewrite_origin = path_only.starts_with("/v1/api/") || path_only == "/v1/api";
203    let gateway_origin = if rewrite_origin {
204        let scheme = target_url.scheme();
205        target_url.host_str().map(|h| match target_url.port() {
206            Some(p) => format!("{scheme}://{h}:{p}"),
207            None => format!("{scheme}://{h}"),
208        })
209    } else {
210        None
211    };
212    let mut builder = state.client().http().request(method_reqwest, &target);
213    for (name, value) in headers.iter() {
214        // Drop hop-by-hop headers per RFC 7230 §6.1 plus `host`/`cookie`
215        // (reqwest rebuilds the former, the shared jar replaces the
216        // latter).
217        if is_hop_by_hop(name.as_str()) {
218            continue;
219        }
220        let lower = name.as_str().to_ascii_lowercase();
221        if let Some(ref origin) = gateway_origin {
222            if lower == "origin" {
223                if let Ok(v) = reqwest::header::HeaderValue::from_str(origin) {
224                    builder = builder.header(reqwest::header::ORIGIN, v);
225                }
226                continue;
227            }
228            if lower == "referer" {
229                // Replace the origin prefix of the Referer URL but keep
230                // the path/query — the upstream uses the path to drive
231                // post-login redirects, so we don't want to lose it.
232                if let Ok(orig) = value.to_str() {
233                    let rewritten = rewrite_referer_origin(orig, origin);
234                    if let Ok(v) = reqwest::header::HeaderValue::from_str(&rewritten) {
235                        builder = builder.header(reqwest::header::REFERER, v);
236                    }
237                }
238                continue;
239            }
240        }
241        if let Ok(v) = reqwest::header::HeaderValue::from_bytes(value.as_bytes()) {
242            if let Ok(name) = reqwest::header::HeaderName::from_bytes(name.as_str().as_bytes()) {
243                builder = builder.header(name, v);
244            }
245        }
246    }
247    if method != Method::GET && method != Method::HEAD {
248        // Pin Content-Length even for empty bodies — Akamai (fronting
249        // the CPAPI) returns 411 if the POST arrives with neither
250        // Content-Length nor Transfer-Encoding, which is the wire
251        // shape reqwest/hyper can produce for an empty Vec.
252        let len = body_bytes.len();
253        builder = builder
254            .header(reqwest::header::CONTENT_LENGTH, len.to_string())
255            .body(body_bytes.to_vec());
256    }
257
258    let resp = builder.send().await.map_err(bezant::Error::Http)?;
259    forward(resp).await
260}
261
/// JSON body returned by `GET /health`.
///
/// Every field is copied one-to-one from the value the typed client's
/// `auth_status()` call returns (see the `health` handler); no extra
/// interpretation happens here.
#[derive(Serialize)]
struct HealthBody {
    /// Mirrors `authenticated` from the auth-status response.
    authenticated: bool,
    /// Mirrors `connected` from the auth-status response.
    connected: bool,
    /// Mirrors `competing` from the auth-status response.
    competing: bool,
    /// Mirrors the optional `message` from the auth-status response.
    message: Option<String>,
}
269
270/// Diagnostic-only endpoint: lists shared cookie jar entries by name
271/// (and value length — never value itself). Useful when chasing
272/// "browser is logged in but typed /health says not_authenticated"
273/// issues without leaking the live IBKR session cookie to anyone who
274/// can hit the bind address.
275///
276/// Gated on `BEZANT_DEBUG_TOKEN`: returns 404 when no token is
277/// configured, 401 when the caller's token doesn't match. Callers
278/// authenticate via `?token=…` or the `X-Bezant-Debug-Token` header.
279async fn debug_jar(
280    State(state): State<AppState>,
281    headers: HeaderMap,
282    Query(q): Query<HashMap<String, String>>,
283) -> Response<Body> {
284    if let Err(resp) = debug_auth(&state, &headers, &q) {
285        return resp;
286    }
287    let jar = state.client().cookie_jar();
288    let entries: Vec<serde_json::Value> = jar
289        .snapshot()
290        .into_iter()
291        .map(|(name, value)| {
292            serde_json::json!({
293                "name": name,
294                "value_length": value.len(),
295            })
296        })
297        .collect();
298    let body = serde_json::json!({
299        "gateway_root": state.client().gateway_root_url().as_str(),
300        "size": entries.len(),
301        "entries": entries,
302    });
303    Json(body).into_response()
304}
305
306/// Token check shared by every `/debug/*` handler.
307///
308/// Returns a 404 response when debug is disabled (no token
309/// configured) so the existence of the endpoints isn't disclosed to
310/// a probing attacker — they look identical to any other unmapped
311/// route.
312///
313/// Returns a 401 response when a token IS configured but the caller
314/// didn't present a matching one (different status because the
315/// endpoint clearly exists; the caller is just unauthorised).
316///
317/// Token comparison is constant-time to avoid leaking length or
318/// prefix-match info via response timing.
319#[allow(clippy::result_large_err)]
320// `Response<Body>` is the return type axum hands back; boxing it would
321// just add an alloc on the (rare) auth-failure path. Suppress the lint.
322fn debug_auth(
323    state: &AppState,
324    headers: &HeaderMap,
325    query: &HashMap<String, String>,
326) -> Result<(), Response<Body>> {
327    let Some(expected) = state.debug_token() else {
328        return Err(Response::builder()
329            .status(StatusCode::NOT_FOUND)
330            .body(Body::empty())
331            .unwrap_or_default());
332    };
333    let presented = headers
334        .get("x-bezant-debug-token")
335        .and_then(|v| v.to_str().ok())
336        .or_else(|| query.get("token").map(String::as_str))
337        .unwrap_or("");
338    if constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
339        return Ok(());
340    }
341    Err(Response::builder()
342        .status(StatusCode::UNAUTHORIZED)
343        .header(header::CONTENT_TYPE, "application/json")
344        .body(Body::from(
345            r#"{"code":"debug_unauthorized","message":"missing or invalid debug token"}"#,
346        ))
347        .unwrap_or_default())
348}
349
/// Byte-slice equality whose running time does not depend on *where*
/// the inputs differ, nor on the length of `b`.
///
/// Naive `==` short-circuits on the first differing byte, leaking
/// prefix-match info via response timing; returning early on a length
/// mismatch leaks the secret's length the same way. This version does
/// neither:
///
/// * the length difference is folded into the accumulator instead of
///   being branched on, and
/// * the loop always runs exactly `a.len()` iterations, with bytes past
///   the end of `b` read as `0`.
///
/// Pass the untrusted, caller-supplied value as `a` and the secret as
/// `b` (this is how `debug_auth` calls it), so the only length that
/// influences timing is one the caller already knows.
///
/// Best effort only: this relies on the optimiser not reintroducing an
/// early exit and is not a substitute for a vetted constant-time
/// primitive.
///
/// Returns `true` iff `a` and `b` have the same length and contents.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // Non-zero whenever the lengths differ, so zero-padding `b` below
    // can never make unequal-length inputs compare equal.
    let mut diff = a.len() ^ b.len();
    for (i, &x) in a.iter().enumerate() {
        let y = b.get(i).copied().unwrap_or(0);
        diff |= usize::from(x ^ y);
    }
    diff == 0
}
363
364/// Diagnostic-only endpoint: walks the post-login CPAPI sequence
365/// (`auth/status` → `ssodh/init` → `tickle` → `portfolio/accounts`)
366/// against the Gateway and reports each step's status, latency, body
367/// preview, and Set-Cookie names — *plus* a top-level `verdict` that
368/// pins down which step diverges from the happy path.
369///
370/// Built to discriminate between proxy-layer regressions and upstream
371/// failures (e.g. CPGateway's internal SSODH bridge to `api.ibkr.com`
372/// being rejected from a datacenter egress IP). The probe never aborts
373/// on a step failure — it runs all four so the full picture is in
374/// the response body.
375///
376/// Response shape:
377/// ```json
378/// {
379///   "gateway_root": "https://localhost:5000/",
380///   "elapsed_ms": 412,
381///   "jar_size_before": 7,
382///   "jar_size_after": 7,
383///   "verdict": "ok",
384///   "steps": [{ "name": "auth_status", "status": 200, ... }, ...]
385/// }
386/// ```
387///
388/// Returns 200 with the diagnostic body when authenticated for debug
389/// (diagnostic-step failures surface in the body, not the HTTP status).
390/// Returns 404-style error when debug isn't enabled, 401 when the
391/// caller's debug token is missing/wrong.
392async fn debug_probe(
393    State(state): State<AppState>,
394    headers: HeaderMap,
395    Query(q): Query<HashMap<String, String>>,
396) -> Response<Body> {
397    if let Err(resp) = debug_auth(&state, &headers, &q) {
398        return resp;
399    }
400    let client = state.client();
401    let started = std::time::Instant::now();
402    let jar_before = client.cookie_jar().snapshot().len();
403
404    let auth_status = probe_step(
405        client,
406        "auth_status",
407        reqwest::Method::POST,
408        &["iserver", "auth", "status"],
409        None,
410    )
411    .await;
412    // `ssodh/init` is the bridge-establish call. Issuing it against a
413    // session that's *already* bridged tears the session down — every
414    // subsequent call then 401s. We discovered this the hard way: probe
415    // would show `auth_status:200, ssodh_init:401, accounts:401` and
416    // make a working session look broken. So skip the bridge step when
417    // auth_status already reports `authenticated:true`; the diagnostic
418    // is meant to *observe*, not perturb.
419    let already_bridged = is_authenticated(&auth_status);
420    let ssodh_init = if already_bridged {
421        skipped_step(
422            "ssodh_init",
423            "POST",
424            "/v1/api/iserver/auth/ssodh/init",
425            "session already bridged (auth_status authenticated)",
426        )
427    } else {
428        probe_step(
429            client,
430            "ssodh_init",
431            reqwest::Method::POST,
432            &["iserver", "auth", "ssodh", "init"],
433            Some(serde_json::json!({ "publish": true, "compete": true })),
434        )
435        .await
436    };
437    let tickle = probe_step(client, "tickle", reqwest::Method::POST, &["tickle"], None).await;
438    let accounts = probe_step(
439        client,
440        "accounts",
441        reqwest::Method::GET,
442        &["portfolio", "accounts"],
443        None,
444    )
445    .await;
446
447    let verdict = compute_verdict(&auth_status, &ssodh_init, &tickle, &accounts);
448    let jar_after = client.cookie_jar().snapshot().len();
449
450    let body = serde_json::json!({
451        "gateway_root": client.gateway_root_url().as_str(),
452        "elapsed_ms": started.elapsed().as_millis() as u64,
453        "jar_size_before": jar_before,
454        "jar_size_after": jar_after,
455        "verdict": verdict,
456        "steps": [auth_status, ssodh_init, tickle, accounts],
457    });
458    Json(body).into_response()
459}
460
461/// One step in the diagnostic probe.
462///
463/// Builds the request the same way the real proxy / typed client does:
464/// pins `Content-Length` (Akamai 411 workaround), rewrites `Origin` and
465/// `Referer` to the Gateway's own origin (CPAPI CSRF guard), and
466/// captures the response without touching the shared cookie jar's
467/// happy-path semantics — every Set-Cookie still flows back into the
468/// jar via reqwest's cookie provider.
469async fn probe_step(
470    client: &bezant::Client,
471    name: &'static str,
472    method: reqwest::Method,
473    path_segments: &[&str],
474    body: Option<serde_json::Value>,
475) -> serde_json::Value {
476    let mut url = client.base_url().clone();
477    if let Ok(mut segs) = url.path_segments_mut() {
478        for seg in path_segments {
479            segs.push(seg);
480        }
481    }
482    let path_for_log = url.path().to_owned();
483
484    let gateway_origin = client
485        .gateway_root_url()
486        .as_str()
487        .trim_end_matches('/')
488        .to_owned();
489
490    let mut builder = client
491        .http()
492        .request(method.clone(), url.clone())
493        .header(reqwest::header::ORIGIN, &gateway_origin)
494        .header(reqwest::header::REFERER, format!("{gateway_origin}/"));
495
496    let body_bytes: Vec<u8> = match (&method, body) {
497        (m, _) if m == reqwest::Method::GET || m == reqwest::Method::HEAD => Vec::new(),
498        (_, Some(json)) => serde_json::to_vec(&json).unwrap_or_default(),
499        (_, None) => Vec::new(),
500    };
501    if method != reqwest::Method::GET && method != reqwest::Method::HEAD {
502        builder = builder
503            .header(
504                reqwest::header::CONTENT_LENGTH,
505                body_bytes.len().to_string(),
506            )
507            .body(body_bytes.clone());
508        if !body_bytes.is_empty() {
509            builder = builder.header(reqwest::header::CONTENT_TYPE, "application/json");
510        }
511    }
512
513    let started = std::time::Instant::now();
514    // Per-step deadline: a hung Gateway shouldn't take the whole
515    // probe with it. 5s is generous for the small JSON payloads we
516    // hit; tickle/auth_status/accounts all return in <500 ms in
517    // healthy operation.
518    let result = match tokio::time::timeout(std::time::Duration::from_secs(5), builder.send()).await
519    {
520        Ok(send_result) => send_result.map_err(|e| e.to_string()),
521        Err(_) => Err("step timed out after 5s".to_string()),
522    };
523    let latency_ms = started.elapsed().as_millis() as u64;
524
525    match result {
526        Ok(resp) => {
527            let status = resp.status().as_u16();
528            let set_cookie_names: Vec<String> = resp
529                .headers()
530                .get_all(reqwest::header::SET_COOKIE)
531                .iter()
532                .filter_map(|v| v.to_str().ok())
533                .map(|raw| {
534                    raw.split(';')
535                        .next()
536                        .and_then(|s| s.split('=').next())
537                        .map(|s| s.trim().to_owned())
538                        .unwrap_or_default()
539                })
540                .filter(|s| !s.is_empty())
541                .collect();
542            // Cap the upstream read at 1 MiB so a misbehaving Gateway
543            // can't OOM the probe with an unbounded body. Probe targets
544            // are JSON status payloads — any response over 1 MiB is
545            // already broken, the cap just bounds the damage.
546            let bytes = read_capped(resp, 1024 * 1024).await.unwrap_or_default();
547            // Parse the FULL body (not the truncated preview) for
548            // discriminator fields the verdict logic needs. Doing it
549            // here means `compute_verdict` can't be misled by a
550            // response whose `authenticated` field happens to land
551            // past the 512-byte preview window.
552            let parsed_authenticated = serde_json::from_slice::<serde_json::Value>(&bytes)
553                .ok()
554                .and_then(|v| v["authenticated"].as_bool());
555            // Redact known token-bearing keys from the preview before
556            // exposing it to the operator — `session`, `ssoConclusion`,
557            // and any field with `token` in the name commonly carry
558            // resumable session material that shouldn't ride out via
559            // a debug endpoint or log shipper.
560            let preview_len = bytes.len().min(512);
561            let raw_preview = String::from_utf8_lossy(&bytes[..preview_len]).into_owned();
562            let body_preview = redact_tokens(&raw_preview);
563            serde_json::json!({
564                "name": name,
565                "method": method.as_str(),
566                "path": path_for_log,
567                "status": status,
568                "latency_ms": latency_ms,
569                "body_bytes": bytes.len(),
570                "body_preview": body_preview,
571                "set_cookie_names": set_cookie_names,
572                "error": serde_json::Value::Null,
573                "_authenticated": parsed_authenticated,
574            })
575        }
576        Err(e) => serde_json::json!({
577            "name": name,
578            "method": method.as_str(),
579            "path": path_for_log,
580            "status": serde_json::Value::Null,
581            "latency_ms": latency_ms,
582            "body_bytes": 0,
583            "body_preview": "",
584            "set_cookie_names": [],
585            "error": e,
586        }),
587    }
588}
589
590/// Best-effort redaction of token-bearing JSON keys in a body preview.
591///
592/// Walks the JSON once, replaces values for keys named `session`,
593/// `ssoConclusion`, or anything containing `token` (case-insensitive)
594/// with `"<redacted>"`. Falls back to returning the input verbatim if
595/// the preview isn't valid JSON — this is a best-effort guard, not a
596/// security boundary.
597///
598/// Used by `/debug/probe` because preview output ships across HTTP
599/// (and into the operator's terminal / log shipper) — leaking a live
600/// session token there would let anyone with read access to the
601/// debug response resume the IBKR session.
602fn redact_tokens(preview: &str) -> String {
603    let Ok(mut value) = serde_json::from_str::<serde_json::Value>(preview) else {
604        // Not JSON — return verbatim. Most non-JSON CPAPI responses
605        // are status pages or HTML error bodies that don't carry
606        // tokens.
607        return preview.to_owned();
608    };
609    redact_in_place(&mut value);
610    value.to_string()
611}
612
613fn redact_in_place(value: &mut serde_json::Value) {
614    match value {
615        serde_json::Value::Object(map) => {
616            for (k, v) in map.iter_mut() {
617                let lower = k.to_ascii_lowercase();
618                if lower == "session"
619                    || lower == "ssoconclusion"
620                    || lower.contains("token")
621                    || lower.contains("secret")
622                {
623                    *v = serde_json::Value::String("<redacted>".to_owned());
624                } else {
625                    redact_in_place(v);
626                }
627            }
628        }
629        serde_json::Value::Array(arr) => {
630            for v in arr.iter_mut() {
631                redact_in_place(v);
632            }
633        }
634        _ => {}
635    }
636}
637
638/// Pin the failure to the first diverging step.
639///
640/// `auth_status` carries an extra parse: a 200 with `authenticated:false`
641/// is a "needs_login" situation, not a transport success. Subsequent
642/// steps are pure HTTP-status checks — any non-2xx pins the verdict to
643/// that step's name. A `skipped` step is treated as success (the probe
644/// chose not to run it because it would have been destructive).
645fn compute_verdict(
646    auth_status: &serde_json::Value,
647    ssodh_init: &serde_json::Value,
648    tickle: &serde_json::Value,
649    accounts: &serde_json::Value,
650) -> &'static str {
651    if !is_2xx(auth_status) {
652        return "auth_status_failed";
653    }
654    // ssodh_init is the canonical Railway-vs-Pi discriminator: when
655    // auth_status reports unauthenticated AND a manual bridge attempt
656    // also fails, that pins the failure to the SSODH leg upstream of
657    // bezant-server. Check this BEFORE the needs_login short-circuit
658    // so a real ssodh failure surfaces with its own verdict instead of
659    // collapsing into the generic needs_login bucket.
660    let ssodh_ran_and_failed = !is_skipped(ssodh_init) && !is_2xx(ssodh_init);
661    if ssodh_ran_and_failed {
662        return "ssodh_failed";
663    }
664    if !is_authenticated(auth_status) {
665        return "needs_login";
666    }
667    if !is_2xx_or_skipped(tickle) {
668        return "tickle_failed";
669    }
670    if !is_2xx_or_skipped(accounts) {
671        return "accounts_failed";
672    }
673    "ok"
674}
675
676fn is_skipped(step: &serde_json::Value) -> bool {
677    step["skipped"].as_bool().unwrap_or(false)
678}
679
680fn is_2xx(step: &serde_json::Value) -> bool {
681    matches!(step["status"].as_u64(), Some(200..=299))
682}
683
684fn is_2xx_or_skipped(step: &serde_json::Value) -> bool {
685    is_2xx(step) || step["skipped"].as_bool().unwrap_or(false)
686}
687
688/// Decide whether the session is already bridged. Reads the
689/// `_authenticated` discriminator that `probe_step` extracted from the
690/// FULL response body (not the 512-byte preview), so an `authenticated`
691/// field that lands past the preview window doesn't silently flip the
692/// verdict to "needs_login" and trigger the destructive ssodh path.
693fn is_authenticated(step: &serde_json::Value) -> bool {
694    if !is_2xx(step) {
695        return false;
696    }
697    step["_authenticated"].as_bool().unwrap_or(false)
698}
699
700/// Build a placeholder step entry for a step the probe deliberately
701/// did not execute. Surfaces in the JSON body so a reader can tell
702/// "didn't fail, didn't run" apart from "ran and succeeded".
703fn skipped_step(
704    name: &'static str,
705    method: &'static str,
706    path: &'static str,
707    reason: &'static str,
708) -> serde_json::Value {
709    serde_json::json!({
710        "name": name,
711        "method": method,
712        "path": path,
713        "status": serde_json::Value::Null,
714        "latency_ms": 0,
715        "body_bytes": 0,
716        "body_preview": "",
717        "set_cookie_names": [],
718        "error": serde_json::Value::Null,
719        "skipped": true,
720        "skipped_reason": reason,
721    })
722}
723
724#[tracing::instrument(skip_all)]
725async fn health(State(state): State<AppState>) -> Result<Json<HealthBody>, AppError> {
726    let status = state.client().auth_status().await?;
727    Ok(Json(HealthBody {
728        authenticated: status.authenticated,
729        connected: status.connected,
730        competing: status.competing,
731        message: status.message,
732    }))
733}
734
735#[tracing::instrument(skip_all)]
736async fn accounts(State(state): State<AppState>) -> Result<Response<Body>, AppError> {
737    passthrough_get(&state, &["portfolio", "accounts"], &[]).await
738}
739
740#[tracing::instrument(skip(state), fields(account_id = %account_id))]
741async fn account_summary(
742    State(state): State<AppState>,
743    Path(account_id): Path<String>,
744) -> Result<Response<Body>, AppError> {
745    passthrough_get(&state, &["portfolio", account_id.as_str(), "summary"], &[]).await
746}
747
/// Query-string parameters accepted by the account-positions handler.
#[derive(Deserialize, Debug)]
struct PositionsQuery {
    /// Page index appended as the last path segment of the upstream
    /// positions call. `#[serde(default)]` makes it `0` when the caller
    /// omits `?page=`.
    #[serde(default)]
    page: u32,
}
753
754#[tracing::instrument(skip(state), fields(account_id = %account_id, page = q.page))]
755async fn account_positions(
756    State(state): State<AppState>,
757    Path(account_id): Path<String>,
758    Query(q): Query<PositionsQuery>,
759) -> Result<Response<Body>, AppError> {
760    let page = q.page.to_string();
761    passthrough_get(
762        &state,
763        &["portfolio", account_id.as_str(), "positions", page.as_str()],
764        &[],
765    )
766    .await
767}
768
769#[tracing::instrument(skip(state), fields(account_id = %account_id))]
770async fn account_ledger(
771    State(state): State<AppState>,
772    Path(account_id): Path<String>,
773) -> Result<Response<Body>, AppError> {
774    passthrough_get(&state, &["portfolio", account_id.as_str(), "ledger"], &[]).await
775}
776
777/// List live + recently-filled orders for one account.
778#[tracing::instrument(skip(state), fields(account_id = %account_id))]
779async fn account_orders(
780    State(state): State<AppState>,
781    Path(account_id): Path<String>,
782) -> Result<Response<Body>, AppError> {
783    // CPAPI exposes this under /iserver/account/orders?accountId=…
784    passthrough_get(
785        &state,
786        &["iserver", "account", "orders"],
787        &[("accountId", account_id.as_str())],
788    )
789    .await
790}
791
792/// Submit one or more orders for an account.
793///
794/// The body is forwarded verbatim to `POST /iserver/account/{id}/orders`.
795/// CPAPI accepts either `{ "orders": [...] }` or a single order object; we
796/// stay out of the way and let IBKR's own validator surface errors.
797#[tracing::instrument(skip(state, body), fields(account_id = %account_id))]
798async fn submit_order(
799    State(state): State<AppState>,
800    Path(account_id): Path<String>,
801    axum::extract::Json(body): axum::extract::Json<serde_json::Value>,
802) -> Result<Response<Body>, AppError> {
803    let mut url = state.client().base_url().clone();
804    {
805        let mut segs = url
806            .path_segments_mut()
807            .map_err(|()| bezant::Error::UrlNotABase {
808                url: state.client().base_url().to_string(),
809            })?;
810        segs.push("iserver")
811            .push("account")
812            .push(account_id.as_str())
813            .push("orders");
814    }
815    let resp = state
816        .client()
817        .http()
818        .post(url)
819        .json(&body)
820        .send()
821        .await
822        .map_err(bezant::Error::Http)?;
823    forward(resp).await
824}
825
826/// Cancel a live order.
827#[tracing::instrument(skip(state), fields(account_id = %account_id, order_id = %order_id))]
828async fn cancel_order(
829    State(state): State<AppState>,
830    Path((account_id, order_id)): Path<(String, String)>,
831) -> Result<Response<Body>, AppError> {
832    let mut url = state.client().base_url().clone();
833    {
834        let mut segs = url
835            .path_segments_mut()
836            .map_err(|()| bezant::Error::UrlNotABase {
837                url: state.client().base_url().to_string(),
838            })?;
839        segs.push("iserver")
840            .push("account")
841            .push(account_id.as_str())
842            .push("order")
843            .push(order_id.as_str());
844    }
845    let resp = state
846        .client()
847        .http()
848        .delete(url)
849        .send()
850        .await
851        .map_err(bezant::Error::Http)?;
852    forward(resp).await
853}
854
/// Query string for `GET /contracts/search`. `contract_search` turns these
/// fields into the JSON body of the upstream POST.
#[derive(Deserialize)]
struct ContractSearchQuery {
    /// Required; sent upstream as `"symbol"`.
    symbol: String,
    /// Sent upstream as `"name"`; `false` when omitted.
    #[serde(default)]
    name: bool,
    /// Read from the `secType` query key and sent upstream under the same
    /// name; defaults to `"STK"` via `default_sec_type`.
    #[serde(rename = "secType", default = "default_sec_type")]
    sec_type: String,
}
863
/// Serde default for `ContractSearchQuery::sec_type`: the literal `"STK"`.
fn default_sec_type() -> String {
    String::from("STK")
}
867
868async fn contract_search(
869    State(state): State<AppState>,
870    Query(q): Query<ContractSearchQuery>,
871) -> Result<Response<Body>, AppError> {
872    // Symbol lookup is a POST with a JSON body on the CPAPI side.
873    let mut url = state.client().base_url().clone();
874    {
875        let mut segs = url
876            .path_segments_mut()
877            .map_err(|()| bezant::Error::UrlNotABase {
878                url: state.client().base_url().to_string(),
879            })?;
880        segs.push("iserver").push("secdef").push("search");
881    }
882    let body = serde_json::json!({
883        "symbol": q.symbol,
884        "name": q.name,
885        "secType": q.sec_type,
886    });
887    let resp = state
888        .client()
889        .http()
890        .post(url)
891        .json(&body)
892        .send()
893        .await
894        .map_err(bezant::Error::Http)?;
895    forward(resp).await
896}
897
898async fn market_snapshot(
899    State(state): State<AppState>,
900    Query(q): Query<HashMap<String, String>>,
901) -> Result<Response<Body>, AppError> {
902    let conids = q
903        .get("conids")
904        .ok_or(bezant::Error::MissingQuery { name: "conids" })?;
905    let fields = q
906        .get("fields")
907        .cloned()
908        .unwrap_or_else(|| "31,84,86,87".into());
909    passthrough_get(
910        &state,
911        &["iserver", "marketdata", "snapshot"],
912        &[("conids", conids), ("fields", &fields)],
913    )
914    .await
915}
916
917/// Shared pass-through helper: builds `<base>/a/b/c`, appends query params,
918/// forwards the Gateway response verbatim (status + content-type + body).
919async fn passthrough_get(
920    state: &AppState,
921    path_segments: &[&str],
922    query: &[(&str, &str)],
923) -> Result<Response<Body>, AppError> {
924    let mut url = state.client().base_url().clone();
925    {
926        let mut segs = url
927            .path_segments_mut()
928            .map_err(|()| bezant::Error::UrlNotABase {
929                url: state.client().base_url().to_string(),
930            })?;
931        for seg in path_segments {
932            segs.push(seg);
933        }
934    }
935    if !query.is_empty() {
936        let mut q = url.query_pairs_mut();
937        for (k, v) in query {
938            q.append_pair(k, v);
939        }
940    }
941    let resp = state
942        .client()
943        .http()
944        .get(url)
945        .send()
946        .await
947        .map_err(bezant::Error::Http)?;
948    forward(resp).await
949}
950
/// Maximum upstream response body the proxy will buffer. CPAPI
/// payloads are JSON status / position / order objects — anything
/// over a few hundred KB is already malformed. Cap at 25 MiB so a
/// hostile or buggy upstream sending unbounded chunks can't OOM the
/// proxy. (Inbound side has the matching 10 MiB limit via
/// `RequestBodyLimitLayer`.)
const MAX_UPSTREAM_BODY_BYTES: usize = 25 * 1024 * 1024;

/// Forward a `reqwest::Response` as an axum response.
///
/// The body is fully buffered through `read_capped`, bounded by
/// `MAX_UPSTREAM_BODY_BYTES`, before the response is built.
///
/// What we do, header by header:
/// * **Hop-by-hop** (`content-length`, `transfer-encoding`, `connection`,
///   `keep-alive`, `te`, `trailer`, `upgrade`, `proxy-authenticate`,
///   `proxy-authorization`) are dropped — RFC 7230 §6.1.
/// * **`Set-Cookie`** has any `Domain=` attribute stripped. The Gateway's
///   upstream (IBKR) sets `Domain=.ibkr.com`, which the browser silently
///   discards when the response arrives from a different host. Falling
///   back to a host-only cookie keeps the SSO flow working behind any
///   hostname; the `Secure`, `HttpOnly`, `SameSite`, and `Path` flags
///   are preserved.
/// * **`Content-Type`** is rewritten / defaulted in two cases (skipped
///   when the body is empty *or* the status is 1xx/204/304/3xx):
///     - upstream sent `application/octet-stream` for what's actually a
///       text/HTML response (CPGateway does this on `/sso/Dispatcher`),
///     - upstream sent no Content-Type at all and our `Vec<u8>` body
///       would default to `application/octet-stream` (which makes
///       browsers offer to download instead of rendering).
/// * **Body decode failures** are tolerated *only* on 1xx/204/304/3xx,
///   where any body is required-or-conventionally empty. On 2xx/4xx/5xx
///   the decode failure surfaces as a normal upstream-transport error so
///   real data loss can't slip through silently.
///
/// `Origin` and `Referer` on the *request* side are deliberately
/// forwarded verbatim — see `passthrough_any`.
///
/// # Errors
/// * `bezant::Error::UpstreamStatus` when reading the body fails (or the
///   cap is exceeded) on a status that is expected to carry a body.
/// * `bezant::Error::ResponseBuild` when the response builder rejects
///   the status/body.
#[tracing::instrument(skip_all, fields(upstream_status = %resp.status()))]
async fn forward(resp: reqwest::Response) -> Result<Response<Body>, AppError> {
    let status = resp.status();
    // Snapshot the headers first: `read_capped` consumes `resp`.
    let headers_src = resp.headers().clone();
    let body_must_be_empty =
        matches!(status.as_u16(), 100..=199 | 204 | 304) || status.is_redirection();

    let bytes: Vec<u8> = match read_capped(resp, MAX_UPSTREAM_BODY_BYTES).await {
        Ok(b) => b,
        Err(e) if body_must_be_empty => {
            // Reqwest sometimes errors finalising chunked-encoded
            // empty bodies on 3xx/204/304; the headers we care about
            // (Location, Set-Cookie) are already in `headers_src`, so
            // recover instead of 502-ing the redirect.
            tracing::debug!(
                %status,
                error = %e,
                "forward: empty-body fallback on no-body status"
            );
            Vec::new()
        }
        Err(e) => {
            return Err(bezant::Error::UpstreamStatus {
                endpoint: "passthrough",
                status: status.as_u16(),
                body_preview: Some(e),
            }
            .into())
        }
    };

    let body_is_empty = bytes.is_empty();
    // Re-create the status from its numeric code; anything that does not
    // convert is reported as 502.
    let status = StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);

    let mut headers = HeaderMap::new();
    let mut had_content_type = false;
    for (name, value) in headers_src.iter() {
        let n = name.as_str().to_ascii_lowercase();
        if is_hop_by_hop_response(&n) {
            continue;
        }
        // Set-Cookie may legitimately appear multiple times.
        if n == "set-cookie" {
            // Values that are not valid visible-ASCII strings are passed
            // through untouched rather than dropped.
            let value_bytes: Vec<u8> = match value.to_str() {
                Ok(raw) => strip_cookie_domain(raw).into_bytes(),
                Err(_) => value.as_bytes().to_vec(),
            };
            if let (Ok(name), Ok(value)) = (
                header::HeaderName::from_bytes(name.as_str().as_bytes()),
                HeaderValue::from_bytes(&value_bytes),
            ) {
                headers.append(name, value);
            }
            continue;
        }
        // Content-Type: rewrite octet-stream → text/html only when there
        // *is* a body and the status normally has one. Use `insert` so
        // we never accidentally emit two Content-Type headers.
        if n == "content-type" {
            let raw = value.to_str().unwrap_or("");
            let rewrite = !body_is_empty
                && !body_must_be_empty
                && raw.eq_ignore_ascii_case("application/octet-stream");
            let bytes_to_emit: &[u8] = if rewrite {
                b"text/html; charset=UTF-8"
            } else {
                value.as_bytes()
            };
            if let (Ok(name), Ok(value)) = (
                header::HeaderName::from_bytes(name.as_str().as_bytes()),
                HeaderValue::from_bytes(bytes_to_emit),
            ) {
                headers.insert(name, value);
                had_content_type = true;
            }
            continue;
        }
        // Every other header is copied as-is; `append` keeps repeated
        // headers instead of collapsing them to the last value.
        if let (Ok(name), Ok(value)) = (
            header::HeaderName::from_bytes(name.as_str().as_bytes()),
            HeaderValue::from_bytes(value.as_bytes()),
        ) {
            headers.append(name, value);
        }
    }

    // Default Content-Type to text/html only when we actually have a
    // body to render. On 1xx/204/304/3xx we leave it absent to comply
    // with RFC 9110 §8.3.
    if !had_content_type && !body_is_empty && !body_must_be_empty {
        headers.insert(
            header::CONTENT_TYPE,
            HeaderValue::from_static("text/html; charset=UTF-8"),
        );
    }

    // Build the response body directly instead of via
    // `(status, headers, bytes).into_response()`, because the latter
    // unconditionally inserts `Content-Type: application/octet-stream`
    // when none is set — which would defeat the explicit "no
    // Content-Type on 204" branch above.
    let mut response = Response::builder()
        .status(status)
        .body(Body::from(bytes))
        .map_err(|e| bezant::Error::ResponseBuild(e.to_string()))?;
    *response.headers_mut() = headers;
    Ok(response)
}
1092
/// Response-side hop-by-hop denylist.
///
/// Returns `true` when `name` is one of the headers that must not be
/// copied from the upstream response. The comparison is exact, so callers
/// are expected to pass an already-lowercased header name.
fn is_hop_by_hop_response(name: &str) -> bool {
    const DROPPED: [&str; 9] = [
        "content-length",
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ];
    DROPPED.contains(&name)
}
1108
1109/// Replace the scheme + host[:port] prefix of `original` with
1110/// `new_origin`, preserving path + query. Used to rewrite the Referer
1111/// header on `/v1/api/*` requests when the proxy lives on a different
1112/// host than the Gateway.
1113fn rewrite_referer_origin(original: &str, new_origin: &str) -> String {
1114    match url::Url::parse(original) {
1115        Ok(u) => {
1116            let mut path_and_query = u.path().to_owned();
1117            if let Some(q) = u.query() {
1118                path_and_query.push('?');
1119                path_and_query.push_str(q);
1120            }
1121            format!("{}{}", new_origin.trim_end_matches('/'), path_and_query)
1122        }
1123        Err(_) => new_origin.to_owned(),
1124    }
1125}
1126
/// Recognise cookies set by an edge-proxy / Zero-Trust front so we
/// don't replay them into bezant-server's shared jar — they're for
/// the proxy's own session and confuse upstream CPAPI/Akamai.
///
/// Built-in matches cover the common managed Zero-Trust providers:
/// * **Cloudflare Access** — `CF_Authorization` (JWT), `CF_AppSession`
/// * **AWS ALB OIDC** — `AWSELBAuthSessionCookie-*` (split across
///   multiple cookies on long sessions)
/// * **OAuth2 Proxy** — `_oauth2_proxy*`
/// * **Vercel Authentication** — `_vercel_jwt`, `_vercel_sso_nonce`
/// * **Pomerium** — `_pomerium*`
///
/// A built-in entry matches when `name` starts with it (case-sensitive)
/// or equals it ignoring ASCII case.
///
/// For deployments behind a custom edge, set `BEZANT_EDGE_COOKIE_PREFIXES`
/// to a comma-separated list of additional prefixes — e.g.
/// `BEZANT_EDGE_COOKIE_PREFIXES=MyEdge_,_internal_` — and any cookie
/// whose name starts with one of those is dropped at the boundary.
/// Entries are trimmed and empty entries are ignored.
fn is_edge_auth_cookie(name: &str) -> bool {
    const BUILTIN_PREFIXES: &[&str] = &[
        "CF_Authorization",
        "CF_AppSession",
        "AWSELBAuthSessionCookie",
        "_oauth2_proxy",
        "_vercel_jwt",
        "_vercel_sso_nonce",
        "_pomerium",
    ];

    for &builtin in BUILTIN_PREFIXES {
        if name.starts_with(builtin) || name.eq_ignore_ascii_case(builtin) {
            return true;
        }
    }

    // The environment is consulted only after the built-ins miss, which
    // keeps the helper cheap for the no-config case.
    match std::env::var("BEZANT_EDGE_COOKIE_PREFIXES") {
        Ok(extra) => extra
            .split(',')
            .map(str::trim)
            .filter(|prefix| !prefix.is_empty())
            .any(|prefix| name.starts_with(prefix)),
        Err(_) => false,
    }
}
1169
1170/// Stream an upstream response body into a `Vec<u8>` while enforcing a
1171/// hard byte cap. Stops as soon as the cap is exceeded so a hostile
1172/// upstream sending unbounded chunks can't OOM the proxy. Returns the
1173/// fully buffered bytes on success, or an error if the cap is hit or
1174/// the upstream connection breaks. Used by both `forward()` (for
1175/// passthrough responses, large cap) and `probe_step` (smaller cap,
1176/// for diagnostic JSON).
1177async fn read_capped(resp: reqwest::Response, max: usize) -> std::result::Result<Vec<u8>, String> {
1178    use futures_util::StreamExt;
1179    let mut bytes = Vec::new();
1180    let mut stream = resp.bytes_stream();
1181    while let Some(chunk) = stream.next().await {
1182        let chunk = chunk.map_err(|e| format!("read chunk: {e}"))?;
1183        if bytes.len() + chunk.len() > max {
1184            return Err(format!(
1185                "upstream body exceeded {max} byte cap (>{}B)",
1186                bytes.len() + chunk.len()
1187            ));
1188        }
1189        bytes.extend_from_slice(&chunk);
1190    }
1191    Ok(bytes)
1192}
1193
/// Strip any `Domain=...` attribute from a Set-Cookie value. The browser
/// will fall back to a host-only cookie scoped to whatever host it saw
/// the response on, which is what we want when proxying a cookie that
/// the Gateway pinned to `Domain=.ibkr.com`.
///
/// The first `;`-separated segment is the cookie's own `name=value` pair
/// and is always kept, so a cookie that happens to be *named* `domain`
/// is not deleted. Only later segments whose text (after leading
/// whitespace) starts with `domain=`, compared ASCII case-insensitively,
/// are removed. Spacing of the surviving segments is left untouched.
fn strip_cookie_domain(value: &str) -> String {
    const DOMAIN_ATTR: &str = "domain=";

    let mut segments = value.split(';');
    let mut out = String::with_capacity(value.len());

    // `split` always yields at least one item: the cookie pair itself.
    if let Some(cookie_pair) = segments.next() {
        out.push_str(cookie_pair);
    }

    for attr in segments {
        // `get` (rather than slicing) returns `None` when the cut would
        // land inside a multi-byte character, which cannot be `domain=`.
        let is_domain = attr
            .trim_start()
            .get(..DOMAIN_ATTR.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(DOMAIN_ATTR));
        if is_domain {
            continue;
        }
        out.push(';');
        out.push_str(attr);
    }
    out
}
1205
1206// ===========================================================================
1207// /events/* handlers — cursor-based reads against the in-memory ring buffers
1208// maintained by the connector task. See `events::connector::EventsHandle`.
1209// ===========================================================================
1210
/// Query string for `/events/{topic}` reads.
#[derive(Debug, Deserialize)]
struct EventsQuery {
    /// Resume from this cursor (exclusive). `0` (or omitted) means "from
    /// the head of the buffer".
    #[serde(default)]
    since: u64,
    /// Maximum number of events to return. Defaults to `100`, capped at
    /// `1000` server-side to bound response size.
    #[serde(default = "EventsQuery::default_limit")]
    limit: usize,
}

impl EventsQuery {
    /// Serde default for `limit` (`100`). Also reused as the default for
    /// `MarketDataEventsQuery::limit`.
    fn default_limit() -> usize {
        100
    }
}
1229
/// 412 (Precondition Failed) body returned when the caller's cursor is
/// older than the oldest event still held in the ring buffer.
#[derive(Debug, Serialize)]
struct CursorExpiredBody {
    /// Machine-readable error code; the handler sets it to `"cursor_expired"`.
    code: &'static str,
    /// Cursor the client should reset to, as reported by the read result.
    head_cursor: u64,
    /// Reset epoch reported alongside `head_cursor` by the read result.
    reset_epoch: u64,
    /// Human-readable recovery hint.
    message: &'static str,
}
1238
/// `GET /events/orders` — cursor-based read of the `orders` topic; all
/// response shaping is delegated to `read_events_topic`.
async fn events_orders(
    State(state): State<AppState>,
    Query(q): Query<EventsQuery>,
) -> Response<Body> {
    read_events_topic(&state, "orders", q).await
}
1245
/// `GET /events/pnl` — cursor-based read of the `pnl` topic; all
/// response shaping is delegated to `read_events_topic`.
async fn events_pnl(
    State(state): State<AppState>,
    Query(q): Query<EventsQuery>,
) -> Response<Body> {
    read_events_topic(&state, "pnl", q).await
}
1252
/// Cursor-based read of the `gap` topic; all response shaping is
/// delegated to `read_events_topic`.
async fn events_gap(
    State(state): State<AppState>,
    Query(q): Query<EventsQuery>,
) -> Response<Body> {
    read_events_topic(&state, "gap", q).await
}
1259
/// Query string for market-data event reads: the same cursor/limit pair
/// as `EventsQuery`, plus the contract to read.
#[derive(Debug, Deserialize)]
struct MarketDataEventsQuery {
    /// Contract id (required). Selects the `marketdata:{conid}` topic.
    conid: i64,
    /// Cursor to resume from; `0` when omitted.
    #[serde(default)]
    since: u64,
    /// Maximum number of events; defaults to `EventsQuery::default_limit()`
    /// and is capped at `1000` by the handler.
    #[serde(default = "EventsQuery::default_limit")]
    limit: usize,
}
1268
1269async fn events_marketdata(
1270    State(state): State<AppState>,
1271    Query(q): Query<MarketDataEventsQuery>,
1272) -> Response<Body> {
1273    let Some(handle) = state.events() else {
1274        return events_disabled_response();
1275    };
1276    // Lazily ensure the upstream WS is subscribed for this conid.
1277    if let Err(e) = handle.ensure_market_data(q.conid).await {
1278        return (
1279            StatusCode::BAD_GATEWAY,
1280            Json(serde_json::json!({
1281                "code": "events_subscribe_failed",
1282                "message": e,
1283            })),
1284        )
1285            .into_response();
1286    }
1287
1288    let topic = format!("marketdata:{}", q.conid);
1289    let limit = q.limit.min(1_000);
1290    read_events_topic_resolved(handle, &topic, q.since, limit).await
1291}
1292
/// Report the events subsystem status as JSON, or the "events disabled"
/// 503 when the server has no events handle.
async fn events_status(State(state): State<AppState>) -> Response<Body> {
    let Some(handle) = state.events() else {
        return events_disabled_response();
    };
    Json(handle.status().await).into_response()
}
1299
/// Query string for persisted-history reads (see `events_history`).
#[derive(Debug, Deserialize)]
struct HistoryQuery {
    /// RFC 3339 timestamp lower bound (exclusive). Required.
    since_ts: String,
    /// Cap on returned rows. Default 500, server cap 5000.
    #[serde(default = "HistoryQuery::default_limit")]
    limit: usize,
}

impl HistoryQuery {
    /// Serde default for `limit` (`500`).
    fn default_limit() -> usize {
        500
    }
}
1314
1315async fn events_history(
1316    State(state): State<AppState>,
1317    Path(topic): Path<String>,
1318    Query(q): Query<HistoryQuery>,
1319) -> Response<Body> {
1320    let Some(handle) = state.events() else {
1321        return events_disabled_response();
1322    };
1323    let Some(log) = handle.event_log() else {
1324        return (
1325            StatusCode::SERVICE_UNAVAILABLE,
1326            Json(serde_json::json!({
1327                "code": "events_history_disabled",
1328                "message": "events sqlite persistence is not enabled \
1329                            (set BEZANT_EVENTS_DB_PATH to turn it on)"
1330            })),
1331        )
1332            .into_response();
1333    };
1334    let limit = q.limit.min(5_000).max(1);
1335    let log_clone = log.clone();
1336    let topic_clone = topic.clone();
1337    let since_ts = q.since_ts.clone();
1338    let result = tokio::task::spawn_blocking(move || {
1339        log_clone.query_since(&topic_clone, &since_ts, limit)
1340    })
1341    .await;
1342    match result {
1343        Ok(Ok(events)) => {
1344            let body = serde_json::json!({
1345                "topic": topic,
1346                "events": events,
1347                "count": events.len(),
1348            });
1349            (StatusCode::OK, Json(body)).into_response()
1350        }
1351        Ok(Err(e)) => (
1352            StatusCode::INTERNAL_SERVER_ERROR,
1353            Json(serde_json::json!({
1354                "code": "events_history_query_failed",
1355                "message": e.to_string(),
1356            })),
1357        )
1358            .into_response(),
1359        Err(e) => (
1360            StatusCode::INTERNAL_SERVER_ERROR,
1361            Json(serde_json::json!({
1362                "code": "events_history_join_failed",
1363                "message": e.to_string(),
1364            })),
1365        )
1366            .into_response(),
1367    }
1368}
1369
1370async fn read_events_topic(state: &AppState, topic: &str, q: EventsQuery) -> Response<Body> {
1371    let Some(handle) = state.events() else {
1372        return events_disabled_response();
1373    };
1374    read_events_topic_resolved(handle, topic, q.since, q.limit.min(1_000)).await
1375}
1376
1377async fn read_events_topic_resolved(
1378    handle: &crate::events::EventsHandle,
1379    topic: &str,
1380    since: u64,
1381    limit: usize,
1382) -> Response<Body> {
1383    let Some(result) = handle.read_topic(topic, since, limit.max(1)).await else {
1384        // Topic doesn't exist yet — no events have ever arrived for it.
1385        // Return a 200 with empty array + the caller's cursor so polls
1386        // remain idempotent until the first event lands.
1387        let status = handle.status().await;
1388        let body = serde_json::json!({
1389            "events": [],
1390            "next_cursor": since,
1391            "reset_epoch": status.reset_epoch,
1392        });
1393        return (StatusCode::OK, Json(body)).into_response();
1394    };
1395
1396    use crate::events::ReadResult;
1397    match result {
1398        ReadResult::Ok { events, next_cursor } => {
1399            if events.is_empty() {
1400                // No new events — 204 to make "nothing happened" cheap to
1401                // detect on the client side without parsing a body.
1402                let mut response = Response::builder()
1403                    .status(StatusCode::NO_CONTENT)
1404                    .body(Body::empty())
1405                    .unwrap();
1406                let cursor_str = next_cursor.to_string();
1407                if let Ok(v) = HeaderValue::from_str(&cursor_str) {
1408                    response.headers_mut().insert("x-bezant-cursor", v);
1409                }
1410                return response;
1411            }
1412            let reset_epoch = events
1413                .first()
1414                .map(|e| e.reset_epoch)
1415                .unwrap_or_else(|| 0);
1416            let body = serde_json::json!({
1417                "events": events,
1418                "next_cursor": next_cursor,
1419                "reset_epoch": reset_epoch,
1420            });
1421            (StatusCode::OK, Json(body)).into_response()
1422        }
1423        ReadResult::CursorExpired {
1424            head_cursor,
1425            reset_epoch,
1426        } => {
1427            let body = CursorExpiredBody {
1428                code: "cursor_expired",
1429                head_cursor,
1430                reset_epoch,
1431                message:
1432                    "the requested cursor is older than the oldest buffered event; \
1433                     reset to head_cursor and emit a synthetic gap on the consumer side",
1434            };
1435            (StatusCode::PRECONDITION_FAILED, Json(body)).into_response()
1436        }
1437    }
1438}
1439
1440fn events_disabled_response() -> Response<Body> {
1441    (
1442        StatusCode::SERVICE_UNAVAILABLE,
1443        Json(serde_json::json!({
1444            "code": "events_disabled",
1445            "message": "events capture is not enabled on this bezant-server instance \
1446                        (set BEZANT_EVENTS_ENABLED=1 to turn it on)"
1447        })),
1448    )
1449        .into_response()
1450}
1451
// Unit tests for `redact_tokens`: values under session/token-like keys
// must be replaced, everything else must survive.
#[cfg(test)]
mod redact_tests {
    use super::redact_tokens;

    // A `session` value is replaced with the `<redacted>` marker while
    // sibling fields stay intact.
    #[test]
    fn redacts_session_field() {
        let input = r#"{"session":"AAAA-real-token","other":1}"#;
        let out = redact_tokens(input);
        assert!(out.contains("<redacted>"), "got: {out}");
        assert!(!out.contains("AAAA-real-token"), "raw token leaked: {out}");
        // Non-token fields preserved.
        assert!(out.contains("\"other\":1"), "got: {out}");
    }

    // Any key containing "token" (in any casing style used here) is redacted.
    #[test]
    fn redacts_token_substring_keys() {
        let input = r#"{"accessToken":"x","refresh_token":"y","tokenExpiry":99}"#;
        let out = redact_tokens(input);
        assert!(!out.contains(r#""x""#), "accessToken leaked: {out}");
        assert!(!out.contains(r#""y""#), "refresh_token leaked: {out}");
        // tokenExpiry has "token" in the name → also redacted (intentional;
        // expiry timestamps don't carry secrets but the conservative bias
        // is correct for a debug surface).
        assert!(!out.contains("99"), "tokenExpiry value leaked: {out}");
    }

    // Input that is not JSON must come back byte-for-byte unchanged.
    #[test]
    fn passes_non_json_through_verbatim() {
        let input = "Not a JSON body, just text.";
        let out = redact_tokens(input);
        assert_eq!(out, input);
    }

    // Redaction reaches objects nested inside arrays.
    #[test]
    fn redacts_inside_arrays_and_nested_objects() {
        let input = r#"{"sessions":[{"session":"a"},{"session":"b"}]}"#;
        let out = redact_tokens(input);
        assert!(!out.contains(r#""a""#), "got: {out}");
        assert!(!out.contains(r#""b""#), "got: {out}");
    }
}
1493
// Unit tests for `strip_cookie_domain`, the Set-Cookie rewrite applied
// by `forward`.
#[cfg(test)]
mod forward_tests {
    use super::strip_cookie_domain;

    // The `Domain` attribute disappears; the other attributes stay.
    #[test]
    fn drops_ibkr_domain_attribute() {
        let input = "SID=abc; Domain=.ibkr.com; Path=/; Secure";
        // Whitespace retention is deliberate — we only drop the Domain
        // segment and keep the original spacing elsewhere.
        assert_eq!(strip_cookie_domain(input), "SID=abc; Path=/; Secure");
    }

    // A cookie without a `Domain` attribute is returned unchanged.
    #[test]
    fn leaves_cookie_without_domain_untouched() {
        let input = "SID=abc; Path=/; Secure";
        assert_eq!(strip_cookie_domain(input), input);
    }

    // The attribute name is matched regardless of ASCII case.
    #[test]
    fn case_insensitive() {
        let input = "SID=abc; DOMAIN=ibkr.com; Path=/";
        assert_eq!(strip_cookie_domain(input), "SID=abc; Path=/");
    }
}
1517}