diff --git a/web/src/server/services/optimized-file-watcher.ts b/web/src/server/services/optimized-file-watcher.ts deleted file mode 100644 index 5c00604b..00000000 --- a/web/src/server/services/optimized-file-watcher.ts +++ /dev/null @@ -1,264 +0,0 @@ -import * as fs from 'fs'; -import * as os from 'os'; -import { EventEmitter } from 'events'; - -interface WatchOptions { - persistent?: boolean; - interval?: number; // For polling fallback -} - -/** - * Optimized file watcher that uses platform-specific mechanisms for lower latency - */ -export class OptimizedFileWatcher extends EventEmitter { - // Define event types for type safety - on(event: 'change', listener: (stats: fs.Stats) => void): this; - on(event: 'error', listener: (error: Error) => void): this; - on(event: string, listener: (...args: any[]) => void): this { - return super.on(event, listener); - } - - emit(event: 'change', stats: fs.Stats): boolean; - emit(event: 'error', error: Error): boolean; - emit(event: string, ...args: any[]): boolean { - return super.emit(event, ...args); - } - private watcher?: fs.FSWatcher; - private pollInterval?: NodeJS.Timeout; - private lastSize: number = 0; - private lastMtime: number = 0; - private readonly platform: string; - - constructor( - private readonly filePath: string, - private readonly options: WatchOptions = {} - ) { - super(); - this.platform = os.platform(); - - // Get initial stats - try { - const stats = fs.statSync(filePath); - this.lastSize = stats.size; - this.lastMtime = stats.mtimeMs; - } catch (_error) { - // File might not exist yet - this.lastSize = 0; - this.lastMtime = 0; - } - } - - /** - * Start watching the file - */ - start(): void { - // Use platform-specific optimizations - switch (this.platform) { - case 'linux': - this.startLinuxWatch(); - break; - case 'darwin': - this.startMacWatch(); - break; - case 'win32': - this.startWindowsWatch(); - break; - default: - // Fallback to standard fs.watch - this.startGenericWatch(); - } - } - - /** - * Linux-specific watching using inotify (through fs.watch with optimizations) - */ - private startLinuxWatch(): void { - // On Linux, fs.watch uses inotify which is already quite efficient - // But we can optimize by using a more aggressive polling check - // when we detect changes to reduce latency - - let rapidPollTimeout: NodeJS.Timeout | null = null; - - this.watcher = fs.watch( - this.filePath, - { persistent: this.options.persistent !== false }, - (eventType) => { - if (eventType === 'change') { - // Start rapid polling for 100ms to catch quick successive changes - if (rapidPollTimeout) { - clearTimeout(rapidPollTimeout); - } - - // Immediate check - this.checkFileChange(); - - // Rapid poll for a short period - let pollCount = 0; - const rapidPoll = () => { - this.checkFileChange(); - pollCount++; - if (pollCount < 10) { - // Poll 10 times over 100ms - rapidPollTimeout = setTimeout(rapidPoll, 10); - } else { - rapidPollTimeout = null; - } - }; - rapidPollTimeout = setTimeout(rapidPoll, 10); - } - } - ); - - this.watcher.on('error', (error) => { - this.emit('error', error); - // Fallback to polling on error - this.startPolling(50); // Fast polling as fallback - }); - } - - /** - * macOS-specific watching using FSEvents (through fs.watch with optimizations) - */ - private startMacWatch(): void { - // macOS fs.watch uses FSEvents which can have some latency - // We'll combine it with periodic stat checks for better responsiveness - - this.watcher = fs.watch( - this.filePath, - { persistent: this.options.persistent !== false }, - (eventType) => { - if (eventType === 'change') { - this.checkFileChange(); - } - } - ); - - this.watcher.on('error', (error) => { - this.emit('error', error); - this.startPolling(50); - }); - - // Also add a periodic check every 50ms for better latency on macOS - // FSEvents can sometimes batch changes causing delays - this.startPolling(50); - } - - /** - * Windows-specific watching with optimizations - */ - private startWindowsWatch(): void { - // Windows fs.watch uses ReadDirectoryChangesW which is quite responsive - // But we'll add some optimizations for better performance - - this.watcher = fs.watch( - this.filePath, - { persistent: this.options.persistent !== false }, - (eventType) => { - if (eventType === 'change') { - // On Windows, we might get multiple events for a single change - // Debounce by checking actual file stats - this.checkFileChange(); - } - } - ); - - this.watcher.on('error', (error) => { - this.emit('error', error); - this.startPolling(50); - }); - } - - /** - * Generic watching fallback - */ - private startGenericWatch(): void { - this.watcher = fs.watch( - this.filePath, - { persistent: this.options.persistent !== false }, - (eventType) => { - if (eventType === 'change') { - this.checkFileChange(); - } - } - ); - - this.watcher.on('error', (error) => { - this.emit('error', error); - this.startPolling(100); - }); - } - - /** - * Start polling as a fallback mechanism - */ - private startPolling(interval: number): void { - if (this.pollInterval) { - clearInterval(this.pollInterval); - } - - this.pollInterval = setInterval(() => { - this.checkFileChange(); - }, interval); - } - - /** - * Check if file has actually changed by comparing stats - */ - private checkFileChange(): void { - try { - const stats = fs.statSync(this.filePath); - - // Check if size or modification time changed - if (stats.size !== this.lastSize || stats.mtimeMs !== this.lastMtime) { - // Only emit if size increased (for append-only files like asciinema) - if (stats.size > this.lastSize) { - this.lastSize = stats.size; - this.lastMtime = stats.mtimeMs; - this.emit('change', stats); - } else if (stats.size !== this.lastSize) { - // File was truncated or replaced - this.lastSize = stats.size; - this.lastMtime = stats.mtimeMs; - this.emit('change', stats); - } - } - } catch (error) { - // File might have been deleted - if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { - this.emit('error', error as Error); - } - } - } - - /** - * Stop watching - */ - stop(): void { - if (this.watcher) { - this.watcher.close(); - this.watcher = undefined; - } - - if (this.pollInterval) { - clearInterval(this.pollInterval); - this.pollInterval = undefined; - } - } - - /** - * Check if watcher is active - */ - isWatching(): boolean { - return !!this.watcher || !!this.pollInterval; - } -} - -/** - * Factory function to create an optimized file watcher - */ -export function createOptimizedFileWatcher( - filePath: string, - options?: WatchOptions -): OptimizedFileWatcher { - return new OptimizedFileWatcher(filePath, options); -} diff --git a/web/src/server/services/stream-watcher.ts b/web/src/server/services/stream-watcher.ts index 92553eb5..d35841af 100644 --- a/web/src/server/services/stream-watcher.ts +++ b/web/src/server/services/stream-watcher.ts @@ -1,16 +1,18 @@ import * as fs from 'fs'; -import { OptimizedFileWatcher } from './optimized-file-watcher.js'; import { streamNotifier } from './stream-notifier.js'; +import { Response } from 'express'; interface StreamClient { - response: import('express').Response; + response: Response; startTime: number; } interface WatcherInfo { clients: Set; - watcher?: OptimizedFileWatcher; + watcher?: fs.FSWatcher; lastOffset: number; + lastSize: number; + lastMtime: number; lineBuffer: string; notificationListener?: (update: { sessionId: string; data: string; timestamp: number }) => void; } @@ -28,7 +30,7 @@ export class StreamWatcher { /** * Add a client to watch a stream file */ - addClient(sessionId: string, streamPath: string, response: import('express').Response): void { + addClient(sessionId: string, streamPath: string, response: Response): void { const startTime = Date.now() / 1000; const client: StreamClient = { response, startTime }; @@ -39,6 +41,8 @@ export class StreamWatcher { watcherInfo = { clients: new Set(), lastOffset: 0, + lastSize: 0, + lastMtime: 0, lineBuffer: '', }; this.activeWatchers.set(sessionId, watcherInfo); @@ -46,10 +50,12 @@ export class StreamWatcher { // Send existing content first this.sendExistingContent(streamPath, client); - // Get current file size + // Get current file size and stats if (fs.existsSync(streamPath)) { const stats = fs.statSync(streamPath); watcherInfo.lastOffset = stats.size; + watcherInfo.lastSize = stats.size; + watcherInfo.lastMtime = stats.mtimeMs; } // Start watching for new content @@ -92,7 +98,7 @@ export class StreamWatcher { if (watcherInfo.clients.size === 0) { console.log(`[STREAM] No more clients for session ${sessionId}, stopping watcher`); if (watcherInfo.watcher) { - watcherInfo.watcher.stop(); + watcherInfo.watcher.close(); } // Remove notification listener if (watcherInfo.notificationListener) { @@ -226,46 +232,52 @@ export class StreamWatcher { } else { console.log(`[STREAM] Using file watcher for session ${sessionId} (cross-process)`); - // Use optimized file watcher for cross-process scenarios - watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true }); + // Use standard fs.watch with stat checking for cross-process scenarios + watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => { + if (eventType === 'change') { + try { + // Check if file actually changed by comparing stats + const stats = fs.statSync(streamPath); - watcherInfo.watcher.on('change', (stats) => { - try { - if (stats.size > watcherInfo.lastOffset) { - // Read only new data - const fd = fs.openSync(streamPath, 'r'); - const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset); - fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset); - fs.closeSync(fd); + // Only process if size increased (append-only file) + if (stats.size > watcherInfo.lastSize || stats.mtimeMs > watcherInfo.lastMtime) { + watcherInfo.lastSize = stats.size; + watcherInfo.lastMtime = stats.mtimeMs; - // Update offset - watcherInfo.lastOffset = stats.size; + // Read only new data + if (stats.size > watcherInfo.lastOffset) { + const fd = fs.openSync(streamPath, 'r'); + const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset); + fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset); + fs.closeSync(fd); - // Process new data - const newData = buffer.toString('utf8'); - watcherInfo.lineBuffer += newData; + // Update offset + watcherInfo.lastOffset = stats.size; - // Process complete lines - const lines = watcherInfo.lineBuffer.split('\n'); - watcherInfo.lineBuffer = lines.pop() || ''; + // Process new data + const newData = buffer.toString('utf8'); + watcherInfo.lineBuffer += newData; - for (const line of lines) { - if (line.trim()) { - this.broadcastLine(sessionId, line, watcherInfo); + // Process complete lines + const lines = watcherInfo.lineBuffer.split('\n'); + watcherInfo.lineBuffer = lines.pop() || ''; + + for (const line of lines) { + if (line.trim()) { + this.broadcastLine(sessionId, line, watcherInfo); + } + } } } + } catch (error) { + console.error(`[STREAM] Error reading file changes:`, error); } - } catch (error) { - console.error(`[STREAM] Error reading file changes:`, error); } }); watcherInfo.watcher.on('error', (error) => { console.error(`[STREAM] File watcher error for session ${sessionId}:`, error); }); - - // Start the watcher - watcherInfo.watcher.start(); } } @@ -339,7 +351,7 @@ export class StreamWatcher { private cleanup(): void { for (const [_sessionId, watcherInfo] of this.activeWatchers) { if (watcherInfo.watcher) { - watcherInfo.watcher.stop(); + watcherInfo.watcher.close(); } if (watcherInfo.notificationListener) { streamNotifier.removeListener('stream-update', watcherInfo.notificationListener);