Fix terminal flow control to prevent xterm.js buffer overflow (#223)

This commit is contained in:
Peter Steinberger 2025-07-06 12:45:31 +01:00 committed by GitHub
parent 36337cfd7a
commit 7f7b4b682b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,10 +7,39 @@ import { createLogger } from '../utils/logger.js';
const logger = createLogger('terminal-manager');
// Flow control configuration
const FLOW_CONTROL_CONFIG = {
// When buffer exceeds this percentage of max lines, pause reading
// 80% gives a good buffer before hitting the scrollback limit
highWatermark: 0.8,
// Resume reading when buffer drops below this percentage
// 50% ensures enough space is cleared before resuming
lowWatermark: 0.5,
// Check interval for resuming paused sessions
// 100ms provides responsive resumption without excessive CPU usage
checkInterval: 100, // ms
// Maximum pending lines to accumulate while paused
// 10K lines handles bursts without excessive memory (avg ~1MB at 100 chars/line)
maxPendingLines: 10000,
// Maximum time a session can be paused before timing out
// 5 minutes handles temporary client issues without indefinite memory growth
maxPauseTime: 5 * 60 * 1000, // 5 minutes
// Lines to process between buffer pressure checks
// Checking every 100 lines balances performance with responsiveness
bufferCheckInterval: 100,
};
interface SessionTerminal {
terminal: XtermTerminal;
watcher?: fs.FSWatcher;
lastUpdate: number;
isPaused?: boolean;
pendingLines?: string[];
pausedAt?: number;
linesProcessedSinceCheck?: number;
isProcessingPending?: boolean;
lastFileOffset?: number;
lineBuffer?: string;
}
type BufferChangeListener = (sessionId: string, snapshot: BufferSnapshot) => void;
@ -47,6 +76,7 @@ export class TerminalManager {
},
});
private originalConsoleWarn: typeof console.warn;
private flowControlTimer?: NodeJS.Timeout;
constructor(controlDir: string) {
this.controlDir = controlDir;
@ -66,6 +96,9 @@ export class TerminalManager {
}
this.originalConsoleWarn.apply(console, args);
};
// Start flow control check timer
this.startFlowControlTimer();
}
/**
@ -110,8 +143,8 @@ export class TerminalManager {
if (!sessionTerminal) return;
const streamPath = path.join(this.controlDir, sessionId, 'stdout');
let lastOffset = 0;
let lineBuffer = '';
let lastOffset = sessionTerminal.lastFileOffset || 0;
let lineBuffer = sessionTerminal.lineBuffer || '';
// Check if the file exists
if (!fs.existsSync(streamPath)) {
@ -146,6 +179,7 @@ export class TerminalManager {
// Update offset
lastOffset = stats.size;
sessionTerminal.lastFileOffset = lastOffset;
// Process new data
const newData = buffer.toString('utf8');
@ -154,6 +188,7 @@ export class TerminalManager {
// Process complete lines
const lines = lineBuffer.split('\n');
lineBuffer = lines.pop() || ''; // Keep incomplete line for next time
sessionTerminal.lineBuffer = lineBuffer;
for (const line of lines) {
if (line.trim()) {
@ -174,10 +209,202 @@ export class TerminalManager {
}
}
/**
* Start flow control timer to check paused sessions
*/
private startFlowControlTimer(): void {
let checkIndex = 0;
const sessionIds: string[] = [];
this.flowControlTimer = setInterval(() => {
// Rebuild session list periodically
if (checkIndex === 0) {
sessionIds.length = 0;
for (const [sessionId, sessionTerminal] of this.terminals) {
if (sessionTerminal.isPaused) {
sessionIds.push(sessionId);
}
}
}
// Process one session per tick to avoid thundering herd
if (sessionIds.length > 0) {
const sessionId = sessionIds[checkIndex % sessionIds.length];
const sessionTerminal = this.terminals.get(sessionId);
if (sessionTerminal?.isPaused) {
// Check for timeout
if (
sessionTerminal.pausedAt &&
Date.now() - sessionTerminal.pausedAt > FLOW_CONTROL_CONFIG.maxPauseTime
) {
logger.warn(
chalk.red(
`Session ${sessionId} has been paused for too long. ` +
`Dropping ${sessionTerminal.pendingLines?.length || 0} pending lines.`
)
);
sessionTerminal.isPaused = false;
sessionTerminal.pendingLines = [];
sessionTerminal.pausedAt = undefined;
// Resume file watching after timeout
this.resumeFileWatcher(sessionId).catch((error) => {
logger.error(
`Failed to resume file watcher for session ${sessionId} after timeout:`,
error
);
});
} else {
this.checkBufferPressure(sessionId);
}
}
checkIndex = (checkIndex + 1) % Math.max(sessionIds.length, 1);
}
}, FLOW_CONTROL_CONFIG.checkInterval);
}
/**
* Check buffer pressure and pause/resume as needed
*/
private checkBufferPressure(sessionId: string): boolean {
const sessionTerminal = this.terminals.get(sessionId);
if (!sessionTerminal) return false;
const terminal = sessionTerminal.terminal;
const buffer = terminal.buffer.active;
const maxLines = terminal.options.scrollback || 10000;
const currentLines = buffer.length;
const bufferUtilization = currentLines / maxLines;
const wasPaused = sessionTerminal.isPaused || false;
// Check if we should pause
if (!wasPaused && bufferUtilization > FLOW_CONTROL_CONFIG.highWatermark) {
sessionTerminal.isPaused = true;
sessionTerminal.pendingLines = [];
sessionTerminal.pausedAt = Date.now();
// Apply backpressure by closing the file watcher
if (sessionTerminal.watcher) {
sessionTerminal.watcher.close();
sessionTerminal.watcher = undefined;
}
logger.warn(
chalk.yellow(
`Buffer pressure high for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` +
`(${currentLines}/${maxLines} lines). Pausing file watcher.`
)
);
return true;
}
// Check if we should resume
if (wasPaused && bufferUtilization < FLOW_CONTROL_CONFIG.lowWatermark) {
// Avoid race condition: mark as processing pending before resuming
if (
sessionTerminal.pendingLines &&
sessionTerminal.pendingLines.length > 0 &&
!sessionTerminal.isProcessingPending
) {
sessionTerminal.isProcessingPending = true;
const pendingCount = sessionTerminal.pendingLines.length;
logger.log(
chalk.green(
`Buffer pressure normalized for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` +
`(${currentLines}/${maxLines} lines). Processing ${pendingCount} pending lines.`
)
);
// Process pending lines asynchronously to avoid blocking
setImmediate(() => {
const lines = sessionTerminal.pendingLines || [];
sessionTerminal.pendingLines = [];
sessionTerminal.isPaused = false;
sessionTerminal.pausedAt = undefined;
sessionTerminal.isProcessingPending = false;
for (const pendingLine of lines) {
this.processStreamLine(sessionId, sessionTerminal, pendingLine);
}
// Resume file watching after processing pending lines
this.resumeFileWatcher(sessionId).catch((error) => {
logger.error(`Failed to resume file watcher for session ${sessionId}:`, error);
});
});
} else if (!sessionTerminal.pendingLines || sessionTerminal.pendingLines.length === 0) {
// No pending lines, just resume
sessionTerminal.isPaused = false;
sessionTerminal.pausedAt = undefined;
// Resume file watching
this.resumeFileWatcher(sessionId).catch((error) => {
logger.error(`Failed to resume file watcher for session ${sessionId}:`, error);
});
logger.log(
chalk.green(
`Buffer pressure normalized for session ${sessionId}: ${Math.round(bufferUtilization * 100)}% ` +
`(${currentLines}/${maxLines} lines). Resuming file watcher.`
)
);
}
return false;
}
return wasPaused;
}
/**
* Handle stream line
*/
private handleStreamLine(sessionId: string, sessionTerminal: SessionTerminal, line: string) {
// Initialize line counter if needed
if (sessionTerminal.linesProcessedSinceCheck === undefined) {
sessionTerminal.linesProcessedSinceCheck = 0;
}
// Check buffer pressure periodically or if already paused
let isPaused = sessionTerminal.isPaused || false;
if (
!isPaused &&
sessionTerminal.linesProcessedSinceCheck >= FLOW_CONTROL_CONFIG.bufferCheckInterval
) {
isPaused = this.checkBufferPressure(sessionId);
sessionTerminal.linesProcessedSinceCheck = 0;
}
if (isPaused) {
// Queue the line for later processing
if (!sessionTerminal.pendingLines) {
sessionTerminal.pendingLines = [];
}
// Limit pending lines to prevent memory issues
if (sessionTerminal.pendingLines.length < FLOW_CONTROL_CONFIG.maxPendingLines) {
sessionTerminal.pendingLines.push(line);
} else {
logger.warn(
chalk.red(
`Pending lines limit reached for session ${sessionId}. Dropping new data to prevent memory overflow.`
)
);
}
return;
}
sessionTerminal.linesProcessedSinceCheck++;
this.processStreamLine(sessionId, sessionTerminal, line);
}
/**
* Process a stream line (separated from handleStreamLine for flow control)
*/
private processStreamLine(sessionId: string, sessionTerminal: SessionTerminal, line: string) {
try {
const data = JSON.parse(line);
@ -246,8 +473,12 @@ export class TerminalManager {
async getBufferStats(sessionId: string) {
const terminal = await this.getTerminal(sessionId);
const buffer = terminal.buffer.active;
const sessionTerminal = this.terminals.get(sessionId);
logger.debug(`Getting buffer stats for session ${sessionId}: ${buffer.length} total rows`);
const maxLines = terminal.options.scrollback || 10000;
const bufferUtilization = buffer.length / maxLines;
return {
totalRows: buffer.length,
cols: terminal.cols,
@ -256,6 +487,11 @@ export class TerminalManager {
cursorX: buffer.cursorX,
cursorY: buffer.cursorY,
scrollback: terminal.options.scrollback || 0,
// Flow control metrics
isPaused: sessionTerminal?.isPaused || false,
pendingLines: sessionTerminal?.pendingLines?.length || 0,
bufferUtilization: Math.round(bufferUtilization * 100),
maxBufferLines: maxLines,
};
}
@ -816,6 +1052,18 @@ export class TerminalManager {
}
}
/**
* Resume file watching for a paused session
*/
private async resumeFileWatcher(sessionId: string): Promise<void> {
const sessionTerminal = this.terminals.get(sessionId);
if (!sessionTerminal || sessionTerminal.watcher) {
return; // Already watching or session doesn't exist
}
await this.watchStreamFile(sessionId);
}
/**
* Destroy the terminal manager and restore console overrides
*/
@ -840,6 +1088,12 @@ export class TerminalManager {
// Clear write queues
this.writeQueues.clear();
// Clear flow control timer
if (this.flowControlTimer) {
clearInterval(this.flowControlTimer);
this.flowControlTimer = undefined;
}
// Restore original console.warn
console.warn = this.originalConsoleWarn;
}