fix: eliminate lag in forwarded sessions by forcing immediate disk sync

- Add fs.fsyncSync after each asciinema event write to trigger file watchers immediately
- Keep socket connections alive with setKeepAlive for better performance
- Add response flushing in SSE streams to prevent buffering
- Fix the ~100ms input lag that was affecting forwarded sessions vs server-created sessions

The lag was caused by buffered writes not immediately triggering file system watchers.
Forwarded sessions now feel as responsive as server-created sessions.

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-22 14:56:25 +02:00
parent eb0f9f4d77
commit 862fc86c72
4 changed files with 40 additions and 3 deletions

View file

@ -14,6 +14,7 @@ export class AsciinemaWriter {
private startTime: Date; private startTime: Date;
private utf8Buffer: Buffer = Buffer.alloc(0); private utf8Buffer: Buffer = Buffer.alloc(0);
private headerWritten = false; private headerWritten = false;
private fd: number | null = null;
constructor( constructor(
private filePath: string, private filePath: string,
@ -27,10 +28,16 @@ export class AsciinemaWriter {
fs.mkdirSync(dir, { recursive: true }); fs.mkdirSync(dir, { recursive: true });
} }
// Create write stream // Create write stream with no buffering for real-time performance
this.writeStream = fs.createWriteStream(filePath, { this.writeStream = fs.createWriteStream(filePath, {
flags: 'w', flags: 'w',
encoding: 'utf8', encoding: 'utf8',
highWaterMark: 0, // Disable internal buffering
});
// Get file descriptor for fsync
this.writeStream.on('open', (fd) => {
this.fd = fd;
}); });
this.writeHeader(); this.writeHeader();
@ -151,6 +158,16 @@ export class AsciinemaWriter {
const eventArray = [event.time, event.type, event.data]; const eventArray = [event.time, event.type, event.data];
const eventJson = JSON.stringify(eventArray); const eventJson = JSON.stringify(eventArray);
this.writeStream.write(eventJson + '\n'); this.writeStream.write(eventJson + '\n');
// Force immediate disk write to trigger file watchers
// This is critical for real-time performance with forwarded sessions
if (this.fd !== null) {
try {
fs.fsyncSync(this.fd);
} catch (_e) {
// Ignore sync errors
}
}
} }
/** /**

View file

@ -263,7 +263,9 @@ export class PtyManager {
client.on('data', (data) => { client.on('data', (data) => {
const text = data.toString('utf8'); const text = data.toString('utf8');
if (session.ptyProcess) { if (session.ptyProcess) {
// Write input first for fastest response
session.ptyProcess.write(text); session.ptyProcess.write(text);
// Then record it (non-blocking)
session.asciinemaWriter?.writeInput(text); session.asciinemaWriter?.writeInput(text);
} }
}); });
@ -413,7 +415,8 @@ export class PtyManager {
sessionId sessionId
); );
} }
// Try Unix domain socket first for lowest latency
// For forwarded sessions, we need to use socket communication
const socketPath = path.join(sessionPaths.controlDir, 'input.sock'); const socketPath = path.join(sessionPaths.controlDir, 'input.sock');
// Check if we have a cached socket connection // Check if we have a cached socket connection
@ -424,6 +427,8 @@ export class PtyManager {
try { try {
socketClient = net.createConnection(socketPath); socketClient = net.createConnection(socketPath);
socketClient.setNoDelay(true); socketClient.setNoDelay(true);
// Keep socket alive for better performance
socketClient.setKeepAlive(true, 0);
this.inputSocketClients.set(sessionId, socketClient); this.inputSocketClients.set(sessionId, socketClient);
socketClient.on('error', () => { socketClient.on('error', () => {
@ -439,6 +444,7 @@ export class PtyManager {
} }
if (socketClient && !socketClient.destroyed) { if (socketClient && !socketClient.destroyed) {
// Write and flush immediately
socketClient.write(dataToSend); socketClient.write(dataToSend);
} else { } else {
throw new PtyError( throw new PtyError(

View file

@ -790,10 +790,16 @@ export function createSessionRoutes(config: SessionRoutesConfig): Router {
'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control', 'Access-Control-Allow-Headers': 'Cache-Control',
'X-Accel-Buffering': 'no', // Disable Nginx buffering 'X-Accel-Buffering': 'no', // Disable Nginx buffering
'Content-Encoding': 'identity', // Prevent compression
}); });
// Force headers to be sent immediately
res.flushHeaders();
// Send initial connection event // Send initial connection event
res.write(':ok\n\n'); res.write(':ok\n\n');
// @ts-expect-error - flush exists but not in types
if (res.flush) res.flush();
// Add client to stream watcher // Add client to stream watcher
streamWatcher.addClient(sessionId, streamPath, res); streamWatcher.addClient(sessionId, streamPath, res);
@ -801,6 +807,8 @@ export function createSessionRoutes(config: SessionRoutesConfig): Router {
// Send heartbeat every 30 seconds to keep connection alive // Send heartbeat every 30 seconds to keep connection alive
const heartbeat = setInterval(() => { const heartbeat = setInterval(() => {
res.write(':heartbeat\n\n'); res.write(':heartbeat\n\n');
// @ts-expect-error - flush exists but not in types
if (res.flush) res.flush();
}, 30000); }, 30000);
// Clean up on disconnect // Clean up on disconnect

View file

@ -192,7 +192,8 @@ export class StreamWatcher {
* Start watching a file for changes * Start watching a file for changes
*/ */
private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void { private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void {
watcherInfo.watcher = fs.watch(streamPath, (eventType) => { // Use options for more responsive watching
watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => {
if (eventType === 'change') { if (eventType === 'change') {
try { try {
const stats = fs.statSync(streamPath); const stats = fs.statSync(streamPath);
@ -214,6 +215,7 @@ export class StreamWatcher {
const lines = watcherInfo.lineBuffer.split('\n'); const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || ''; watcherInfo.lineBuffer = lines.pop() || '';
console.log(`[STREAM] New data: ${newData}`);
for (const line of lines) { for (const line of lines) {
if (line.trim()) { if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo); this.broadcastLine(sessionId, line, watcherInfo);
@ -264,6 +266,8 @@ export class StreamWatcher {
try { try {
client.response.write(clientData); client.response.write(clientData);
// @ts-expect-error - flush exists but not in types
if (client.response.flush) client.response.flush();
} catch (error) { } catch (error) {
console.error(`[STREAM] Error writing to client:`, error); console.error(`[STREAM] Error writing to client:`, error);
// Client might be disconnected // Client might be disconnected
@ -281,6 +285,8 @@ export class StreamWatcher {
try { try {
client.response.write(clientData); client.response.write(clientData);
// @ts-expect-error - flush exists but not in types
if (client.response.flush) client.response.flush();
} catch (error) { } catch (error) {
console.error(`[STREAM] Error writing to client:`, error); console.error(`[STREAM] Error writing to client:`, error);
} }