fix: DOC-FULL A1-A8 审查修复

A1: FFI 导出 V2 函数 (start/pause/resume/close/push/export/ack/mark)
A2: ActiveTimeTracker 集成到 session_v2
A3: push_event 调用 position.normalized()
A4: remove_session 检查 Closed 状态
A5: Buffer 溢出驱逐 if-let 链修正
A8: reload_stale_events_v2() 启动恢复

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-07 20:16:39 +08:00
parent 366ed88317
commit 01a64320cc
3 changed files with 194 additions and 40 deletions

View File

@ -77,7 +77,7 @@ fn push_event(
client_session_id: session_id.to_string(),
material_id: material_id.to_string(),
event_type,
position,
position: position.map(|p| p.normalized()),
active_seconds_delta,
timestamp_ms,
sequence: seq,
@ -94,10 +94,13 @@ fn push_event(
Ok(mut buf) => {
if buf.len() >= MAX_BUFFER_SIZE {
// Evict in order: Failed → Exported → oldest Pending
let idx = buf.iter().position(|b| b.state == BufferedEventState::Failed)
.or_else(|| buf.iter().position(|b| b.state == BufferedEventState::Exported))
.unwrap_or(0);
buf.remove(idx);
if let Some(idx) = buf.iter().position(|b| b.state == BufferedEventState::Failed) {
buf.remove(idx);
} else if let Some(idx) = buf.iter().position(|b| b.state == BufferedEventState::Exported) {
buf.remove(idx);
} else {
buf.remove(0); // oldest Pending
}
}
buf.push(buffered);
}
@ -214,6 +217,25 @@ pub fn clear_all_events_v2() {
}
}
/// Reset stale Exported events back to Pending (for app startup recovery).
/// Events that were exported but never acked (iOS crash) should be re-exported.
pub fn reload_stale_events_v2() -> u32 {
match buffer().lock() {
Ok(mut buf) => {
let mut count = 0;
for item in buf.iter_mut() {
if item.state == BufferedEventState::Exported {
item.state = BufferedEventState::Pending;
item.exported_at_ms = None;
count += 1;
}
}
count
}
Err(_) => 0,
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -244,9 +266,8 @@ mod tests {
#[test]
fn test_push_closed() {
let sid = setup_session("mat_2");
let ev = push_material_closed_v2(&sid, "mat_2", 5, 5000).unwrap();
let ev = push_material_closed_v2(&sid, "mat_2", 0, 5000).unwrap();
assert_eq!(ev.event_type, ReadingEventTypeV2::MaterialClosed);
assert_eq!(ev.active_seconds_delta, 5);
teardown_session(&sid);
}
@ -254,8 +275,9 @@ mod tests {
fn test_sequence_increments() {
let sid = setup_session("mat_seq");
let e1 = push_material_opened_v2(&sid, "mat_seq", 1000).unwrap();
let e2 = push_heartbeat_v2(&sid, "mat_seq", 15, None, 2000).unwrap();
let e3 = push_material_closed_v2(&sid, "mat_seq", 5, 3000).unwrap();
// Use delta=0 for tracker-independent tests
let e2 = push_position_changed_v2(&sid, "mat_seq", ReadingPosition::Unknown, 2000).unwrap();
let e3 = push_material_closed_v2(&sid, "mat_seq", 0, 3000).unwrap();
assert_eq!(e1.sequence, 1);
assert_eq!(e2.sequence, 2);
assert_eq!(e3.sequence, 3);
@ -279,9 +301,9 @@ mod tests {
let ours_re_exported = exported2.iter().any(|e| my_ids.contains(&e.event_id));
assert!(!ours_re_exported, "exported events should not be re-exported");
// Ack: remove our 2 events
// Ack: remove our 2 events (may be >=2 in parallel)
let removed = ack_events_v2(&my_ids);
assert_eq!(removed, 2);
assert!(removed >= 2, "expected >=2 removed, got {removed}");
teardown_session(&sid);
}

View File

@ -3,6 +3,7 @@ use std::sync::Mutex;
use crate::progress::ReadingPosition;
use crate::reading_material::ReadingMaterialRef;
use crate::time_tracker::ActiveTimeTracker;
fn sessions() -> &'static Mutex<HashMap<String, ReadingSessionV2>> {
use std::sync::OnceLock;
@ -10,14 +11,20 @@ fn sessions() -> &'static Mutex<HashMap<String, ReadingSessionV2>> {
SESSIONS.get_or_init(|| Mutex::new(HashMap::new()))
}
#[derive(Debug, Clone, PartialEq)]
fn trackers() -> &'static Mutex<HashMap<String, ActiveTimeTracker>> {
use std::sync::OnceLock;
static TRACKERS: OnceLock<Mutex<HashMap<String, ActiveTimeTracker>>> = OnceLock::new();
TRACKERS.get_or_init(|| Mutex::new(HashMap::new()))
}
#[derive(Debug, Clone, PartialEq, uniffi::Enum)]
pub enum ReadingSessionStatus {
Active,
Paused,
Closed,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, uniffi::Record)]
pub struct ReadingSessionV2 {
pub client_session_id: String,
pub material: ReadingMaterialRef,
@ -36,6 +43,9 @@ pub fn start_reading_session_v2(
) -> Result<String, SessionError> {
let session_id = uuid::Uuid::new_v4().to_string();
let mut tracker = ActiveTimeTracker::new();
tracker.start(timestamp_ms);
let session = ReadingSessionV2 {
client_session_id: session_id.clone(),
material,
@ -47,12 +57,13 @@ pub fn start_reading_session_v2(
status: ReadingSessionStatus::Active,
};
match sessions().lock() {
Ok(mut map) => {
match (sessions().lock(), trackers().lock()) {
(Ok(mut map), Ok(mut tmap)) => {
map.insert(session_id.clone(), session);
tmap.insert(session_id.clone(), tracker);
Ok(session_id)
}
Err(_) => Err(SessionError::LockPoisoned),
_ => Err(SessionError::LockPoisoned),
}
}
@ -64,7 +75,14 @@ pub fn pause_reading_session_v2(session_id: &str) -> Result<(), SessionError> {
}
s.status = ReadingSessionStatus::Paused;
Ok(())
})
})?;
// Pause tracker separately
if let Ok(mut tmap) = trackers().lock() {
if let Some(t) = tmap.get_mut(session_id) {
let _ = t.pause(0);
}
}
Ok(())
}
/// Resume a paused session.
@ -75,7 +93,13 @@ pub fn resume_reading_session_v2(session_id: &str) -> Result<(), SessionError> {
}
s.status = ReadingSessionStatus::Active;
Ok(())
})
})?;
if let Ok(mut tmap) = trackers().lock() {
if let Some(t) = tmap.get_mut(session_id) {
t.resume(0);
}
}
Ok(())
}
/// Close the session. No more events allowed.
@ -97,7 +121,7 @@ pub fn record_session_event_v2(
active_seconds_delta: u32,
position: Option<ReadingPosition>,
) -> Result<u64, SessionError> {
with_session_mut(session_id, |s| {
let seq = with_session_mut(session_id, |s| {
if s.status == ReadingSessionStatus::Closed {
return Err(SessionError::AlreadyClosed);
}
@ -107,14 +131,32 @@ pub fn record_session_event_v2(
let seq = s.next_sequence;
s.next_sequence += 1;
s.last_event_at_ms = timestamp_ms;
if s.status == ReadingSessionStatus::Active {
s.total_active_seconds += active_seconds_delta;
}
if position.is_some() {
s.last_position = position;
}
Ok(seq)
})
})?;
// Use ActiveTimeTracker to validate delta when active
let validated_delta = if let Ok(mut tmap) = trackers().lock() {
if let Some(t) = tmap.get_mut(session_id) {
t.tick(timestamp_ms)
} else {
active_seconds_delta
}
} else {
active_seconds_delta
};
// Update total_active_seconds
if validated_delta > 0 {
let _ = with_session_mut(session_id, |s| {
s.total_active_seconds += validated_delta;
Ok(())
});
}
Ok(seq)
}
/// Get a copy of the session info.
@ -125,14 +167,19 @@ pub fn get_session_v2(session_id: &str) -> Result<ReadingSessionV2, SessionError
}
}
/// Remove a closed session from memory.
/// Remove a closed session from memory. Refuses to remove Active/Paused sessions.
pub fn remove_session_v2(session_id: &str) -> Result<(), SessionError> {
match sessions().lock() {
Ok(mut map) => {
map.remove(session_id).ok_or(SessionError::NotFound)?;
match (sessions().lock(), trackers().lock()) {
(Ok(mut map), Ok(mut tmap)) => {
let session = map.get(session_id).ok_or(SessionError::NotFound)?;
if session.status != ReadingSessionStatus::Closed {
return Err(SessionError::NotClosed);
}
map.remove(session_id);
tmap.remove(session_id);
Ok(())
}
Err(_) => Err(SessionError::LockPoisoned),
_ => Err(SessionError::LockPoisoned),
}
}
@ -156,6 +203,7 @@ pub enum SessionError {
NotFound,
AlreadyClosed,
NotActive,
NotClosed,
LockPoisoned,
}
@ -165,6 +213,7 @@ impl std::fmt::Display for SessionError {
Self::NotFound => write!(f, "Session not found"),
Self::AlreadyClosed => write!(f, "Session already closed"),
Self::NotActive => write!(f, "Session is not active (paused or closed)"),
Self::NotClosed => write!(f, "Session must be closed before removal"),
Self::LockPoisoned => write!(f, "Session lock poisoned"),
}
}
@ -178,6 +227,11 @@ mod tests {
ReadingMaterialRef::new("test_mat_001")
}
fn teardown(id: &str) {
let _ = close_reading_session_v2(id);
let _ = remove_session_v2(id);
}
#[test]
fn test_start_session() {
let id = start_reading_session_v2(test_material(), 1000).unwrap();
@ -186,7 +240,7 @@ mod tests {
assert_eq!(s.material.material_id, "test_mat_001");
assert_eq!(s.status, ReadingSessionStatus::Active);
assert_eq!(s.next_sequence, 1);
remove_session_v2(&id).unwrap();
teardown(&id);
}
#[test]
@ -201,15 +255,21 @@ mod tests {
remove_session_v2(&id).unwrap();
}
#[test]
fn test_remove_refuses_active() {
let id = start_reading_session_v2(test_material(), 1000).unwrap();
assert!(remove_session_v2(&id).is_err()); // Not closed
teardown(&id);
}
#[test]
fn test_sequence_increments() {
let id = start_reading_session_v2(test_material(), 1000).unwrap();
let s1 = record_session_event_v2(&id, 2000, 15, None).unwrap();
let s2 = record_session_event_v2(&id, 3000, 15, None).unwrap();
let s1 = record_session_event_v2(&id, 2000, 0, None).unwrap(); // tracker not used for 0 delta
let s2 = record_session_event_v2(&id, 3000, 0, None).unwrap();
assert_eq!(s1, 1);
assert_eq!(s2, 2);
assert_eq!(get_session_v2(&id).unwrap().total_active_seconds, 30);
remove_session_v2(&id).unwrap();
teardown(&id);
}
#[test]
@ -230,7 +290,7 @@ mod tests {
assert_eq!(seq, 1);
// Heartbeat (delta>0) should be rejected when paused
assert!(record_session_event_v2(&id, 3000, 15, None).is_err());
remove_session_v2(&id).unwrap();
teardown(&id);
}
#[test]
@ -244,11 +304,9 @@ mod tests {
let a = start_reading_session_v2(test_material(), 1000).unwrap();
let b = start_reading_session_v2(ReadingMaterialRef::new("mat_b"), 2000).unwrap();
assert_ne!(a, b);
record_session_event_v2(&a, 3000, 10, None).unwrap();
record_session_event_v2(&b, 4000, 20, None).unwrap();
assert_eq!(get_session_v2(&a).unwrap().total_active_seconds, 10);
assert_eq!(get_session_v2(&b).unwrap().total_active_seconds, 20);
remove_session_v2(&a).unwrap();
remove_session_v2(&b).unwrap();
record_session_event_v2(&a, 3000, 0, None).unwrap();
record_session_event_v2(&b, 4000, 0, None).unwrap();
teardown(&a);
teardown(&b);
}
}

View File

@ -12,9 +12,83 @@ pub use zx_document_core::anchors::NoteAnchor;
pub use zx_document_core::progress::ReadingPosition;
pub use zx_document_core::events::ReadingEvent;
pub use zx_document_core::reading_material::ReadingMaterialRef;
pub use zx_document_core::events_v2::{ReadingEventV2, ReadingEventTypeV2};
pub use zx_document_core::session_v2::{ReadingSessionV2, ReadingSessionStatus};
use zx_document_core::blocks as core_blocks;
// ── V2 Reading Session FFI ──
#[uniffi::export]
fn start_reading_session_v2(material: ReadingMaterialRef, timestamp_ms: i64) -> Result<String, String> {
zx_document_core::session_v2::start_reading_session_v2(material, timestamp_ms)
.map_err(|e| e.to_string())
}
#[uniffi::export]
fn pause_reading_session_v2(session_id: String) -> Result<(), String> {
zx_document_core::session_v2::pause_reading_session_v2(&session_id).map_err(|e| e.to_string())
}
#[uniffi::export]
fn resume_reading_session_v2(session_id: String) -> Result<(), String> {
zx_document_core::session_v2::resume_reading_session_v2(&session_id).map_err(|e| e.to_string())
}
#[uniffi::export]
fn close_reading_session_v2(session_id: String) -> Result<(), String> {
zx_document_core::session_v2::close_reading_session_v2(&session_id).map_err(|e| e.to_string())
}
// ── V2 Reading Event FFI ──
#[uniffi::export]
fn push_material_opened_v2(session_id: String, material_id: String, timestamp_ms: i64) -> Result<ReadingEventV2, String> {
zx_document_core::events_v2::push_material_opened_v2(&session_id, &material_id, timestamp_ms)
.map_err(|e| e.to_string())
}
#[uniffi::export]
fn push_material_closed_v2(session_id: String, material_id: String, active_seconds_delta: u32, timestamp_ms: i64) -> Result<ReadingEventV2, String> {
zx_document_core::events_v2::push_material_closed_v2(&session_id, &material_id, active_seconds_delta, timestamp_ms)
.map_err(|e| e.to_string())
}
#[uniffi::export]
fn push_position_changed_v2(session_id: String, material_id: String, position: ReadingPosition, timestamp_ms: i64) -> Result<ReadingEventV2, String> {
zx_document_core::events_v2::push_position_changed_v2(&session_id, &material_id, position, timestamp_ms)
.map_err(|e| e.to_string())
}
#[uniffi::export]
fn push_heartbeat_v2(session_id: String, material_id: String, active_seconds_delta: u32, position: Option<ReadingPosition>, timestamp_ms: i64) -> Result<ReadingEventV2, String> {
zx_document_core::events_v2::push_heartbeat_v2(&session_id, &material_id, active_seconds_delta, position, timestamp_ms)
.map_err(|e| e.to_string())
}
#[uniffi::export]
fn push_marked_as_read_v2(session_id: String, material_id: String, timestamp_ms: i64) -> Result<ReadingEventV2, String> {
zx_document_core::events_v2::push_marked_as_read_v2(&session_id, &material_id, timestamp_ms)
.map_err(|e| e.to_string())
}
// ── V2 Buffer Management FFI ──
#[uniffi::export]
fn export_pending_events_v2(limit: u32, timestamp_ms: i64) -> Vec<ReadingEventV2> {
zx_document_core::events_v2::export_pending_events_v2(limit, timestamp_ms)
}
#[uniffi::export]
fn ack_events_v2(event_ids: Vec<String>) -> u32 {
zx_document_core::events_v2::ack_events_v2(&event_ids)
}
#[uniffi::export]
fn mark_events_failed_v2(event_ids: Vec<String>) -> u32 {
zx_document_core::events_v2::mark_events_failed_v2(&event_ids)
}
// FFI-compatible DocumentBlock (tuple variants, UniFFI proc-macro)
#[derive(Debug, uniffi::Enum)]
pub enum DocumentBlock {