Implement half broken SSE

This commit is contained in:
Armin Ronacher 2025-06-16 06:02:43 +02:00
parent e27657a9fe
commit e8fb468a19

View file

@ -733,14 +733,223 @@ public final class TunnelServer {
guard FileManager.default.fileExists(atPath: streamOutPath) else {
return errorResponse(message: "Session not found", status: .notFound)
}
// For now, return a simple response indicating streaming would happen here
// Full SSE implementation would require more complex Hummingbird setup
let response = StreamResponse(
message: "Streaming endpoint - SSE implementation needed",
streamPath: streamOutPath
// Create SSE response with proper headers
let headers: HTTPFields = [
.contentType: "text/event-stream",
.cacheControl: "no-cache",
.connection: "keep-alive"
]
// Create async sequence for streaming
let stream = AsyncStream<ByteBuffer> { continuation in
let task = Task {
await self.streamFileContents(
streamOutPath: streamOutPath,
continuation: continuation
)
}
continuation.onTermination = { _ in
task.cancel()
}
}
return Response(
status: .ok,
headers: headers,
body: ResponseBody(asyncSequence: stream)
)
return jsonResponse(response)
}
private func streamFileContents(
streamOutPath: String,
continuation: AsyncStream<ByteBuffer>.Continuation
) async {
let startTime = Date()
var headerSent = false
var tailProcess: Process?
defer {
// Ensure tail process is terminated when function exits
if let process = tailProcess, process.isRunning {
process.terminate()
}
}
// Send existing content first
do {
let content = try String(contentsOfFile: streamOutPath, encoding: .utf8)
let lines = content.components(separatedBy: .newlines)
for line in lines {
let trimmedLine = line.trimmingCharacters(in: .whitespaces)
if !trimmedLine.isEmpty {
if let data = trimmedLine.data(using: .utf8),
let parsed = try? JSONSerialization.jsonObject(with: data) {
if let dict = parsed as? [String: Any],
dict["version"] != nil && dict["width"] != nil && dict["height"] != nil {
// Send header
var buffer = ByteBuffer()
buffer.writeString("data: \(trimmedLine)\n\n")
continuation.yield(buffer)
headerSent = true
} else if let array = parsed as? [Any], array.count >= 3 {
// Send event with instant timestamp (0)
let instantEvent = [0.0, array[1], array[2]]
if let eventData = try? JSONSerialization.data(withJSONObject: instantEvent),
let eventString = String(data: eventData, encoding: .utf8) {
var buffer = ByteBuffer()
buffer.writeString("data: \(eventString)\n\n")
continuation.yield(buffer)
}
}
}
}
}
} catch {
logger.error("Error reading existing content: \(error)")
}
// Send default header if none found
if !headerSent {
let defaultHeader: [String: Any] = [
"version": 2,
"width": 80,
"height": 24,
"timestamp": Int(startTime.timeIntervalSince1970),
"env": ["TERM": "xterm-256color"]
]
if let headerData = try? JSONSerialization.data(withJSONObject: defaultHeader),
let headerString = String(data: headerData, encoding: .utf8) {
var buffer = ByteBuffer()
buffer.writeString("data: \(headerString)\n\n")
continuation.yield(buffer)
}
}
// Stream new content by monitoring file changes
tailProcess = await monitorFileChanges(
streamOutPath: streamOutPath,
startTime: startTime,
continuation: continuation
)
continuation.finish()
}
private func monitorFileChanges(
streamOutPath: String,
startTime: Date,
continuation: AsyncStream<ByteBuffer>.Continuation
) async -> Process? {
// Use tail -f to monitor file changes
let tailProcess = Process()
tailProcess.executableURL = URL(fileURLWithPath: "/usr/bin/tail")
tailProcess.arguments = ["-f", streamOutPath]
let outputPipe = Pipe()
tailProcess.standardOutput = outputPipe
do {
try tailProcess.run()
// Process output from tail -f
let outputHandle = outputPipe.fileHandleForReading
var buffer = ""
// Read data asynchronously
for try await data in outputHandle.bytes {
if Task.isCancelled {
break
}
// Accumulate data into buffer
if let character = String(data: Data([data]), encoding: .utf8) {
buffer += character
// Process complete lines
let lines = buffer.components(separatedBy: .newlines)
if lines.count > 1 {
// Process all complete lines except the last (incomplete) one
for i in 0..<(lines.count - 1) {
let line = lines[i]
await processNewLine(
line: line,
startTime: startTime,
continuation: continuation
)
}
// Keep the last incomplete line in buffer
buffer = lines.last ?? ""
}
}
}
} catch {
logger.error("Error starting tail process: \(error)")
return nil
}
// Cleanup when cancelled or finished
if tailProcess.isRunning {
tailProcess.terminate()
}
return tailProcess
}
private func processNewLine(
line: String,
startTime: Date,
continuation: AsyncStream<ByteBuffer>.Continuation
) async {
let trimmedLine = line.trimmingCharacters(in: .whitespaces)
guard !trimmedLine.isEmpty else { return }
if let data = trimmedLine.data(using: .utf8),
let parsed = try? JSONSerialization.jsonObject(with: data) {
// Skip duplicate headers
if let dict = parsed as? [String: Any],
dict["version"] != nil && dict["width"] != nil && dict["height"] != nil {
return
}
if let array = parsed as? [Any], array.count >= 3 {
let currentTime = Date()
let realTimeEvent = [
currentTime.timeIntervalSince(startTime),
array[1],
array[2]
]
if let eventData = try? JSONSerialization.data(withJSONObject: realTimeEvent),
let eventString = String(data: eventData, encoding: .utf8) {
var buffer = ByteBuffer()
buffer.writeString("data: \(eventString)\n\n")
continuation.yield(buffer)
}
}
} else {
// Handle non-JSON as raw output
let currentTime = Date()
let castEvent: [Any] = [
currentTime.timeIntervalSince(startTime),
"o",
trimmedLine
]
if let eventData = try? JSONSerialization.data(withJSONObject: castEvent),
let eventString = String(data: eventData, encoding: .utf8) {
var buffer = ByteBuffer()
buffer.writeString("data: \(eventString)\n\n")
continuation.yield(buffer)
}
}
}
}
private func getSessionSnapshot(sessionId: String) async -> Response {