Fix SSE stream closing issue and improve file monitoring

- Add periodic heartbeats (every 15 seconds) to keep SSE connections alive
- Enhance response headers to prevent proxy buffering (X-Accel-Buffering: no)
- Fix monitorFileChanges to read entire file from beginning like tail -f
- Process lines synchronously to maintain order
- Send initial connection message to establish stream immediately
- Remove empty line filtering to match tail -f behavior exactly

This ensures streams stay open indefinitely and don't timeout due to inactivity.
This commit is contained in:
Peter Steinberger 2025-06-16 07:45:02 +02:00
parent aae68479ee
commit ac13030b52
2 changed files with 52 additions and 36 deletions

View file

@ -768,8 +768,10 @@ public final class TunnelServer {
// Create SSE response with proper headers // Create SSE response with proper headers
let headers: HTTPFields = [ let headers: HTTPFields = [
.contentType: "text/event-stream", .contentType: "text/event-stream",
.cacheControl: "no-cache", .cacheControl: "no-cache, no-store, must-revalidate",
.connection: "keep-alive" .connection: "keep-alive",
.init("X-Accel-Buffering")!: "no", // Disable proxy buffering
.init("Access-Control-Allow-Origin")!: "*"
] ]
// Create async sequence for streaming // Create async sequence for streaming
@ -806,6 +808,11 @@ public final class TunnelServer {
fileMonitor?.cancel() fileMonitor?.cancel()
} }
// Send initial connection established message
var initialMessage = ByteBuffer()
initialMessage.writeString(": connected\n\n")
continuation.yield(initialMessage)
// Send existing content first // Send existing content first
do { do {
let content = try String(contentsOfFile: streamOutPath, encoding: .utf8) let content = try String(contentsOfFile: streamOutPath, encoding: .utf8)
@ -866,11 +873,21 @@ public final class TunnelServer {
continuation: continuation continuation: continuation
) )
// Wait for cancellation // Keep the stream open until cancelled with periodic heartbeats
await withTaskCancellationHandler { await withTaskCancellationHandler {
await withCheckedContinuation { continuation in // Send heartbeat every 15 seconds to keep connection alive
// This will suspend until cancelled while !Task.isCancelled {
continuation.resume() do {
try await Task.sleep(nanoseconds: 15_000_000_000) // 15 seconds
// Send SSE comment as heartbeat (comments start with ':')
var heartbeat = ByteBuffer()
heartbeat.writeString(": heartbeat\n\n")
continuation.yield(heartbeat)
} catch {
// Task was cancelled
break
}
} }
} onCancel: { [fileMonitor] in } onCancel: { [fileMonitor] in
fileMonitor?.cancel() fileMonitor?.cancel()
@ -894,44 +911,44 @@ public final class TunnelServer {
// Store buffer for incomplete lines // Store buffer for incomplete lines
var lineBuffer = "" var lineBuffer = ""
// Read existing file content first // Read entire file content from the beginning
let fileSize = lseek(fileDescriptor, 0, SEEK_END) let fileSize = lseek(fileDescriptor, 0, SEEK_END)
if fileSize > 0 { if fileSize > 0 {
// Read the entire file (or last portion if very large) // Seek to beginning
let maxInitialRead: Int64 = 1024 * 1024 // 1MB max initial read lseek(fileDescriptor, 0, SEEK_SET)
let readSize = min(fileSize, maxInitialRead)
let startOffset = max(0, fileSize - readSize)
lseek(fileDescriptor, startOffset, SEEK_SET) // Read entire file content
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Int(fileSize) + 1)
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Int(readSize) + 1)
defer { buffer.deallocate() } defer { buffer.deallocate() }
let bytesRead = read(fileDescriptor, buffer, Int(readSize)) var totalBytesRead = 0
if bytesRead > 0 { while totalBytesRead < fileSize {
let data = Data(bytes: buffer, count: bytesRead) let bytesRead = read(fileDescriptor, buffer + totalBytesRead, Int(fileSize) - totalBytesRead)
if bytesRead <= 0 { break }
totalBytesRead += bytesRead
}
if totalBytesRead > 0 {
let data = Data(bytes: buffer, count: totalBytesRead)
if let initialContent = String(data: data, encoding: .utf8) { if let initialContent = String(data: data, encoding: .utf8) {
lineBuffer = initialContent lineBuffer = initialContent
let lines = lineBuffer.components(separatedBy: .newlines) let lines = lineBuffer.components(separatedBy: .newlines)
// Process all complete lines from existing content // Process all complete lines synchronously to maintain order
if lines.count > 1 { for i in 0..<lines.count - 1 {
for i in 0..<(lines.count - 1) {
let line = lines[i] let line = lines[i]
Task { @MainActor in await processNewLine(
await self.processNewLine(
line: line, line: line,
startTime: startTime, startTime: startTime,
continuation: continuation continuation: continuation
) )
} }
}
// Keep the last incomplete line in buffer // Keep the last incomplete line in buffer
lineBuffer = lines.last ?? "" lineBuffer = lines.last ?? ""
} }
} }
} }
}
// Set position to current end for monitoring new content // Set position to current end for monitoring new content
var lastReadPosition = lseek(fileDescriptor, 0, SEEK_END) var lastReadPosition = lseek(fileDescriptor, 0, SEEK_END)
@ -978,11 +995,11 @@ public final class TunnelServer {
lineBuffer += contentString lineBuffer += contentString
let lines = lineBuffer.components(separatedBy: .newlines) let lines = lineBuffer.components(separatedBy: .newlines)
// Process all complete lines // Process all complete lines synchronously to maintain order
if lines.count > 1 { if lines.count > 1 {
Task { @MainActor in
for i in 0..<(lines.count - 1) { for i in 0..<(lines.count - 1) {
let line = lines[i] let line = lines[i]
Task { @MainActor in
await self.processNewLine( await self.processNewLine(
line: line, line: line,
startTime: startTime, startTime: startTime,
@ -1011,7 +1028,6 @@ public final class TunnelServer {
continuation: AsyncStream<ByteBuffer>.Continuation continuation: AsyncStream<ByteBuffer>.Continuation
) async { ) async {
let trimmedLine = line.trimmingCharacters(in: .whitespaces) let trimmedLine = line.trimmingCharacters(in: .whitespaces)
guard !trimmedLine.isEmpty else { return }
if let data = trimmedLine.data(using: .utf8), if let data = trimmedLine.data(using: .utf8),
let parsed = try? JSONSerialization.jsonObject(with: data) { let parsed = try? JSONSerialization.jsonObject(with: data) {