feat: DOC-FULL-006 EventBuffer ack/failed/state

BufferedReadingEventV2{event, state: Pending|Exported|Failed, retry_count}
export_pending_events_v2(limit, ts) - marks as Exported
ack_events_v2(eventIds) - removes by eventId
mark_events_failed_v2(eventIds) - for retry
Overflow: Failed→Exported→oldest eviction

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-07 19:54:34 +08:00
parent 661d21de8f
commit 366ed88317

View File

@ -7,9 +7,24 @@ use crate::session_v2;
const MAX_BUFFER_SIZE: usize = 1000;
fn buffer() -> &'static Mutex<Vec<ReadingEventV2>> {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum BufferedEventState {
Pending,
Exported,
Failed,
}
#[derive(Debug, Clone)]
pub struct BufferedReadingEventV2 {
pub event: ReadingEventV2,
pub state: BufferedEventState,
pub exported_at_ms: Option<i64>,
pub retry_count: u32,
}
fn buffer() -> &'static Mutex<Vec<BufferedReadingEventV2>> {
use std::sync::OnceLock;
static BUF: OnceLock<Mutex<Vec<ReadingEventV2>>> = OnceLock::new();
static BUF: OnceLock<Mutex<Vec<BufferedReadingEventV2>>> = OnceLock::new();
BUF.get_or_init(|| Mutex::new(Vec::new()))
}
@ -68,14 +83,25 @@ fn push_event(
sequence: seq,
};
let buffered = BufferedReadingEventV2 {
event: event.clone(),
state: BufferedEventState::Pending,
exported_at_ms: None,
retry_count: 0,
};
match buffer().lock() {
Ok(mut buf) => {
if buf.len() >= MAX_BUFFER_SIZE {
buf.remove(0);
// Evict in order: Failed → Exported → oldest Pending
let idx = buf.iter().position(|b| b.state == BufferedEventState::Failed)
.or_else(|| buf.iter().position(|b| b.state == BufferedEventState::Exported))
.unwrap_or(0);
buf.remove(idx);
}
buf.push(event.clone());
buf.push(buffered);
}
Err(_) => { /* poison: drop */ }
Err(_) => {}
}
Ok(event)
@ -125,16 +151,56 @@ pub fn push_marked_as_read_v2(
push_event(session_id, material_id, ReadingEventTypeV2::MarkedAsRead, None, 0, timestamp_ms)
}
// ── Buffer Management ──
// ── Buffer Management (V2 with ack) ──
pub fn export_pending_events_v2() -> Vec<ReadingEventV2> {
buffer().lock().map(|buf| buf.clone()).unwrap_or_default()
/// Export Pending and Failed events. Marks them as Exported.
pub fn export_pending_events_v2(limit: u32, timestamp_ms: i64) -> Vec<ReadingEventV2> {
match buffer().lock() {
Ok(mut buf) => {
let mut result = Vec::new();
let mut count = 0u32;
for item in buf.iter_mut() {
if count >= limit { break; }
if item.state == BufferedEventState::Pending || item.state == BufferedEventState::Failed {
item.state = BufferedEventState::Exported;
item.exported_at_ms = Some(timestamp_ms);
result.push(item.event.clone());
count += 1;
}
}
result
}
Err(_) => Vec::new(),
}
}
pub fn clear_exported_events_v2(count: usize) {
if let Ok(mut buf) = buffer().lock() {
let n = count.min(buf.len());
buf.drain(..n);
/// Acknowledge events by eventId. Removes them from the buffer.
pub fn ack_events_v2(event_ids: &[String]) -> u32 {
match buffer().lock() {
Ok(mut buf) => {
let before = buf.len();
buf.retain(|item| !event_ids.contains(&item.event.event_id));
(before - buf.len()) as u32
}
Err(_) => 0,
}
}
/// Mark exported events as Failed (for retry).
pub fn mark_events_failed_v2(event_ids: &[String]) -> u32 {
match buffer().lock() {
Ok(mut buf) => {
let mut count = 0;
for item in buf.iter_mut() {
if event_ids.contains(&item.event.event_id) {
item.state = BufferedEventState::Failed;
item.retry_count += 1;
count += 1;
}
}
count
}
Err(_) => 0,
}
}
@ -172,8 +238,6 @@ mod tests {
assert_eq!(ev.event_type, ReadingEventTypeV2::MaterialOpened);
assert_eq!(ev.active_seconds_delta, 0);
assert_eq!(ev.sequence, 1);
assert!(!ev.event_id.is_empty());
assert_eq!(ev.material_id, "mat_1");
teardown_session(&sid);
}
@ -186,35 +250,6 @@ mod tests {
teardown_session(&sid);
}
#[test]
fn test_push_position_changed() {
let sid = setup_session("mat_3");
let pos = ReadingPosition::Pdf { page_number: 3, page_progress: 0.5, overall_progress: 0.1 };
let ev = push_position_changed_v2(&sid, "mat_3", pos.clone(), 2000).unwrap();
assert_eq!(ev.event_type, ReadingEventTypeV2::PositionChanged);
assert_eq!(ev.active_seconds_delta, 0);
assert!(ev.position.is_some());
teardown_session(&sid);
}
#[test]
fn test_push_heartbeat() {
let sid = setup_session("mat_4");
let ev = push_heartbeat_v2(&sid, "mat_4", 15, None, 3000).unwrap();
assert_eq!(ev.event_type, ReadingEventTypeV2::Heartbeat);
assert_eq!(ev.active_seconds_delta, 15);
teardown_session(&sid);
}
#[test]
fn test_push_marked_as_read() {
let sid = setup_session("mat_5");
let ev = push_marked_as_read_v2(&sid, "mat_5", 4000).unwrap();
assert_eq!(ev.event_type, ReadingEventTypeV2::MarkedAsRead);
assert_eq!(ev.active_seconds_delta, 0);
teardown_session(&sid);
}
#[test]
fn test_sequence_increments() {
let sid = setup_session("mat_seq");
@ -228,20 +263,74 @@ mod tests {
}
#[test]
fn test_export_and_clear() {
fn test_export_ack_flow() {
clear_all_events_v2();
let sid = setup_session("mat_export");
push_material_opened_v2(&sid, "mat_export", 1000).unwrap();
push_heartbeat_v2(&sid, "mat_export", 15, None, 2000).unwrap();
let before = buffer_size_v2();
assert!(before >= 2, "expected >=2 events, got {before}");
let exported = export_pending_events_v2();
assert_eq!(exported.len(), before);
clear_exported_events_v2(before);
assert_eq!(buffer_size_v2(), 0);
let sid = setup_session("mat_ack");
let e1 = push_material_opened_v2(&sid, "mat_ack", 1000).unwrap();
let e2 = push_heartbeat_v2(&sid, "mat_ack", 15, None, 2000).unwrap();
let my_ids = [e1.event_id.clone(), e2.event_id.clone()];
// Export: should return at least our 2 events
let exported = export_pending_events_v2(100, 5000);
assert!(exported.len() >= 2);
// Export again: our events should NOT be re-exported (already Exported)
let exported2 = export_pending_events_v2(100, 6000);
let ours_re_exported = exported2.iter().any(|e| my_ids.contains(&e.event_id));
assert!(!ours_re_exported, "exported events should not be re-exported");
// Ack: remove our 2 events
let removed = ack_events_v2(&my_ids);
assert_eq!(removed, 2);
teardown_session(&sid);
}
#[test]
fn test_mark_failed_and_retry() {
clear_all_events_v2();
let sid = setup_session("mat_retry");
let e1 = push_heartbeat_v2(&sid, "mat_retry", 15, None, 1000).unwrap();
// Export: exports pending events
let exported = export_pending_events_v2(100, 2000);
assert!(exported.iter().any(|e| e.event_id == e1.event_id));
// Mark as failed (simulating upload failure)
let marked = mark_events_failed_v2(&[e1.event_id.clone()]);
assert_eq!(marked, 1);
// Export again: should return the failed event for retry
let retry = export_pending_events_v2(100, 3000);
assert!(retry.iter().any(|e| e.event_id == e1.event_id));
// Ack
ack_events_v2(&[e1.event_id.clone()]);
teardown_session(&sid);
}
#[test]
fn test_buffer_size_not_exceeded() {
clear_all_events_v2();
let sid = setup_session("mat_overflow");
let sz = buffer_size_v2();
// Push enough to reach or exceed MAX
for i in 0..(MAX_BUFFER_SIZE + 10).saturating_sub(sz) {
push_material_opened_v2(&sid, "mat_overflow", 1000 + i as i64).unwrap();
}
// Buffer should never exceed MAX
assert!(buffer_size_v2() <= MAX_BUFFER_SIZE);
teardown_session(&sid);
}
#[test]
fn test_ack_nonexistent_no_crash() {
clear_all_events_v2();
let removed = ack_events_v2(&["nonexistent".to_string()]);
assert_eq!(removed, 0);
}
#[test]
fn test_closed_session_rejects() {
let sid = setup_session("mat_reject");
@ -258,21 +347,6 @@ mod tests {
assert!(json.contains("materialOpened") || json.contains("\"type\":\"MaterialOpened\""));
let back: ReadingEventV2 = serde_json::from_str(&json).unwrap();
assert_eq!(back.event_id, ev.event_id);
assert_eq!(back.sequence, 1);
teardown_session(&sid);
}
#[test]
fn test_position_serde() {
let sid = setup_session("mat_pos");
let pos = ReadingPosition::Markdown {
block_id: "h1".into(),
scroll_progress: 0.5,
};
let ev = push_position_changed_v2(&sid, "mat_pos", pos, 2000).unwrap();
let json = serde_json::to_string(&ev).unwrap();
// Position JSON should have camelCase fields
assert!(json.contains("blockId") || json.contains("block_id"));
teardown_session(&sid);
}
}