diff --git a/web/src/client/components/session-view/direct-keyboard-manager.ts b/web/src/client/components/session-view/direct-keyboard-manager.ts index c275454a..be1a7f8e 100644 --- a/web/src/client/components/session-view/direct-keyboard-manager.ts +++ b/web/src/client/components/session-view/direct-keyboard-manager.ts @@ -432,8 +432,8 @@ export class DirectKeyboardManager { key: string, isModifier?: boolean, isSpecial?: boolean, - isToggle?: boolean, - pasteText?: string + _isToggle?: boolean, + _pasteText?: string ): Promise => { if (!this.inputManager) { logger.error('No input manager found'); @@ -737,7 +737,9 @@ export class DirectKeyboardManager { // Restore all original styles Object.entries(originalStyles).forEach(([key, value]) => { if (value !== undefined) { - (this.hiddenInput!.style as any)[key] = value; + if (this.hiddenInput?.style) { + (this.hiddenInput.style as unknown as Record)[key] = value; + } } }); this.hiddenInput.placeholder = ''; diff --git a/web/src/client/components/terminal-quick-keys.ts b/web/src/client/components/terminal-quick-keys.ts index 0098d77f..cb7635a1 100644 --- a/web/src/client/components/terminal-quick-keys.ts +++ b/web/src/client/components/terminal-quick-keys.ts @@ -196,7 +196,7 @@ export class TerminalQuickKeys extends LitElement { } } - private handlePasteImmediate(e: Event) { + private handlePasteImmediate(_e: Event) { console.log('[QuickKeys] Paste button touched - delegating to paste handler'); // Always delegate to the main paste handler in direct-keyboard-manager diff --git a/web/src/server/pty/pty-manager.ts b/web/src/server/pty/pty-manager.ts index 0f37d76e..abda889a 100644 --- a/web/src/server/pty/pty-manager.ts +++ b/web/src/server/pty/pty-manager.ts @@ -246,6 +246,7 @@ export class PtyManager extends EventEmitter { startedAt: new Date().toISOString(), initialCols: cols, initialRows: rows, + lastClearOffset: 0, version: VERSION, }; diff --git a/web/src/server/server.ts b/web/src/server/server.ts index a7318db8..c8aaee2e 100644 --- a/web/src/server/server.ts +++ b/web/src/server/server.ts @@ -448,7 +448,7 @@ export async function createApp(): Promise { logger.debug('Initialized terminal manager'); // Initialize stream watcher for file-based streaming - const streamWatcher = new StreamWatcher(); + const streamWatcher = new StreamWatcher(sessionManager); logger.debug('Initialized stream watcher'); // Initialize activity monitor diff --git a/web/src/server/services/stream-watcher.ts b/web/src/server/services/stream-watcher.ts index 940bf9d1..a367bb02 100644 --- a/web/src/server/services/stream-watcher.ts +++ b/web/src/server/services/stream-watcher.ts @@ -1,11 +1,16 @@ import chalk from 'chalk'; import type { Response } from 'express'; import * as fs from 'fs'; +import type { SessionManager } from '../pty/session-manager.js'; import type { AsciinemaHeader } from '../pty/types.js'; import { createLogger } from '../utils/logger.js'; const logger = createLogger('stream-watcher'); +// Constants +const HEADER_READ_BUFFER_SIZE = 4096; +const CLEAR_SEQUENCE = '\x1b[3J'; + interface StreamClient { response: Response; startTime: number; @@ -39,6 +44,15 @@ function isExitEvent(event: AsciinemaEvent): event is AsciinemaExitEvent { return Array.isArray(event) && event[0] === 'exit'; } +/** + * Checks if an output event contains a terminal clear sequence + * @param event - The asciinema event to check + * @returns true if the event contains a clear sequence + */ +function containsClearSequence(event: AsciinemaEvent): boolean { + return isOutputEvent(event) && event[2].includes(CLEAR_SEQUENCE); +} + interface WatcherInfo { clients: Set; watcher?: fs.FSWatcher; @@ -50,8 +64,10 @@ interface WatcherInfo { export class StreamWatcher { private activeWatchers: Map = new Map(); + private sessionManager: SessionManager; - constructor() { + constructor(sessionManager: SessionManager) { + this.sessionManager = sessionManager; // Clean up notification listeners on exit process.on('beforeExit', () => { this.cleanup(); @@ -82,7 +98,7 @@ export class StreamWatcher { this.activeWatchers.set(sessionId, watcherInfo); // Send existing content first - this.sendExistingContent(streamPath, client); + this.sendExistingContent(sessionId, streamPath, client); // Get current file size and stats if (fs.existsSync(streamPath)) { @@ -99,7 +115,7 @@ export class StreamWatcher { this.startWatching(sessionId, streamPath, watcherInfo); } else { // Send existing content to new client - this.sendExistingContent(streamPath, client); + this.sendExistingContent(sessionId, streamPath, client); } // Add client to set @@ -150,23 +166,91 @@ export class StreamWatcher { /** * Send existing content to a client */ - private sendExistingContent(streamPath: string, client: StreamClient): void { + private sendExistingContent(sessionId: string, streamPath: string, client: StreamClient): void { try { - // First pass: analyze the stream to find the last clear and track resize events - const analysisStream = fs.createReadStream(streamPath, { encoding: 'utf8' }); + // Load existing session info or use defaults, but don't save incomplete session data + const sessionInfo = this.sessionManager.loadSessionInfo(sessionId); + + // Validate offset to ensure we don't read beyond file size + let startOffset = sessionInfo?.lastClearOffset ?? 0; + if (fs.existsSync(streamPath)) { + const stats = fs.statSync(streamPath); + startOffset = Math.min(startOffset, stats.size); + } + + // Read header line separately (first line of file) + // We need to track byte position separately from string length due to UTF-8 encoding + let header: AsciinemaHeader | null = null; + let fd: number | null = null; + try { + fd = fs.openSync(streamPath, 'r'); + const buf = Buffer.alloc(HEADER_READ_BUFFER_SIZE); + let data = ''; + + // Important: Use filePosition (bytes) not data.length (characters) for fs.readSync + // UTF-8 strings have character count != byte count for multi-byte characters + let filePosition = 0; // Track actual byte position in file + let bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition); + + while (!data.includes('\n') && bytesRead > 0) { + data += buf.toString('utf8', 0, bytesRead); + + // Increment by actual bytes read, not string characters + // This ensures correct file positioning for subsequent reads + filePosition += bytesRead; + + if (!data.includes('\n')) { + // Use filePosition (byte offset) not data.length (character count) + bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition); + } + } + + const idx = data.indexOf('\n'); + if (idx !== -1) { + header = JSON.parse(data.slice(0, idx)); + } + } catch (e) { + logger.debug(`failed to read asciinema header for session ${sessionId}: ${e}`); + } finally { + // Ensure file descriptor is always closed to prevent leaks + // This executes even if an exception occurs during read operations + if (fd !== null) { + try { + fs.closeSync(fd); + } catch (closeError) { + logger.debug(`failed to close file descriptor: ${closeError}`); + } + } + } + + // Analyze the stream starting from stored offset to find the most recent clear sequence + // This allows us to prune old terminal content and only send what's currently visible + const analysisStream = fs.createReadStream(streamPath, { + encoding: 'utf8', + start: startOffset, + }); let lineBuffer = ''; const events: AsciinemaEvent[] = []; let lastClearIndex = -1; let lastResizeBeforeClear: AsciinemaResizeEvent | null = null; let currentResize: AsciinemaResizeEvent | null = null; - let header: AsciinemaHeader | null = null; + + // Track byte offset in the file for accurate position tracking + // This is crucial for UTF-8 encoded files where character count != byte count + let fileOffset = startOffset; + let lastClearOffset = startOffset; analysisStream.on('data', (chunk: string | Buffer) => { lineBuffer += chunk.toString(); - const lines = lineBuffer.split('\n'); - lineBuffer = lines.pop() || ''; // Keep incomplete line for next chunk + let index = lineBuffer.indexOf('\n'); + while (index !== -1) { + const line = lineBuffer.slice(0, index); + lineBuffer = lineBuffer.slice(index + 1); + + // Calculate byte length of the line plus newline character + // Buffer.byteLength correctly handles multi-byte UTF-8 characters + fileOffset += Buffer.byteLength(line, 'utf8') + 1; - for (const line of lines) { if (line.trim()) { try { const parsed = JSON.parse(line); @@ -185,9 +269,10 @@ export class StreamWatcher { } // Check for clear sequence in output events - if (isOutputEvent(event) && event[2].includes('\x1b[3J')) { + if (containsClearSequence(event)) { lastClearIndex = events.length; lastResizeBeforeClear = currentResize; + lastClearOffset = fileOffset; logger.debug( `found clear sequence at event index ${lastClearIndex}, current resize: ${currentResize ? currentResize[2] : 'none'}` ); @@ -200,6 +285,7 @@ export class StreamWatcher { logger.debug(`skipping invalid JSON line during analysis: ${e}`); } } + index = lineBuffer.indexOf('\n'); } }); @@ -208,6 +294,7 @@ export class StreamWatcher { if (lineBuffer.trim()) { try { const parsed = JSON.parse(lineBuffer); + fileOffset += Buffer.byteLength(lineBuffer, 'utf8'); if (Array.isArray(parsed)) { if (parsed[0] === 'exit') { events.push(parsed as AsciinemaExitEvent); @@ -217,9 +304,10 @@ export class StreamWatcher { if (isResizeEvent(event)) { currentResize = event; } - if (isOutputEvent(event) && event[2].includes('\x1b[3J')) { + if (containsClearSequence(event)) { lastClearIndex = events.length; lastResizeBeforeClear = currentResize; + lastClearOffset = fileOffset; logger.debug( `found clear sequence at event index ${lastClearIndex} (last event)` ); @@ -239,8 +327,16 @@ export class StreamWatcher { // Start from after the last clear startIndex = lastClearIndex + 1; logger.log( - chalk.green(`pruning stream: skipping ${lastClearIndex + 1} events before last clear`) + chalk.green( + `pruning stream: skipping ${lastClearIndex + 1} events before last clear at offset ${lastClearOffset}` + ) ); + + // Persist new clear offset to session only if session already exists + if (sessionInfo) { + sessionInfo.lastClearOffset = lastClearOffset; + this.sessionManager.saveSessionInfo(sessionId, sessionInfo); + } } // Send header first - update dimensions if we have a resize @@ -283,7 +379,7 @@ export class StreamWatcher { analysisStream.on('error', (error) => { logger.error('failed to analyze stream for pruning:', error); // Fall back to original implementation without pruning - this.sendExistingContentWithoutPruning(streamPath, client); + this.sendExistingContentWithoutPruning(sessionId, streamPath, client); }); } catch (error) { logger.error('failed to create read stream:', error); @@ -293,7 +389,11 @@ export class StreamWatcher { /** * Original implementation without pruning (fallback) */ - private sendExistingContentWithoutPruning(streamPath: string, client: StreamClient): void { + private sendExistingContentWithoutPruning( + _sessionId: string, + streamPath: string, + client: StreamClient + ): void { try { const stream = fs.createReadStream(streamPath, { encoding: 'utf8' }); let exitEventFound = false; diff --git a/web/src/shared/types.ts b/web/src/shared/types.ts index d867eabe..c878ec2d 100644 --- a/web/src/shared/types.ts +++ b/web/src/shared/types.ts @@ -22,6 +22,11 @@ export interface SessionInfo { pid?: number; initialCols?: number; initialRows?: number; + /** + * Byte offset of the last clear event in the session stdout file. + * Used to quickly seek to the most recent content when replaying casts. + */ + lastClearOffset?: number; version?: string; // VibeTunnel version that created this session } diff --git a/web/src/test/unit/stream-pruning.test.ts b/web/src/test/unit/stream-pruning.test.ts index bcf1827b..d89880ea 100644 --- a/web/src/test/unit/stream-pruning.test.ts +++ b/web/src/test/unit/stream-pruning.test.ts @@ -3,6 +3,7 @@ import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { SessionManager } from '../../server/pty/session-manager.js'; import type { AsciinemaHeader } from '../../server/pty/types.js'; import { StreamWatcher } from '../../server/services/stream-watcher.js'; import { @@ -35,7 +36,8 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { locals: {}, }; - streamWatcher = new StreamWatcher(); + const sessionManager = new SessionManager(tempDir); + streamWatcher = new StreamWatcher(sessionManager); }); afterEach(() => { @@ -80,7 +82,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { // Use reflection to call private method // biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher); - sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 }); + sendExistingContent('session1', filepath, { + response: mockResponse, + startTime: Date.now() / 1000, + }); // Wait for async operations to complete await new Promise((resolve) => setTimeout(resolve, 200)); @@ -117,7 +122,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { // biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher); - sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 }); + sendExistingContent('session2', filepath, { + response: mockResponse, + startTime: Date.now() / 1000, + }); await new Promise((resolve) => setTimeout(resolve, 200)); @@ -138,7 +146,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { // biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher); - sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 }); + sendExistingContent('session3', filepath, { + response: mockResponse, + startTime: Date.now() / 1000, + }); await new Promise((resolve) => setTimeout(resolve, 200)); @@ -157,7 +168,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { // biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher); - sendExistingContent(nonExistentPath, { response: mockResponse, startTime: Date.now() / 1000 }); + sendExistingContent('session4', nonExistentPath, { + response: mockResponse, + startTime: Date.now() / 1000, + }); await new Promise((resolve) => setTimeout(resolve, 200)); @@ -171,7 +185,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => { // biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher); - sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 }); + sendExistingContent('session5', filepath, { + response: mockResponse, + startTime: Date.now() / 1000, + }); await new Promise((resolve) => setTimeout(resolve, 200));