- Add ReadingContextRegistry for SessionManager→Pipeline context sharing - Wire registry into SessionManager (register on open, unregister on close) - Fix Pipeline to auto-pull from registry when contexts not provided - Fix ScenePhase to not pass empty contexts Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
247 lines
8.1 KiB
Swift
247 lines
8.1 KiB
Swift
import Foundation
|
|
|
|
// MARK: - Queue Item
|
|
|
|
struct ReadingEventQueueItem: Codable, Equatable, Identifiable {
|
|
let id: String // UUID
|
|
let eventId: String // Rust ReadingEventV2.eventId
|
|
let payload: ReadingEventUploadItem
|
|
var status: QueueItemStatus
|
|
var retryCount: Int
|
|
var lastErrorCode: String?
|
|
var lastTriedAt: Date?
|
|
let createdAt: Date
|
|
var updatedAt: Date
|
|
|
|
enum QueueItemStatus: String, Codable {
|
|
case pending
|
|
case uploading
|
|
case failed // retryable
|
|
case failedPermanent // will not retry
|
|
}
|
|
}
|
|
|
|
// MARK: - Upload Queue
|
|
|
|
/// Local queue for reading event upload items. Persisted to JSON file.
|
|
/// Coordinates with Rust buffer: export → enqueue → API upload → ack/markFailed.
|
|
@MainActor
|
|
final class ReadingEventUploadQueue {
|
|
static let shared = ReadingEventUploadQueue()
|
|
|
|
private var items: [ReadingEventQueueItem] = []
|
|
private let maxRetryCount = 3
|
|
private let storageURL: URL
|
|
|
|
var pendingCount: Int { items.filter { $0.status == .pending }.count }
|
|
var failedCount: Int { items.filter { $0.status == .failed }.count }
|
|
|
|
private init() {
|
|
let docs = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first!
|
|
storageURL = docs.appendingPathComponent("reading_event_queue.json")
|
|
loadFromDisk()
|
|
}
|
|
|
|
// MARK: - Public API
|
|
|
|
/// Enqueue a batch of upload items. Deduplicates by eventId.
|
|
func enqueue(_ uploadItems: [ReadingEventUploadItem]) {
|
|
let existingIds = Set(items.map(\.eventId))
|
|
let now = Date()
|
|
let newItems = uploadItems
|
|
.filter { !existingIds.contains($0.eventId) }
|
|
.map { item in
|
|
ReadingEventQueueItem(
|
|
id: UUID().uuidString,
|
|
eventId: item.eventId,
|
|
payload: item,
|
|
status: .pending,
|
|
retryCount: 0,
|
|
lastErrorCode: nil,
|
|
lastTriedAt: nil,
|
|
createdAt: now,
|
|
updatedAt: now
|
|
)
|
|
}
|
|
items.append(contentsOf: newItems)
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Fetch a batch of pending items (max `limit`).
|
|
func fetchPendingBatch(limit: Int = 100) -> [ReadingEventQueueItem] {
|
|
return Array(items.filter { $0.status == .pending }.prefix(limit))
|
|
}
|
|
|
|
/// Mark items as uploaded (will be removed from queue).
|
|
func markUploaded(ids: [String]) {
|
|
items.removeAll { ids.contains($0.id) }
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Mark items for retry (increment retry count, set failed status).
|
|
func markRetry(ids: [String], errorCode: String? = nil) {
|
|
let now = Date()
|
|
for i in items.indices {
|
|
guard ids.contains(items[i].id) else { continue }
|
|
items[i].retryCount += 1
|
|
items[i].lastErrorCode = errorCode
|
|
items[i].lastTriedAt = now
|
|
items[i].updatedAt = now
|
|
items[i].status = items[i].retryCount >= maxRetryCount ? .failedPermanent : .failed
|
|
}
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Mark items as permanently failed (will not retry).
|
|
func markPermanentFailed(ids: [String], errorCode: String? = nil) {
|
|
let now = Date()
|
|
for i in items.indices {
|
|
guard ids.contains(items[i].id) else { continue }
|
|
items[i].status = .failedPermanent
|
|
items[i].lastErrorCode = errorCode
|
|
items[i].lastTriedAt = now
|
|
items[i].updatedAt = now
|
|
}
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Retry all failed items (move back to pending).
|
|
func retryFailed() {
|
|
let now = Date()
|
|
for i in items.indices {
|
|
guard items[i].status == .failed else { continue }
|
|
items[i].status = .pending
|
|
items[i].updatedAt = now
|
|
}
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Remove all permanently failed items.
|
|
func clearPermanentFailed() {
|
|
items.removeAll { $0.status == .failedPermanent }
|
|
saveToDisk()
|
|
}
|
|
|
|
/// Remove all items.
|
|
func clearAll() {
|
|
items.removeAll()
|
|
saveToDisk()
|
|
}
|
|
|
|
// MARK: - Persistence
|
|
|
|
private func saveToDisk() {
|
|
do {
|
|
let data = try JSONEncoder().encode(items)
|
|
try data.write(to: storageURL, options: .atomic)
|
|
} catch {
|
|
print("[UploadQueue] Failed to save: \(error)")
|
|
}
|
|
}
|
|
|
|
private func loadFromDisk() {
|
|
guard FileManager.default.fileExists(atPath: storageURL.path) else {
|
|
items = []
|
|
return
|
|
}
|
|
do {
|
|
let data = try Data(contentsOf: storageURL)
|
|
items = try JSONDecoder().decode([ReadingEventQueueItem].self, from: data)
|
|
// Recover stuck uploading items (crash recovery)
|
|
let now = Date()
|
|
for i in items.indices where items[i].status == .uploading {
|
|
items[i].status = .pending
|
|
items[i].updatedAt = now
|
|
}
|
|
print("[UploadQueue] Loaded \(items.count) items from disk")
|
|
} catch {
|
|
print("[UploadQueue] Failed to load: \(error)")
|
|
items = []
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Upload Pipeline
|
|
|
|
/// Orchestrates: Rust export → mapper → enqueue → API upload → ack/markFailed.
|
|
@MainActor
|
|
final class ReadingEventUploadPipeline {
|
|
static let shared = ReadingEventUploadPipeline()
|
|
|
|
private let queue = ReadingEventUploadQueue.shared
|
|
private let adapter: ReadingRuntimeAdapter = RustReadingRuntimeAdapter()
|
|
private let readingAPI = ReadingAPIService.shared
|
|
|
|
private init() {}
|
|
|
|
/// Full pipeline: export from Rust → enqueue. Pulls contexts from registry if not provided.
|
|
func exportAndEnqueue(contexts: [String: ReadingMaterialContext] = [:]) {
|
|
let effectiveContexts = contexts.isEmpty ? ReadingContextRegistry.shared.allContexts() : contexts
|
|
let rustEvents = adapter.exportEvents(limit: 100, timestampMs: nowMs())
|
|
guard !rustEvents.isEmpty else { return }
|
|
|
|
let uploadItems = ReadingEventMapper.map(rustEvents: rustEvents, contexts: effectiveContexts)
|
|
guard !uploadItems.isEmpty else { return }
|
|
|
|
queue.enqueue(uploadItems)
|
|
print("[UploadPipeline] Exported \(rustEvents.count) events, enqueued \(uploadItems.count)")
|
|
}
|
|
|
|
/// Flush pending items to API.
|
|
func flush() async {
|
|
let batch = queue.fetchPendingBatch(limit: 100)
|
|
guard !batch.isEmpty else { return }
|
|
|
|
queue.markUploading(ids: batch.map(\.id))
|
|
|
|
do {
|
|
let response = try await readingAPI.uploadReadingEvents(batch.map(\.payload))
|
|
|
|
// Ack successful events in Rust
|
|
let ackedIds = batch.map(\.eventId)
|
|
_ = adapter.ackEvents(ackedIds)
|
|
|
|
queue.markUploaded(ids: batch.map(\.id))
|
|
print("[UploadPipeline] Flushed \(response.processed) events")
|
|
} catch let error as APIError {
|
|
let errorCode = error.errorCode ?? "NETWORK_ERROR"
|
|
queue.markRetry(ids: batch.map(\.id), errorCode: errorCode)
|
|
_ = adapter.markFailed(batch.map(\.eventId))
|
|
print("[UploadPipeline] Flush failed: \(errorCode)")
|
|
} catch {
|
|
queue.markRetry(ids: batch.map(\.id), errorCode: "UNKNOWN")
|
|
_ = adapter.markFailed(batch.map(\.eventId))
|
|
}
|
|
}
|
|
|
|
/// Reload on app launch: reload stale Rust events, enqueue, retry failed.
|
|
func reloadOnLaunch(contexts: [String: ReadingMaterialContext] = [:]) {
|
|
_ = adapter.reloadStaleEvents()
|
|
_ = adapter.cleanupStaleSessions(nowMs: nowMs(), maxAgeMs: 30 * 60 * 1000)
|
|
|
|
exportAndEnqueue(contexts: contexts)
|
|
queue.retryFailed()
|
|
}
|
|
|
|
// MARK: - Helpers
|
|
|
|
private func nowMs() -> Int64 {
|
|
Int64(Date().timeIntervalSince1970 * 1000)
|
|
}
|
|
}
|
|
|
|
// MARK: - Queue Helpers
|
|
|
|
extension ReadingEventUploadQueue {
|
|
func markUploading(ids: [String]) {
|
|
let now = Date()
|
|
for i in items.indices {
|
|
guard ids.contains(items[i].id) else { continue }
|
|
items[i].status = .uploading
|
|
items[i].lastTriedAt = now
|
|
items[i].updatedAt = now
|
|
}
|
|
saveToDisk()
|
|
}
|
|
}
|