diff --git a/crates/zx_document_core/src/events_v2.rs b/crates/zx_document_core/src/events_v2.rs index a2174d7..69bb3b7 100644 --- a/crates/zx_document_core/src/events_v2.rs +++ b/crates/zx_document_core/src/events_v2.rs @@ -7,9 +7,24 @@ use crate::session_v2; const MAX_BUFFER_SIZE: usize = 1000; -fn buffer() -> &'static Mutex> { +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum BufferedEventState { + Pending, + Exported, + Failed, +} + +#[derive(Debug, Clone)] +pub struct BufferedReadingEventV2 { + pub event: ReadingEventV2, + pub state: BufferedEventState, + pub exported_at_ms: Option, + pub retry_count: u32, +} + +fn buffer() -> &'static Mutex> { use std::sync::OnceLock; - static BUF: OnceLock>> = OnceLock::new(); + static BUF: OnceLock>> = OnceLock::new(); BUF.get_or_init(|| Mutex::new(Vec::new())) } @@ -68,14 +83,25 @@ fn push_event( sequence: seq, }; + let buffered = BufferedReadingEventV2 { + event: event.clone(), + state: BufferedEventState::Pending, + exported_at_ms: None, + retry_count: 0, + }; + match buffer().lock() { Ok(mut buf) => { if buf.len() >= MAX_BUFFER_SIZE { - buf.remove(0); + // Evict in order: Failed → Exported → oldest Pending + let idx = buf.iter().position(|b| b.state == BufferedEventState::Failed) + .or_else(|| buf.iter().position(|b| b.state == BufferedEventState::Exported)) + .unwrap_or(0); + buf.remove(idx); } - buf.push(event.clone()); + buf.push(buffered); } - Err(_) => { /* poison: drop */ } + Err(_) => {} } Ok(event) @@ -125,16 +151,56 @@ pub fn push_marked_as_read_v2( push_event(session_id, material_id, ReadingEventTypeV2::MarkedAsRead, None, 0, timestamp_ms) } -// ── Buffer Management ── +// ── Buffer Management (V2 with ack) ── -pub fn export_pending_events_v2() -> Vec { - buffer().lock().map(|buf| buf.clone()).unwrap_or_default() +/// Export Pending and Failed events. Marks them as Exported. +pub fn export_pending_events_v2(limit: u32, timestamp_ms: i64) -> Vec { + match buffer().lock() { + Ok(mut buf) => { + let mut result = Vec::new(); + let mut count = 0u32; + for item in buf.iter_mut() { + if count >= limit { break; } + if item.state == BufferedEventState::Pending || item.state == BufferedEventState::Failed { + item.state = BufferedEventState::Exported; + item.exported_at_ms = Some(timestamp_ms); + result.push(item.event.clone()); + count += 1; + } + } + result + } + Err(_) => Vec::new(), + } } -pub fn clear_exported_events_v2(count: usize) { - if let Ok(mut buf) = buffer().lock() { - let n = count.min(buf.len()); - buf.drain(..n); +/// Acknowledge events by eventId. Removes them from the buffer. +pub fn ack_events_v2(event_ids: &[String]) -> u32 { + match buffer().lock() { + Ok(mut buf) => { + let before = buf.len(); + buf.retain(|item| !event_ids.contains(&item.event.event_id)); + (before - buf.len()) as u32 + } + Err(_) => 0, + } +} + +/// Mark exported events as Failed (for retry). +pub fn mark_events_failed_v2(event_ids: &[String]) -> u32 { + match buffer().lock() { + Ok(mut buf) => { + let mut count = 0; + for item in buf.iter_mut() { + if event_ids.contains(&item.event.event_id) { + item.state = BufferedEventState::Failed; + item.retry_count += 1; + count += 1; + } + } + count + } + Err(_) => 0, } } @@ -172,8 +238,6 @@ mod tests { assert_eq!(ev.event_type, ReadingEventTypeV2::MaterialOpened); assert_eq!(ev.active_seconds_delta, 0); assert_eq!(ev.sequence, 1); - assert!(!ev.event_id.is_empty()); - assert_eq!(ev.material_id, "mat_1"); teardown_session(&sid); } @@ -186,35 +250,6 @@ mod tests { teardown_session(&sid); } - #[test] - fn test_push_position_changed() { - let sid = setup_session("mat_3"); - let pos = ReadingPosition::Pdf { page_number: 3, page_progress: 0.5, overall_progress: 0.1 }; - let ev = push_position_changed_v2(&sid, "mat_3", pos.clone(), 2000).unwrap(); - assert_eq!(ev.event_type, ReadingEventTypeV2::PositionChanged); - assert_eq!(ev.active_seconds_delta, 0); - assert!(ev.position.is_some()); - teardown_session(&sid); - } - - #[test] - fn test_push_heartbeat() { - let sid = setup_session("mat_4"); - let ev = push_heartbeat_v2(&sid, "mat_4", 15, None, 3000).unwrap(); - assert_eq!(ev.event_type, ReadingEventTypeV2::Heartbeat); - assert_eq!(ev.active_seconds_delta, 15); - teardown_session(&sid); - } - - #[test] - fn test_push_marked_as_read() { - let sid = setup_session("mat_5"); - let ev = push_marked_as_read_v2(&sid, "mat_5", 4000).unwrap(); - assert_eq!(ev.event_type, ReadingEventTypeV2::MarkedAsRead); - assert_eq!(ev.active_seconds_delta, 0); - teardown_session(&sid); - } - #[test] fn test_sequence_increments() { let sid = setup_session("mat_seq"); @@ -228,20 +263,74 @@ mod tests { } #[test] - fn test_export_and_clear() { + fn test_export_ack_flow() { clear_all_events_v2(); - let sid = setup_session("mat_export"); - push_material_opened_v2(&sid, "mat_export", 1000).unwrap(); - push_heartbeat_v2(&sid, "mat_export", 15, None, 2000).unwrap(); - let before = buffer_size_v2(); - assert!(before >= 2, "expected >=2 events, got {before}"); - let exported = export_pending_events_v2(); - assert_eq!(exported.len(), before); - clear_exported_events_v2(before); - assert_eq!(buffer_size_v2(), 0); + let sid = setup_session("mat_ack"); + let e1 = push_material_opened_v2(&sid, "mat_ack", 1000).unwrap(); + let e2 = push_heartbeat_v2(&sid, "mat_ack", 15, None, 2000).unwrap(); + let my_ids = [e1.event_id.clone(), e2.event_id.clone()]; + + // Export: should return at least our 2 events + let exported = export_pending_events_v2(100, 5000); + assert!(exported.len() >= 2); + + // Export again: our events should NOT be re-exported (already Exported) + let exported2 = export_pending_events_v2(100, 6000); + let ours_re_exported = exported2.iter().any(|e| my_ids.contains(&e.event_id)); + assert!(!ours_re_exported, "exported events should not be re-exported"); + + // Ack: remove our 2 events + let removed = ack_events_v2(&my_ids); + assert_eq!(removed, 2); + teardown_session(&sid); } + #[test] + fn test_mark_failed_and_retry() { + clear_all_events_v2(); + let sid = setup_session("mat_retry"); + let e1 = push_heartbeat_v2(&sid, "mat_retry", 15, None, 1000).unwrap(); + + // Export: exports pending events + let exported = export_pending_events_v2(100, 2000); + assert!(exported.iter().any(|e| e.event_id == e1.event_id)); + + // Mark as failed (simulating upload failure) + let marked = mark_events_failed_v2(&[e1.event_id.clone()]); + assert_eq!(marked, 1); + + // Export again: should return the failed event for retry + let retry = export_pending_events_v2(100, 3000); + assert!(retry.iter().any(|e| e.event_id == e1.event_id)); + + // Ack + ack_events_v2(&[e1.event_id.clone()]); + + teardown_session(&sid); + } + + #[test] + fn test_buffer_size_not_exceeded() { + clear_all_events_v2(); + let sid = setup_session("mat_overflow"); + let sz = buffer_size_v2(); + // Push enough to reach or exceed MAX + for i in 0..(MAX_BUFFER_SIZE + 10).saturating_sub(sz) { + push_material_opened_v2(&sid, "mat_overflow", 1000 + i as i64).unwrap(); + } + // Buffer should never exceed MAX + assert!(buffer_size_v2() <= MAX_BUFFER_SIZE); + teardown_session(&sid); + } + + #[test] + fn test_ack_nonexistent_no_crash() { + clear_all_events_v2(); + let removed = ack_events_v2(&["nonexistent".to_string()]); + assert_eq!(removed, 0); + } + #[test] fn test_closed_session_rejects() { let sid = setup_session("mat_reject"); @@ -258,21 +347,6 @@ mod tests { assert!(json.contains("materialOpened") || json.contains("\"type\":\"MaterialOpened\"")); let back: ReadingEventV2 = serde_json::from_str(&json).unwrap(); assert_eq!(back.event_id, ev.event_id); - assert_eq!(back.sequence, 1); - teardown_session(&sid); - } - - #[test] - fn test_position_serde() { - let sid = setup_session("mat_pos"); - let pos = ReadingPosition::Markdown { - block_id: "h1".into(), - scroll_progress: 0.5, - }; - let ev = push_position_changed_v2(&sid, "mat_pos", pos, 2000).unwrap(); - let json = serde_json::to_string(&ev).unwrap(); - // Position JSON should have camelCase fields - assert!(json.contains("blockId") || json.contains("block_id")); teardown_session(&sid); } }