diff --git a/web/src/server/pty/asciinema-writer.ts b/web/src/server/pty/asciinema-writer.ts index 3c69d751..98dabc75 100644 --- a/web/src/server/pty/asciinema-writer.ts +++ b/web/src/server/pty/asciinema-writer.ts @@ -14,6 +14,7 @@ export class AsciinemaWriter { private startTime: Date; private utf8Buffer: Buffer = Buffer.alloc(0); private headerWritten = false; + private fd: number | null = null; constructor( private filePath: string, @@ -27,10 +28,16 @@ export class AsciinemaWriter { fs.mkdirSync(dir, { recursive: true }); } - // Create write stream + // Create write stream with no buffering for real-time performance this.writeStream = fs.createWriteStream(filePath, { flags: 'w', encoding: 'utf8', + highWaterMark: 0, // Disable internal buffering + }); + + // Get file descriptor for fsync + this.writeStream.on('open', (fd) => { + this.fd = fd; }); this.writeHeader(); @@ -151,6 +158,16 @@ export class AsciinemaWriter { const eventArray = [event.time, event.type, event.data]; const eventJson = JSON.stringify(eventArray); this.writeStream.write(eventJson + '\n'); + + // Force immediate disk write to trigger file watchers + // This is critical for real-time performance with forwarded sessions + if (this.fd !== null) { + try { + fs.fsyncSync(this.fd); + } catch (_e) { + // Ignore sync errors + } + } } /** diff --git a/web/src/server/pty/pty-manager.ts b/web/src/server/pty/pty-manager.ts index 033e04f5..955e96d9 100644 --- a/web/src/server/pty/pty-manager.ts +++ b/web/src/server/pty/pty-manager.ts @@ -263,7 +263,9 @@ export class PtyManager { client.on('data', (data) => { const text = data.toString('utf8'); if (session.ptyProcess) { + // Write input first for fastest response session.ptyProcess.write(text); + // Then record it (non-blocking) session.asciinemaWriter?.writeInput(text); } }); @@ -413,7 +415,8 @@ export class PtyManager { sessionId ); } - // Try Unix domain socket first for lowest latency + + // For forwarded sessions, we need to use socket communication const socketPath = path.join(sessionPaths.controlDir, 'input.sock'); // Check if we have a cached socket connection @@ -424,6 +427,8 @@ export class PtyManager { try { socketClient = net.createConnection(socketPath); socketClient.setNoDelay(true); + // Keep socket alive for better performance + socketClient.setKeepAlive(true, 0); this.inputSocketClients.set(sessionId, socketClient); socketClient.on('error', () => { @@ -439,6 +444,7 @@ export class PtyManager { } if (socketClient && !socketClient.destroyed) { + // Write and flush immediately socketClient.write(dataToSend); } else { throw new PtyError( diff --git a/web/src/server/routes/sessions.ts b/web/src/server/routes/sessions.ts index a115c0f3..b77eca77 100644 --- a/web/src/server/routes/sessions.ts +++ b/web/src/server/routes/sessions.ts @@ -790,10 +790,16 @@ export function createSessionRoutes(config: SessionRoutesConfig): Router { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control', 'X-Accel-Buffering': 'no', // Disable Nginx buffering + 'Content-Encoding': 'identity', // Prevent compression }); + // Force headers to be sent immediately + res.flushHeaders(); + // Send initial connection event res.write(':ok\n\n'); + // @ts-expect-error - flush exists but not in types + if (res.flush) res.flush(); // Add client to stream watcher streamWatcher.addClient(sessionId, streamPath, res); @@ -801,6 +807,8 @@ export function createSessionRoutes(config: SessionRoutesConfig): Router { // Send heartbeat every 30 seconds to keep connection alive const heartbeat = setInterval(() => { res.write(':heartbeat\n\n'); + // @ts-expect-error - flush exists but not in types + if (res.flush) res.flush(); }, 30000); // Clean up on disconnect diff --git a/web/src/server/services/stream-watcher.ts b/web/src/server/services/stream-watcher.ts index 7af4d996..194eb15c 100644 --- a/web/src/server/services/stream-watcher.ts +++ b/web/src/server/services/stream-watcher.ts @@ -192,7 +192,8 @@ export class StreamWatcher { * Start watching a file for changes */ private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void { - watcherInfo.watcher = fs.watch(streamPath, (eventType) => { + // Use options for more responsive watching + watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => { if (eventType === 'change') { try { const stats = fs.statSync(streamPath); @@ -214,6 +215,7 @@ export class StreamWatcher { const lines = watcherInfo.lineBuffer.split('\n'); watcherInfo.lineBuffer = lines.pop() || ''; + console.log(`[STREAM] New data: ${newData}`); for (const line of lines) { if (line.trim()) { this.broadcastLine(sessionId, line, watcherInfo); @@ -264,6 +266,8 @@ export class StreamWatcher { try { client.response.write(clientData); + // @ts-expect-error - flush exists but not in types + if (client.response.flush) client.response.flush(); } catch (error) { console.error(`[STREAM] Error writing to client:`, error); // Client might be disconnected @@ -281,6 +285,8 @@ export class StreamWatcher { try { client.response.write(clientData); + // @ts-expect-error - flush exists but not in types + if (client.response.flush) client.response.flush(); } catch (error) { console.error(`[STREAM] Error writing to client:`, error); }