1use axum::body::Body;
13use axum::extract::{Path, Query, State};
14use axum::http::{header, HeaderMap, HeaderValue, Response, StatusCode};
15use axum::response::IntoResponse;
16use axum::routing::{delete, get};
17use axum::{Json, Router};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
21use crate::error::AppError;
22use crate::state::AppState;
23
24pub fn router(state: AppState) -> Router {
27 Router::new()
28 .route("/health", get(health))
29 .route("/debug/jar", get(debug_jar))
30 .route("/debug/probe", get(debug_probe))
31 .route("/accounts", get(accounts))
32 .route("/accounts/{account_id}/summary", get(account_summary))
33 .route("/accounts/{account_id}/positions", get(account_positions))
34 .route("/accounts/{account_id}/ledger", get(account_ledger))
35 .route(
36 "/accounts/{account_id}/orders",
37 get(account_orders).post(submit_order),
38 )
39 .route(
40 "/accounts/{account_id}/orders/{order_id}",
41 delete(cancel_order),
42 )
43 .route("/contracts/search", get(contract_search))
44 .route("/market/snapshot", get(market_snapshot))
45 .route("/events/orders", get(events_orders))
46 .route("/events/pnl", get(events_pnl))
47 .route("/events/marketdata", get(events_marketdata))
48 .route("/events/gap", get(events_gap))
49 .route("/events/_status", get(events_status))
50 .route("/events/{topic}/history", get(events_history))
51 .fallback(passthrough_any)
59 .with_state(state)
60}
61
62const HOP_BY_HOP: &[&str] = &[
76 "host",
77 "content-length",
78 "connection",
79 "keep-alive",
80 "proxy-authenticate",
81 "proxy-authorization",
82 "te",
83 "trailer",
84 "transfer-encoding",
85 "upgrade",
86 "cookie",
87 "authorization",
88 "x-forwarded-for",
89 "x-forwarded-host",
90 "x-forwarded-proto",
91 "x-real-ip",
92 "forwarded",
93];
94
95fn is_hop_by_hop(name: &str) -> bool {
96 HOP_BY_HOP.iter().any(|h| name.eq_ignore_ascii_case(h))
97}
98
99#[tracing::instrument(skip_all, fields(method, path))]
103async fn passthrough_any(
104 State(state): State<AppState>,
105 req: axum::extract::Request,
106) -> Result<Response<Body>, AppError> {
107 use axum::http::Method;
108 let method = req.method().clone();
109 let path_and_query = req
110 .uri()
111 .path_and_query()
112 .map(|pq| pq.as_str())
113 .unwrap_or("/")
114 .to_string();
115 let path_only = req.uri().path().to_string();
118 tracing::Span::current().record("method", tracing::field::display(&method));
121 tracing::Span::current().record("path", tracing::field::display(&path_only));
122
123 let gateway_root = state.client().gateway_root_url().as_str();
128 let target = format!("{}{}", gateway_root.trim_end_matches('/'), path_and_query);
129 let target_url: reqwest::Url = target
130 .parse()
131 .map_err(|e| bezant::Error::BadRequest(format!("target url: {e}")))?;
132
133 let headers = req.headers().clone();
134
135 let jar = state.client().cookie_jar();
150 let mut pairs: Vec<&str> = Vec::new();
151 for cookie_header in headers.get_all(axum::http::header::COOKIE) {
152 if let Ok(raw) = cookie_header.to_str() {
153 for pair in raw.split(';') {
154 let trimmed = pair.trim();
155 if trimmed.is_empty() {
156 continue;
157 }
158 let name = trimmed.split('=').next().unwrap_or("").trim();
165 if is_edge_auth_cookie(name) {
166 continue;
167 }
168 pairs.push(trimmed);
169 }
170 }
171 }
172 let injected = pairs.len();
173 if injected > 0 {
174 jar.set_pairs(&pairs);
175 }
176 tracing::debug!(
180 path = %path_only,
181 cookies = injected,
182 "passthrough cookie replay"
183 );
184
185 let body_bytes = axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024)
186 .await
187 .map_err(|e| bezant::Error::BadRequest(format!("read body: {e}")))?;
188
189 let method_reqwest = reqwest::Method::from_bytes(method.as_str().as_bytes())
190 .map_err(|e| bezant::Error::BadRequest(format!("method: {e}")))?;
191 let rewrite_origin = path_only.starts_with("/v1/api/") || path_only == "/v1/api";
203 let gateway_origin = if rewrite_origin {
204 let scheme = target_url.scheme();
205 target_url.host_str().map(|h| match target_url.port() {
206 Some(p) => format!("{scheme}://{h}:{p}"),
207 None => format!("{scheme}://{h}"),
208 })
209 } else {
210 None
211 };
212 let mut builder = state.client().http().request(method_reqwest, &target);
213 for (name, value) in headers.iter() {
214 if is_hop_by_hop(name.as_str()) {
218 continue;
219 }
220 let lower = name.as_str().to_ascii_lowercase();
221 if let Some(ref origin) = gateway_origin {
222 if lower == "origin" {
223 if let Ok(v) = reqwest::header::HeaderValue::from_str(origin) {
224 builder = builder.header(reqwest::header::ORIGIN, v);
225 }
226 continue;
227 }
228 if lower == "referer" {
229 if let Ok(orig) = value.to_str() {
233 let rewritten = rewrite_referer_origin(orig, origin);
234 if let Ok(v) = reqwest::header::HeaderValue::from_str(&rewritten) {
235 builder = builder.header(reqwest::header::REFERER, v);
236 }
237 }
238 continue;
239 }
240 }
241 if let Ok(v) = reqwest::header::HeaderValue::from_bytes(value.as_bytes()) {
242 if let Ok(name) = reqwest::header::HeaderName::from_bytes(name.as_str().as_bytes()) {
243 builder = builder.header(name, v);
244 }
245 }
246 }
247 if method != Method::GET && method != Method::HEAD {
248 let len = body_bytes.len();
253 builder = builder
254 .header(reqwest::header::CONTENT_LENGTH, len.to_string())
255 .body(body_bytes.to_vec());
256 }
257
258 let resp = builder.send().await.map_err(bezant::Error::Http)?;
259 forward(resp).await
260}
261
262#[derive(Serialize)]
263struct HealthBody {
264 authenticated: bool,
265 connected: bool,
266 competing: bool,
267 message: Option<String>,
268}
269
270async fn debug_jar(
280 State(state): State<AppState>,
281 headers: HeaderMap,
282 Query(q): Query<HashMap<String, String>>,
283) -> Response<Body> {
284 if let Err(resp) = debug_auth(&state, &headers, &q) {
285 return resp;
286 }
287 let jar = state.client().cookie_jar();
288 let entries: Vec<serde_json::Value> = jar
289 .snapshot()
290 .into_iter()
291 .map(|(name, value)| {
292 serde_json::json!({
293 "name": name,
294 "value_length": value.len(),
295 })
296 })
297 .collect();
298 let body = serde_json::json!({
299 "gateway_root": state.client().gateway_root_url().as_str(),
300 "size": entries.len(),
301 "entries": entries,
302 });
303 Json(body).into_response()
304}
305
306#[allow(clippy::result_large_err)]
320fn debug_auth(
323 state: &AppState,
324 headers: &HeaderMap,
325 query: &HashMap<String, String>,
326) -> Result<(), Response<Body>> {
327 let Some(expected) = state.debug_token() else {
328 return Err(Response::builder()
329 .status(StatusCode::NOT_FOUND)
330 .body(Body::empty())
331 .unwrap_or_default());
332 };
333 let presented = headers
334 .get("x-bezant-debug-token")
335 .and_then(|v| v.to_str().ok())
336 .or_else(|| query.get("token").map(String::as_str))
337 .unwrap_or("");
338 if constant_time_eq(presented.as_bytes(), expected.as_bytes()) {
339 return Ok(());
340 }
341 Err(Response::builder()
342 .status(StatusCode::UNAUTHORIZED)
343 .header(header::CONTENT_TYPE, "application/json")
344 .body(Body::from(
345 r#"{"code":"debug_unauthorized","message":"missing or invalid debug token"}"#,
346 ))
347 .unwrap_or_default())
348}
349
350fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
354 if a.len() != b.len() {
355 return false;
356 }
357 let mut diff = 0u8;
358 for (x, y) in a.iter().zip(b.iter()) {
359 diff |= x ^ y;
360 }
361 diff == 0
362}
363
364async fn debug_probe(
393 State(state): State<AppState>,
394 headers: HeaderMap,
395 Query(q): Query<HashMap<String, String>>,
396) -> Response<Body> {
397 if let Err(resp) = debug_auth(&state, &headers, &q) {
398 return resp;
399 }
400 let client = state.client();
401 let started = std::time::Instant::now();
402 let jar_before = client.cookie_jar().snapshot().len();
403
404 let auth_status = probe_step(
405 client,
406 "auth_status",
407 reqwest::Method::POST,
408 &["iserver", "auth", "status"],
409 None,
410 )
411 .await;
412 let already_bridged = is_authenticated(&auth_status);
420 let ssodh_init = if already_bridged {
421 skipped_step(
422 "ssodh_init",
423 "POST",
424 "/v1/api/iserver/auth/ssodh/init",
425 "session already bridged (auth_status authenticated)",
426 )
427 } else {
428 probe_step(
429 client,
430 "ssodh_init",
431 reqwest::Method::POST,
432 &["iserver", "auth", "ssodh", "init"],
433 Some(serde_json::json!({ "publish": true, "compete": true })),
434 )
435 .await
436 };
437 let tickle = probe_step(client, "tickle", reqwest::Method::POST, &["tickle"], None).await;
438 let accounts = probe_step(
439 client,
440 "accounts",
441 reqwest::Method::GET,
442 &["portfolio", "accounts"],
443 None,
444 )
445 .await;
446
447 let verdict = compute_verdict(&auth_status, &ssodh_init, &tickle, &accounts);
448 let jar_after = client.cookie_jar().snapshot().len();
449
450 let body = serde_json::json!({
451 "gateway_root": client.gateway_root_url().as_str(),
452 "elapsed_ms": started.elapsed().as_millis() as u64,
453 "jar_size_before": jar_before,
454 "jar_size_after": jar_after,
455 "verdict": verdict,
456 "steps": [auth_status, ssodh_init, tickle, accounts],
457 });
458 Json(body).into_response()
459}
460
461async fn probe_step(
470 client: &bezant::Client,
471 name: &'static str,
472 method: reqwest::Method,
473 path_segments: &[&str],
474 body: Option<serde_json::Value>,
475) -> serde_json::Value {
476 let mut url = client.base_url().clone();
477 if let Ok(mut segs) = url.path_segments_mut() {
478 for seg in path_segments {
479 segs.push(seg);
480 }
481 }
482 let path_for_log = url.path().to_owned();
483
484 let gateway_origin = client
485 .gateway_root_url()
486 .as_str()
487 .trim_end_matches('/')
488 .to_owned();
489
490 let mut builder = client
491 .http()
492 .request(method.clone(), url.clone())
493 .header(reqwest::header::ORIGIN, &gateway_origin)
494 .header(reqwest::header::REFERER, format!("{gateway_origin}/"));
495
496 let body_bytes: Vec<u8> = match (&method, body) {
497 (m, _) if m == reqwest::Method::GET || m == reqwest::Method::HEAD => Vec::new(),
498 (_, Some(json)) => serde_json::to_vec(&json).unwrap_or_default(),
499 (_, None) => Vec::new(),
500 };
501 if method != reqwest::Method::GET && method != reqwest::Method::HEAD {
502 builder = builder
503 .header(
504 reqwest::header::CONTENT_LENGTH,
505 body_bytes.len().to_string(),
506 )
507 .body(body_bytes.clone());
508 if !body_bytes.is_empty() {
509 builder = builder.header(reqwest::header::CONTENT_TYPE, "application/json");
510 }
511 }
512
513 let started = std::time::Instant::now();
514 let result = match tokio::time::timeout(std::time::Duration::from_secs(5), builder.send()).await
519 {
520 Ok(send_result) => send_result.map_err(|e| e.to_string()),
521 Err(_) => Err("step timed out after 5s".to_string()),
522 };
523 let latency_ms = started.elapsed().as_millis() as u64;
524
525 match result {
526 Ok(resp) => {
527 let status = resp.status().as_u16();
528 let set_cookie_names: Vec<String> = resp
529 .headers()
530 .get_all(reqwest::header::SET_COOKIE)
531 .iter()
532 .filter_map(|v| v.to_str().ok())
533 .map(|raw| {
534 raw.split(';')
535 .next()
536 .and_then(|s| s.split('=').next())
537 .map(|s| s.trim().to_owned())
538 .unwrap_or_default()
539 })
540 .filter(|s| !s.is_empty())
541 .collect();
542 let bytes = read_capped(resp, 1024 * 1024).await.unwrap_or_default();
547 let parsed_authenticated = serde_json::from_slice::<serde_json::Value>(&bytes)
553 .ok()
554 .and_then(|v| v["authenticated"].as_bool());
555 let preview_len = bytes.len().min(512);
561 let raw_preview = String::from_utf8_lossy(&bytes[..preview_len]).into_owned();
562 let body_preview = redact_tokens(&raw_preview);
563 serde_json::json!({
564 "name": name,
565 "method": method.as_str(),
566 "path": path_for_log,
567 "status": status,
568 "latency_ms": latency_ms,
569 "body_bytes": bytes.len(),
570 "body_preview": body_preview,
571 "set_cookie_names": set_cookie_names,
572 "error": serde_json::Value::Null,
573 "_authenticated": parsed_authenticated,
574 })
575 }
576 Err(e) => serde_json::json!({
577 "name": name,
578 "method": method.as_str(),
579 "path": path_for_log,
580 "status": serde_json::Value::Null,
581 "latency_ms": latency_ms,
582 "body_bytes": 0,
583 "body_preview": "",
584 "set_cookie_names": [],
585 "error": e,
586 }),
587 }
588}
589
590fn redact_tokens(preview: &str) -> String {
603 let Ok(mut value) = serde_json::from_str::<serde_json::Value>(preview) else {
604 return preview.to_owned();
608 };
609 redact_in_place(&mut value);
610 value.to_string()
611}
612
613fn redact_in_place(value: &mut serde_json::Value) {
614 match value {
615 serde_json::Value::Object(map) => {
616 for (k, v) in map.iter_mut() {
617 let lower = k.to_ascii_lowercase();
618 if lower == "session"
619 || lower == "ssoconclusion"
620 || lower.contains("token")
621 || lower.contains("secret")
622 {
623 *v = serde_json::Value::String("<redacted>".to_owned());
624 } else {
625 redact_in_place(v);
626 }
627 }
628 }
629 serde_json::Value::Array(arr) => {
630 for v in arr.iter_mut() {
631 redact_in_place(v);
632 }
633 }
634 _ => {}
635 }
636}
637
638fn compute_verdict(
646 auth_status: &serde_json::Value,
647 ssodh_init: &serde_json::Value,
648 tickle: &serde_json::Value,
649 accounts: &serde_json::Value,
650) -> &'static str {
651 if !is_2xx(auth_status) {
652 return "auth_status_failed";
653 }
654 let ssodh_ran_and_failed = !is_skipped(ssodh_init) && !is_2xx(ssodh_init);
661 if ssodh_ran_and_failed {
662 return "ssodh_failed";
663 }
664 if !is_authenticated(auth_status) {
665 return "needs_login";
666 }
667 if !is_2xx_or_skipped(tickle) {
668 return "tickle_failed";
669 }
670 if !is_2xx_or_skipped(accounts) {
671 return "accounts_failed";
672 }
673 "ok"
674}
675
676fn is_skipped(step: &serde_json::Value) -> bool {
677 step["skipped"].as_bool().unwrap_or(false)
678}
679
680fn is_2xx(step: &serde_json::Value) -> bool {
681 matches!(step["status"].as_u64(), Some(200..=299))
682}
683
684fn is_2xx_or_skipped(step: &serde_json::Value) -> bool {
685 is_2xx(step) || step["skipped"].as_bool().unwrap_or(false)
686}
687
688fn is_authenticated(step: &serde_json::Value) -> bool {
694 if !is_2xx(step) {
695 return false;
696 }
697 step["_authenticated"].as_bool().unwrap_or(false)
698}
699
700fn skipped_step(
704 name: &'static str,
705 method: &'static str,
706 path: &'static str,
707 reason: &'static str,
708) -> serde_json::Value {
709 serde_json::json!({
710 "name": name,
711 "method": method,
712 "path": path,
713 "status": serde_json::Value::Null,
714 "latency_ms": 0,
715 "body_bytes": 0,
716 "body_preview": "",
717 "set_cookie_names": [],
718 "error": serde_json::Value::Null,
719 "skipped": true,
720 "skipped_reason": reason,
721 })
722}
723
724#[tracing::instrument(skip_all)]
725async fn health(State(state): State<AppState>) -> Result<Json<HealthBody>, AppError> {
726 let status = state.client().auth_status().await?;
727 Ok(Json(HealthBody {
728 authenticated: status.authenticated,
729 connected: status.connected,
730 competing: status.competing,
731 message: status.message,
732 }))
733}
734
735#[tracing::instrument(skip_all)]
736async fn accounts(State(state): State<AppState>) -> Result<Response<Body>, AppError> {
737 passthrough_get(&state, &["portfolio", "accounts"], &[]).await
738}
739
740#[tracing::instrument(skip(state), fields(account_id = %account_id))]
741async fn account_summary(
742 State(state): State<AppState>,
743 Path(account_id): Path<String>,
744) -> Result<Response<Body>, AppError> {
745 passthrough_get(&state, &["portfolio", account_id.as_str(), "summary"], &[]).await
746}
747
748#[derive(Deserialize, Debug)]
749struct PositionsQuery {
750 #[serde(default)]
751 page: u32,
752}
753
754#[tracing::instrument(skip(state), fields(account_id = %account_id, page = q.page))]
755async fn account_positions(
756 State(state): State<AppState>,
757 Path(account_id): Path<String>,
758 Query(q): Query<PositionsQuery>,
759) -> Result<Response<Body>, AppError> {
760 let page = q.page.to_string();
761 passthrough_get(
762 &state,
763 &["portfolio", account_id.as_str(), "positions", page.as_str()],
764 &[],
765 )
766 .await
767}
768
769#[tracing::instrument(skip(state), fields(account_id = %account_id))]
770async fn account_ledger(
771 State(state): State<AppState>,
772 Path(account_id): Path<String>,
773) -> Result<Response<Body>, AppError> {
774 passthrough_get(&state, &["portfolio", account_id.as_str(), "ledger"], &[]).await
775}
776
777#[tracing::instrument(skip(state), fields(account_id = %account_id))]
779async fn account_orders(
780 State(state): State<AppState>,
781 Path(account_id): Path<String>,
782) -> Result<Response<Body>, AppError> {
783 passthrough_get(
785 &state,
786 &["iserver", "account", "orders"],
787 &[("accountId", account_id.as_str())],
788 )
789 .await
790}
791
792#[tracing::instrument(skip(state, body), fields(account_id = %account_id))]
798async fn submit_order(
799 State(state): State<AppState>,
800 Path(account_id): Path<String>,
801 axum::extract::Json(body): axum::extract::Json<serde_json::Value>,
802) -> Result<Response<Body>, AppError> {
803 let mut url = state.client().base_url().clone();
804 {
805 let mut segs = url
806 .path_segments_mut()
807 .map_err(|()| bezant::Error::UrlNotABase {
808 url: state.client().base_url().to_string(),
809 })?;
810 segs.push("iserver")
811 .push("account")
812 .push(account_id.as_str())
813 .push("orders");
814 }
815 let resp = state
816 .client()
817 .http()
818 .post(url)
819 .json(&body)
820 .send()
821 .await
822 .map_err(bezant::Error::Http)?;
823 forward(resp).await
824}
825
826#[tracing::instrument(skip(state), fields(account_id = %account_id, order_id = %order_id))]
828async fn cancel_order(
829 State(state): State<AppState>,
830 Path((account_id, order_id)): Path<(String, String)>,
831) -> Result<Response<Body>, AppError> {
832 let mut url = state.client().base_url().clone();
833 {
834 let mut segs = url
835 .path_segments_mut()
836 .map_err(|()| bezant::Error::UrlNotABase {
837 url: state.client().base_url().to_string(),
838 })?;
839 segs.push("iserver")
840 .push("account")
841 .push(account_id.as_str())
842 .push("order")
843 .push(order_id.as_str());
844 }
845 let resp = state
846 .client()
847 .http()
848 .delete(url)
849 .send()
850 .await
851 .map_err(bezant::Error::Http)?;
852 forward(resp).await
853}
854
855#[derive(Deserialize)]
856struct ContractSearchQuery {
857 symbol: String,
858 #[serde(default)]
859 name: bool,
860 #[serde(rename = "secType", default = "default_sec_type")]
861 sec_type: String,
862}
863
864fn default_sec_type() -> String {
865 "STK".into()
866}
867
868async fn contract_search(
869 State(state): State<AppState>,
870 Query(q): Query<ContractSearchQuery>,
871) -> Result<Response<Body>, AppError> {
872 let mut url = state.client().base_url().clone();
874 {
875 let mut segs = url
876 .path_segments_mut()
877 .map_err(|()| bezant::Error::UrlNotABase {
878 url: state.client().base_url().to_string(),
879 })?;
880 segs.push("iserver").push("secdef").push("search");
881 }
882 let body = serde_json::json!({
883 "symbol": q.symbol,
884 "name": q.name,
885 "secType": q.sec_type,
886 });
887 let resp = state
888 .client()
889 .http()
890 .post(url)
891 .json(&body)
892 .send()
893 .await
894 .map_err(bezant::Error::Http)?;
895 forward(resp).await
896}
897
898async fn market_snapshot(
899 State(state): State<AppState>,
900 Query(q): Query<HashMap<String, String>>,
901) -> Result<Response<Body>, AppError> {
902 let conids = q
903 .get("conids")
904 .ok_or(bezant::Error::MissingQuery { name: "conids" })?;
905 let fields = q
906 .get("fields")
907 .cloned()
908 .unwrap_or_else(|| "31,84,86,87".into());
909 passthrough_get(
910 &state,
911 &["iserver", "marketdata", "snapshot"],
912 &[("conids", conids), ("fields", &fields)],
913 )
914 .await
915}
916
917async fn passthrough_get(
920 state: &AppState,
921 path_segments: &[&str],
922 query: &[(&str, &str)],
923) -> Result<Response<Body>, AppError> {
924 let mut url = state.client().base_url().clone();
925 {
926 let mut segs = url
927 .path_segments_mut()
928 .map_err(|()| bezant::Error::UrlNotABase {
929 url: state.client().base_url().to_string(),
930 })?;
931 for seg in path_segments {
932 segs.push(seg);
933 }
934 }
935 if !query.is_empty() {
936 let mut q = url.query_pairs_mut();
937 for (k, v) in query {
938 q.append_pair(k, v);
939 }
940 }
941 let resp = state
942 .client()
943 .http()
944 .get(url)
945 .send()
946 .await
947 .map_err(bezant::Error::Http)?;
948 forward(resp).await
949}
950
951const MAX_UPSTREAM_BODY_BYTES: usize = 25 * 1024 * 1024;
984
985#[tracing::instrument(skip_all, fields(upstream_status = %resp.status()))]
986async fn forward(resp: reqwest::Response) -> Result<Response<Body>, AppError> {
987 let status = resp.status();
988 let headers_src = resp.headers().clone();
989 let body_must_be_empty =
990 matches!(status.as_u16(), 100..=199 | 204 | 304) || status.is_redirection();
991
992 let bytes: Vec<u8> = match read_capped(resp, MAX_UPSTREAM_BODY_BYTES).await {
993 Ok(b) => b,
994 Err(e) if body_must_be_empty => {
995 tracing::debug!(
1000 %status,
1001 error = %e,
1002 "forward: empty-body fallback on no-body status"
1003 );
1004 Vec::new()
1005 }
1006 Err(e) => {
1007 return Err(bezant::Error::UpstreamStatus {
1008 endpoint: "passthrough",
1009 status: status.as_u16(),
1010 body_preview: Some(e),
1011 }
1012 .into())
1013 }
1014 };
1015
1016 let body_is_empty = bytes.is_empty();
1017 let status = StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
1018
1019 let mut headers = HeaderMap::new();
1020 let mut had_content_type = false;
1021 for (name, value) in headers_src.iter() {
1022 let n = name.as_str().to_ascii_lowercase();
1023 if is_hop_by_hop_response(&n) {
1024 continue;
1025 }
1026 if n == "set-cookie" {
1028 let value_bytes: Vec<u8> = match value.to_str() {
1029 Ok(raw) => strip_cookie_domain(raw).into_bytes(),
1030 Err(_) => value.as_bytes().to_vec(),
1031 };
1032 if let (Ok(name), Ok(value)) = (
1033 header::HeaderName::from_bytes(name.as_str().as_bytes()),
1034 HeaderValue::from_bytes(&value_bytes),
1035 ) {
1036 headers.append(name, value);
1037 }
1038 continue;
1039 }
1040 if n == "content-type" {
1044 let raw = value.to_str().unwrap_or("");
1045 let rewrite = !body_is_empty
1046 && !body_must_be_empty
1047 && raw.eq_ignore_ascii_case("application/octet-stream");
1048 let bytes_to_emit: &[u8] = if rewrite {
1049 b"text/html; charset=UTF-8"
1050 } else {
1051 value.as_bytes()
1052 };
1053 if let (Ok(name), Ok(value)) = (
1054 header::HeaderName::from_bytes(name.as_str().as_bytes()),
1055 HeaderValue::from_bytes(bytes_to_emit),
1056 ) {
1057 headers.insert(name, value);
1058 had_content_type = true;
1059 }
1060 continue;
1061 }
1062 if let (Ok(name), Ok(value)) = (
1063 header::HeaderName::from_bytes(name.as_str().as_bytes()),
1064 HeaderValue::from_bytes(value.as_bytes()),
1065 ) {
1066 headers.append(name, value);
1067 }
1068 }
1069
1070 if !had_content_type && !body_is_empty && !body_must_be_empty {
1074 headers.insert(
1075 header::CONTENT_TYPE,
1076 HeaderValue::from_static("text/html; charset=UTF-8"),
1077 );
1078 }
1079
1080 let mut response = Response::builder()
1086 .status(status)
1087 .body(Body::from(bytes))
1088 .map_err(|e| bezant::Error::ResponseBuild(e.to_string()))?;
1089 *response.headers_mut() = headers;
1090 Ok(response)
1091}
1092
1093fn is_hop_by_hop_response(name: &str) -> bool {
1095 matches!(
1096 name,
1097 "content-length"
1098 | "connection"
1099 | "keep-alive"
1100 | "proxy-authenticate"
1101 | "proxy-authorization"
1102 | "te"
1103 | "trailer"
1104 | "transfer-encoding"
1105 | "upgrade"
1106 )
1107}
1108
1109fn rewrite_referer_origin(original: &str, new_origin: &str) -> String {
1114 match url::Url::parse(original) {
1115 Ok(u) => {
1116 let mut path_and_query = u.path().to_owned();
1117 if let Some(q) = u.query() {
1118 path_and_query.push('?');
1119 path_and_query.push_str(q);
1120 }
1121 format!("{}{}", new_origin.trim_end_matches('/'), path_and_query)
1122 }
1123 Err(_) => new_origin.to_owned(),
1124 }
1125}
1126
1127fn is_edge_auth_cookie(name: &str) -> bool {
1144 const BUILTIN_PREFIXES: &[&str] = &[
1145 "CF_Authorization",
1146 "CF_AppSession",
1147 "AWSELBAuthSessionCookie",
1148 "_oauth2_proxy",
1149 "_vercel_jwt",
1150 "_vercel_sso_nonce",
1151 "_pomerium",
1152 ];
1153 if BUILTIN_PREFIXES
1154 .iter()
1155 .any(|p| name.eq_ignore_ascii_case(p) || name.starts_with(p))
1156 {
1157 return true;
1158 }
1159 if let Ok(extra) = std::env::var("BEZANT_EDGE_COOKIE_PREFIXES") {
1161 for prefix in extra.split(',').map(str::trim).filter(|p| !p.is_empty()) {
1162 if name.starts_with(prefix) {
1163 return true;
1164 }
1165 }
1166 }
1167 false
1168}
1169
1170async fn read_capped(resp: reqwest::Response, max: usize) -> std::result::Result<Vec<u8>, String> {
1178 use futures_util::StreamExt;
1179 let mut bytes = Vec::new();
1180 let mut stream = resp.bytes_stream();
1181 while let Some(chunk) = stream.next().await {
1182 let chunk = chunk.map_err(|e| format!("read chunk: {e}"))?;
1183 if bytes.len() + chunk.len() > max {
1184 return Err(format!(
1185 "upstream body exceeded {max} byte cap (>{}B)",
1186 bytes.len() + chunk.len()
1187 ));
1188 }
1189 bytes.extend_from_slice(&chunk);
1190 }
1191 Ok(bytes)
1192}
1193
1194fn strip_cookie_domain(value: &str) -> String {
1199 value
1200 .split(';')
1201 .filter(|part| !part.trim().to_ascii_lowercase().starts_with("domain="))
1202 .collect::<Vec<_>>()
1203 .join(";")
1204}
1205
1206#[derive(Debug, Deserialize)]
1213struct EventsQuery {
1214 #[serde(default)]
1217 since: u64,
1218 #[serde(default = "EventsQuery::default_limit")]
1221 limit: usize,
1222}
1223
1224impl EventsQuery {
1225 fn default_limit() -> usize {
1226 100
1227 }
1228}
1229
1230#[derive(Debug, Serialize)]
1232struct CursorExpiredBody {
1233 code: &'static str,
1234 head_cursor: u64,
1235 reset_epoch: u64,
1236 message: &'static str,
1237}
1238
1239async fn events_orders(
1240 State(state): State<AppState>,
1241 Query(q): Query<EventsQuery>,
1242) -> Response<Body> {
1243 read_events_topic(&state, "orders", q).await
1244}
1245
1246async fn events_pnl(
1247 State(state): State<AppState>,
1248 Query(q): Query<EventsQuery>,
1249) -> Response<Body> {
1250 read_events_topic(&state, "pnl", q).await
1251}
1252
1253async fn events_gap(
1254 State(state): State<AppState>,
1255 Query(q): Query<EventsQuery>,
1256) -> Response<Body> {
1257 read_events_topic(&state, "gap", q).await
1258}
1259
1260#[derive(Debug, Deserialize)]
1261struct MarketDataEventsQuery {
1262 conid: i64,
1263 #[serde(default)]
1264 since: u64,
1265 #[serde(default = "EventsQuery::default_limit")]
1266 limit: usize,
1267}
1268
1269async fn events_marketdata(
1270 State(state): State<AppState>,
1271 Query(q): Query<MarketDataEventsQuery>,
1272) -> Response<Body> {
1273 let Some(handle) = state.events() else {
1274 return events_disabled_response();
1275 };
1276 if let Err(e) = handle.ensure_market_data(q.conid).await {
1278 return (
1279 StatusCode::BAD_GATEWAY,
1280 Json(serde_json::json!({
1281 "code": "events_subscribe_failed",
1282 "message": e,
1283 })),
1284 )
1285 .into_response();
1286 }
1287
1288 let topic = format!("marketdata:{}", q.conid);
1289 let limit = q.limit.min(1_000);
1290 read_events_topic_resolved(handle, &topic, q.since, limit).await
1291}
1292
1293async fn events_status(State(state): State<AppState>) -> Response<Body> {
1294 let Some(handle) = state.events() else {
1295 return events_disabled_response();
1296 };
1297 Json(handle.status().await).into_response()
1298}
1299
1300#[derive(Debug, Deserialize)]
1301struct HistoryQuery {
1302 since_ts: String,
1304 #[serde(default = "HistoryQuery::default_limit")]
1306 limit: usize,
1307}
1308
1309impl HistoryQuery {
1310 fn default_limit() -> usize {
1311 500
1312 }
1313}
1314
1315async fn events_history(
1316 State(state): State<AppState>,
1317 Path(topic): Path<String>,
1318 Query(q): Query<HistoryQuery>,
1319) -> Response<Body> {
1320 let Some(handle) = state.events() else {
1321 return events_disabled_response();
1322 };
1323 let Some(log) = handle.event_log() else {
1324 return (
1325 StatusCode::SERVICE_UNAVAILABLE,
1326 Json(serde_json::json!({
1327 "code": "events_history_disabled",
1328 "message": "events sqlite persistence is not enabled \
1329 (set BEZANT_EVENTS_DB_PATH to turn it on)"
1330 })),
1331 )
1332 .into_response();
1333 };
1334 let limit = q.limit.min(5_000).max(1);
1335 let log_clone = log.clone();
1336 let topic_clone = topic.clone();
1337 let since_ts = q.since_ts.clone();
1338 let result = tokio::task::spawn_blocking(move || {
1339 log_clone.query_since(&topic_clone, &since_ts, limit)
1340 })
1341 .await;
1342 match result {
1343 Ok(Ok(events)) => {
1344 let body = serde_json::json!({
1345 "topic": topic,
1346 "events": events,
1347 "count": events.len(),
1348 });
1349 (StatusCode::OK, Json(body)).into_response()
1350 }
1351 Ok(Err(e)) => (
1352 StatusCode::INTERNAL_SERVER_ERROR,
1353 Json(serde_json::json!({
1354 "code": "events_history_query_failed",
1355 "message": e.to_string(),
1356 })),
1357 )
1358 .into_response(),
1359 Err(e) => (
1360 StatusCode::INTERNAL_SERVER_ERROR,
1361 Json(serde_json::json!({
1362 "code": "events_history_join_failed",
1363 "message": e.to_string(),
1364 })),
1365 )
1366 .into_response(),
1367 }
1368}
1369
1370async fn read_events_topic(state: &AppState, topic: &str, q: EventsQuery) -> Response<Body> {
1371 let Some(handle) = state.events() else {
1372 return events_disabled_response();
1373 };
1374 read_events_topic_resolved(handle, topic, q.since, q.limit.min(1_000)).await
1375}
1376
1377async fn read_events_topic_resolved(
1378 handle: &crate::events::EventsHandle,
1379 topic: &str,
1380 since: u64,
1381 limit: usize,
1382) -> Response<Body> {
1383 let Some(result) = handle.read_topic(topic, since, limit.max(1)).await else {
1384 let status = handle.status().await;
1388 let body = serde_json::json!({
1389 "events": [],
1390 "next_cursor": since,
1391 "reset_epoch": status.reset_epoch,
1392 });
1393 return (StatusCode::OK, Json(body)).into_response();
1394 };
1395
1396 use crate::events::ReadResult;
1397 match result {
1398 ReadResult::Ok { events, next_cursor } => {
1399 if events.is_empty() {
1400 let mut response = Response::builder()
1403 .status(StatusCode::NO_CONTENT)
1404 .body(Body::empty())
1405 .unwrap();
1406 let cursor_str = next_cursor.to_string();
1407 if let Ok(v) = HeaderValue::from_str(&cursor_str) {
1408 response.headers_mut().insert("x-bezant-cursor", v);
1409 }
1410 return response;
1411 }
1412 let reset_epoch = events
1413 .first()
1414 .map(|e| e.reset_epoch)
1415 .unwrap_or_else(|| 0);
1416 let body = serde_json::json!({
1417 "events": events,
1418 "next_cursor": next_cursor,
1419 "reset_epoch": reset_epoch,
1420 });
1421 (StatusCode::OK, Json(body)).into_response()
1422 }
1423 ReadResult::CursorExpired {
1424 head_cursor,
1425 reset_epoch,
1426 } => {
1427 let body = CursorExpiredBody {
1428 code: "cursor_expired",
1429 head_cursor,
1430 reset_epoch,
1431 message:
1432 "the requested cursor is older than the oldest buffered event; \
1433 reset to head_cursor and emit a synthetic gap on the consumer side",
1434 };
1435 (StatusCode::PRECONDITION_FAILED, Json(body)).into_response()
1436 }
1437 }
1438}
1439
1440fn events_disabled_response() -> Response<Body> {
1441 (
1442 StatusCode::SERVICE_UNAVAILABLE,
1443 Json(serde_json::json!({
1444 "code": "events_disabled",
1445 "message": "events capture is not enabled on this bezant-server instance \
1446 (set BEZANT_EVENTS_ENABLED=1 to turn it on)"
1447 })),
1448 )
1449 .into_response()
1450}
1451
1452#[cfg(test)]
1453mod redact_tests {
1454 use super::redact_tokens;
1455
1456 #[test]
1457 fn redacts_session_field() {
1458 let input = r#"{"session":"AAAA-real-token","other":1}"#;
1459 let out = redact_tokens(input);
1460 assert!(out.contains("<redacted>"), "got: {out}");
1461 assert!(!out.contains("AAAA-real-token"), "raw token leaked: {out}");
1462 assert!(out.contains("\"other\":1"), "got: {out}");
1464 }
1465
1466 #[test]
1467 fn redacts_token_substring_keys() {
1468 let input = r#"{"accessToken":"x","refresh_token":"y","tokenExpiry":99}"#;
1469 let out = redact_tokens(input);
1470 assert!(!out.contains(r#""x""#), "accessToken leaked: {out}");
1471 assert!(!out.contains(r#""y""#), "refresh_token leaked: {out}");
1472 assert!(!out.contains("99"), "tokenExpiry value leaked: {out}");
1476 }
1477
1478 #[test]
1479 fn passes_non_json_through_verbatim() {
1480 let input = "Not a JSON body, just text.";
1481 let out = redact_tokens(input);
1482 assert_eq!(out, input);
1483 }
1484
1485 #[test]
1486 fn redacts_inside_arrays_and_nested_objects() {
1487 let input = r#"{"sessions":[{"session":"a"},{"session":"b"}]}"#;
1488 let out = redact_tokens(input);
1489 assert!(!out.contains(r#""a""#), "got: {out}");
1490 assert!(!out.contains(r#""b""#), "got: {out}");
1491 }
1492}
1493
1494#[cfg(test)]
1495mod forward_tests {
1496 use super::strip_cookie_domain;
1497
1498 #[test]
1499 fn drops_ibkr_domain_attribute() {
1500 let input = "SID=abc; Domain=.ibkr.com; Path=/; Secure";
1501 assert_eq!(strip_cookie_domain(input), "SID=abc; Path=/; Secure");
1504 }
1505
1506 #[test]
1507 fn leaves_cookie_without_domain_untouched() {
1508 let input = "SID=abc; Path=/; Secure";
1509 assert_eq!(strip_cookie_domain(input), input);
1510 }
1511
1512 #[test]
1513 fn case_insensitive() {
1514 let input = "SID=abc; DOMAIN=ibkr.com; Path=/";
1515 assert_eq!(strip_cookie_domain(input), "SID=abc; Path=/");
1516 }
1517}