diff --git a/web/src/server/services/terminal-manager.ts b/web/src/server/services/terminal-manager.ts index 6b0af0a4..f7c4c45d 100644 --- a/web/src/server/services/terminal-manager.ts +++ b/web/src/server/services/terminal-manager.ts @@ -7,10 +7,39 @@ import { createLogger } from '../utils/logger.js'; const logger = createLogger('terminal-manager'); +// Flow control configuration +const FLOW_CONTROL_CONFIG = { + // When buffer exceeds this percentage of max lines, pause reading + // 80% gives a good buffer before hitting the scrollback limit + highWatermark: 0.8, + // Resume reading when buffer drops below this percentage + // 50% ensures enough space is cleared before resuming + lowWatermark: 0.5, + // Check interval for resuming paused sessions + // 100ms provides responsive resumption without excessive CPU usage + checkInterval: 100, // ms + // Maximum pending lines to accumulate while paused + // 10K lines handles bursts without excessive memory (avg ~1MB at 100 chars/line) + maxPendingLines: 10000, + // Maximum time a session can be paused before timing out + // 5 minutes handles temporary client issues without indefinite memory growth + maxPauseTime: 5 * 60 * 1000, // 5 minutes + // Lines to process between buffer pressure checks + // Checking every 100 lines balances performance with responsiveness + bufferCheckInterval: 100, +}; + interface SessionTerminal { terminal: XtermTerminal; watcher?: fs.FSWatcher; lastUpdate: number; + isPaused?: boolean; + pendingLines?: string[]; + pausedAt?: number; + linesProcessedSinceCheck?: number; + isProcessingPending?: boolean; + lastFileOffset?: number; + lineBuffer?: string; } type BufferChangeListener = (sessionId: string, snapshot: BufferSnapshot) => void; @@ -47,6 +76,7 @@ export class TerminalManager { }, }); private originalConsoleWarn: typeof console.warn; + private flowControlTimer?: NodeJS.Timeout; constructor(controlDir: string) { this.controlDir = controlDir; @@ -66,6 +96,9 @@ export class TerminalManager { } this.originalConsoleWarn.apply(console, args); }; + + // Start flow control check timer + this.startFlowControlTimer(); } /** @@ -110,8 +143,8 @@ export class TerminalManager { if (!sessionTerminal) return; const streamPath = path.join(this.controlDir, sessionId, 'stdout'); - let lastOffset = 0; - let lineBuffer = ''; + let lastOffset = sessionTerminal.lastFileOffset || 0; + let lineBuffer = sessionTerminal.lineBuffer || ''; // Check if the file exists if (!fs.existsSync(streamPath)) { @@ -146,6 +179,7 @@ export class TerminalManager { // Update offset lastOffset = stats.size; + sessionTerminal.lastFileOffset = lastOffset; // Process new data const newData = buffer.toString('utf8'); @@ -154,6 +188,7 @@ export class TerminalManager { // Process complete lines const lines = lineBuffer.split('\n'); lineBuffer = lines.pop() || ''; // Keep incomplete line for next time + sessionTerminal.lineBuffer = lineBuffer; for (const line of lines) { if (line.trim()) { @@ -174,10 +209,202 @@ export class TerminalManager { } } + /** + * Start flow control timer to check paused sessions + */ + private startFlowControlTimer(): void { + let checkIndex = 0; + const sessionIds: string[] = []; + + this.flowControlTimer = setInterval(() => { + // Rebuild session list periodically + if (checkIndex === 0) { + sessionIds.length = 0; + for (const [sessionId, sessionTerminal] of this.terminals) { + if (sessionTerminal.isPaused) { + sessionIds.push(sessionId); + } + } + } + + // Process one session per tick to avoid thundering herd + if (sessionIds.length > 0) { + const sessionId = sessionIds[checkIndex % sessionIds.length]; + const sessionTerminal = this.terminals.get(sessionId); + + if (sessionTerminal?.isPaused) { + // Check for timeout + if ( + sessionTerminal.pausedAt && + Date.now() - sessionTerminal.pausedAt > FLOW_CONTROL_CONFIG.maxPauseTime + ) { + logger.warn( + chalk.red( + `Session ${sessionId} has been paused for too long. ` + + `Dropping ${sessionTerminal.pendingLines?.length || 0} pending lines.` + ) + ); + sessionTerminal.isPaused = false; + sessionTerminal.pendingLines = []; + sessionTerminal.pausedAt = undefined; + + // Resume file watching after timeout + this.resumeFileWatcher(sessionId).catch((error) => { + logger.error( + `Failed to resume file watcher for session ${sessionId} after timeout:`, + error + ); + }); + } else { + this.checkBufferPressure(sessionId); + } + } + + checkIndex = (checkIndex + 1) % Math.max(sessionIds.length, 1); + } + }, FLOW_CONTROL_CONFIG.checkInterval); + } + + /** + * Check buffer pressure and pause/resume as needed + */ + private checkBufferPressure(sessionId: string): boolean { + const sessionTerminal = this.terminals.get(sessionId); + if (!sessionTerminal) return false; + + const terminal = sessionTerminal.terminal; + const buffer = terminal.buffer.active; + const maxLines = terminal.options.scrollback || 10000; + const currentLines = buffer.length; + const bufferUtilization = currentLines / maxLines; + + const wasPaused = sessionTerminal.isPaused || false; + + // Check if we should pause + if (!wasPaused && bufferUtilization > FLOW_CONTROL_CONFIG.highWatermark) { + sessionTerminal.isPaused = true; + sessionTerminal.pendingLines = []; + sessionTerminal.pausedAt = Date.now(); + + // Apply backpressure by closing the file watcher + if (sessionTerminal.watcher) { + sessionTerminal.watcher.close(); + sessionTerminal.watcher = undefined; + } + + logger.warn( + chalk.yellow( + `Buffer pressure high for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` + + `(${currentLines}/${maxLines} lines). Pausing file watcher.` + ) + ); + return true; + } + + // Check if we should resume + if (wasPaused && bufferUtilization < FLOW_CONTROL_CONFIG.lowWatermark) { + // Avoid race condition: mark as processing pending before resuming + if ( + sessionTerminal.pendingLines && + sessionTerminal.pendingLines.length > 0 && + !sessionTerminal.isProcessingPending + ) { + sessionTerminal.isProcessingPending = true; + + const pendingCount = sessionTerminal.pendingLines.length; + logger.log( + chalk.green( + `Buffer pressure normalized for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` + + `(${currentLines}/${maxLines} lines). Processing ${pendingCount} pending lines.` + ) + ); + + // Process pending lines asynchronously to avoid blocking + setImmediate(() => { + const lines = sessionTerminal.pendingLines || []; + sessionTerminal.pendingLines = []; + sessionTerminal.isPaused = false; + sessionTerminal.pausedAt = undefined; + sessionTerminal.isProcessingPending = false; + + for (const pendingLine of lines) { + this.processStreamLine(sessionId, sessionTerminal, pendingLine); + } + + // Resume file watching after processing pending lines + this.resumeFileWatcher(sessionId).catch((error) => { + logger.error(`Failed to resume file watcher for session ${sessionId}:`, error); + }); + }); + } else if (!sessionTerminal.pendingLines || sessionTerminal.pendingLines.length === 0) { + // No pending lines, just resume + sessionTerminal.isPaused = false; + sessionTerminal.pausedAt = undefined; + + // Resume file watching + this.resumeFileWatcher(sessionId).catch((error) => { + logger.error(`Failed to resume file watcher for session ${sessionId}:`, error); + }); + + logger.log( + chalk.green( + `Buffer pressure normalized for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` + + `(${currentLines}/${maxLines} lines). Resuming file watcher.` + ) + ); + } + return false; + } + + return wasPaused; + } + /** * Handle stream line */ private handleStreamLine(sessionId: string, sessionTerminal: SessionTerminal, line: string) { + // Initialize line counter if needed + if (sessionTerminal.linesProcessedSinceCheck === undefined) { + sessionTerminal.linesProcessedSinceCheck = 0; + } + + // Check buffer pressure periodically or if already paused + let isPaused = sessionTerminal.isPaused || false; + if ( + !isPaused && + sessionTerminal.linesProcessedSinceCheck >= FLOW_CONTROL_CONFIG.bufferCheckInterval + ) { + isPaused = this.checkBufferPressure(sessionId); + sessionTerminal.linesProcessedSinceCheck = 0; + } + + if (isPaused) { + // Queue the line for later processing + if (!sessionTerminal.pendingLines) { + sessionTerminal.pendingLines = []; + } + + // Limit pending lines to prevent memory issues + if (sessionTerminal.pendingLines.length < FLOW_CONTROL_CONFIG.maxPendingLines) { + sessionTerminal.pendingLines.push(line); + } else { + logger.warn( + chalk.red( + `Pending lines limit reached for session ${sessionId}. Dropping new data to prevent memory overflow.` + ) + ); + } + return; + } + + sessionTerminal.linesProcessedSinceCheck++; + this.processStreamLine(sessionId, sessionTerminal, line); + } + + /** + * Process a stream line (separated from handleStreamLine for flow control) + */ + private processStreamLine(sessionId: string, sessionTerminal: SessionTerminal, line: string) { try { const data = JSON.parse(line); @@ -246,8 +473,12 @@ export class TerminalManager { async getBufferStats(sessionId: string) { const terminal = await this.getTerminal(sessionId); const buffer = terminal.buffer.active; + const sessionTerminal = this.terminals.get(sessionId); logger.debug(`Getting buffer stats for session ${sessionId}: ${buffer.length} total rows`); + const maxLines = terminal.options.scrollback || 10000; + const bufferUtilization = buffer.length / maxLines; + return { totalRows: buffer.length, cols: terminal.cols, @@ -256,6 +487,11 @@ export class TerminalManager { cursorX: buffer.cursorX, cursorY: buffer.cursorY, scrollback: terminal.options.scrollback || 0, + // Flow control metrics + isPaused: sessionTerminal?.isPaused || false, + pendingLines: sessionTerminal?.pendingLines?.length || 0, + bufferUtilization: Math.round(bufferUtilization * 100), + maxBufferLines: maxLines, }; } @@ -816,6 +1052,18 @@ export class TerminalManager { } } + /** + * Resume file watching for a paused session + */ + private async resumeFileWatcher(sessionId: string): Promise { + const sessionTerminal = this.terminals.get(sessionId); + if (!sessionTerminal || sessionTerminal.watcher) { + return; // Already watching or session doesn't exist + } + + await this.watchStreamFile(sessionId); + } + /** * Destroy the terminal manager and restore console overrides */ @@ -840,6 +1088,12 @@ export class TerminalManager { // Clear write queues this.writeQueues.clear(); + // Clear flow control timer + if (this.flowControlTimer) { + clearInterval(this.flowControlTimer); + this.flowControlTimer = undefined; + } + // Restore original console.warn console.warn = this.originalConsoleWarn; }