refactor: Simplify file watching - remove unnecessary OptimizedFileWatcher

- Integrate stat checking directly into StreamWatcher
- Remove platform-specific code paths that all used fs.watch anyway
- Keep the actual optimization: checking file stats to avoid spurious events
- Simpler, cleaner code with the same benefits

The real improvements remain:
1. Direct notifications for in-process sessions
2. Stat checking to verify actual file changes
3. Only processing when file size increases

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-22 20:57:46 +02:00
parent daa1db3392
commit d7e811412a
2 changed files with 45 additions and 297 deletions

View file

@ -1,264 +0,0 @@
import * as fs from 'fs';
import * as os from 'os';
import { EventEmitter } from 'events';
interface WatchOptions {
persistent?: boolean;
interval?: number; // For polling fallback
}
/**
* Optimized file watcher that uses platform-specific mechanisms for lower latency
*/
export class OptimizedFileWatcher extends EventEmitter {
// Define event types for type safety
on(event: 'change', listener: (stats: fs.Stats) => void): this;
on(event: 'error', listener: (error: Error) => void): this;
on(event: string, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
emit(event: 'change', stats: fs.Stats): boolean;
emit(event: 'error', error: Error): boolean;
emit(event: string, ...args: any[]): boolean {
return super.emit(event, ...args);
}
private watcher?: fs.FSWatcher;
private pollInterval?: NodeJS.Timeout;
private lastSize: number = 0;
private lastMtime: number = 0;
private readonly platform: string;
constructor(
private readonly filePath: string,
private readonly options: WatchOptions = {}
) {
super();
this.platform = os.platform();
// Get initial stats
try {
const stats = fs.statSync(filePath);
this.lastSize = stats.size;
this.lastMtime = stats.mtimeMs;
} catch (_error) {
// File might not exist yet
this.lastSize = 0;
this.lastMtime = 0;
}
}
/**
* Start watching the file
*/
start(): void {
// Use platform-specific optimizations
switch (this.platform) {
case 'linux':
this.startLinuxWatch();
break;
case 'darwin':
this.startMacWatch();
break;
case 'win32':
this.startWindowsWatch();
break;
default:
// Fallback to standard fs.watch
this.startGenericWatch();
}
}
/**
* Linux-specific watching using inotify (through fs.watch with optimizations)
*/
private startLinuxWatch(): void {
// On Linux, fs.watch uses inotify which is already quite efficient
// But we can optimize by using a more aggressive polling check
// when we detect changes to reduce latency
let rapidPollTimeout: NodeJS.Timeout | null = null;
this.watcher = fs.watch(
this.filePath,
{ persistent: this.options.persistent !== false },
(eventType) => {
if (eventType === 'change') {
// Start rapid polling for 100ms to catch quick successive changes
if (rapidPollTimeout) {
clearTimeout(rapidPollTimeout);
}
// Immediate check
this.checkFileChange();
// Rapid poll for a short period
let pollCount = 0;
const rapidPoll = () => {
this.checkFileChange();
pollCount++;
if (pollCount < 10) {
// Poll 10 times over 100ms
rapidPollTimeout = setTimeout(rapidPoll, 10);
} else {
rapidPollTimeout = null;
}
};
rapidPollTimeout = setTimeout(rapidPoll, 10);
}
}
);
this.watcher.on('error', (error) => {
this.emit('error', error);
// Fallback to polling on error
this.startPolling(50); // Fast polling as fallback
});
}
/**
* macOS-specific watching using FSEvents (through fs.watch with optimizations)
*/
private startMacWatch(): void {
// macOS fs.watch uses FSEvents which can have some latency
// We'll combine it with periodic stat checks for better responsiveness
this.watcher = fs.watch(
this.filePath,
{ persistent: this.options.persistent !== false },
(eventType) => {
if (eventType === 'change') {
this.checkFileChange();
}
}
);
this.watcher.on('error', (error) => {
this.emit('error', error);
this.startPolling(50);
});
// Also add a periodic check every 50ms for better latency on macOS
// FSEvents can sometimes batch changes causing delays
this.startPolling(50);
}
/**
* Windows-specific watching with optimizations
*/
private startWindowsWatch(): void {
// Windows fs.watch uses ReadDirectoryChangesW which is quite responsive
// But we'll add some optimizations for better performance
this.watcher = fs.watch(
this.filePath,
{ persistent: this.options.persistent !== false },
(eventType) => {
if (eventType === 'change') {
// On Windows, we might get multiple events for a single change
// Debounce by checking actual file stats
this.checkFileChange();
}
}
);
this.watcher.on('error', (error) => {
this.emit('error', error);
this.startPolling(50);
});
}
/**
* Generic watching fallback
*/
private startGenericWatch(): void {
this.watcher = fs.watch(
this.filePath,
{ persistent: this.options.persistent !== false },
(eventType) => {
if (eventType === 'change') {
this.checkFileChange();
}
}
);
this.watcher.on('error', (error) => {
this.emit('error', error);
this.startPolling(100);
});
}
/**
* Start polling as a fallback mechanism
*/
private startPolling(interval: number): void {
if (this.pollInterval) {
clearInterval(this.pollInterval);
}
this.pollInterval = setInterval(() => {
this.checkFileChange();
}, interval);
}
/**
* Check if file has actually changed by comparing stats
*/
private checkFileChange(): void {
try {
const stats = fs.statSync(this.filePath);
// Check if size or modification time changed
if (stats.size !== this.lastSize || stats.mtimeMs !== this.lastMtime) {
// Only emit if size increased (for append-only files like asciinema)
if (stats.size > this.lastSize) {
this.lastSize = stats.size;
this.lastMtime = stats.mtimeMs;
this.emit('change', stats);
} else if (stats.size !== this.lastSize) {
// File was truncated or replaced
this.lastSize = stats.size;
this.lastMtime = stats.mtimeMs;
this.emit('change', stats);
}
}
} catch (error) {
// File might have been deleted
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
this.emit('error', error as Error);
}
}
}
/**
* Stop watching
*/
stop(): void {
if (this.watcher) {
this.watcher.close();
this.watcher = undefined;
}
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = undefined;
}
}
/**
* Check if watcher is active
*/
isWatching(): boolean {
return !!this.watcher || !!this.pollInterval;
}
}
/**
* Factory function to create an optimized file watcher
*/
export function createOptimizedFileWatcher(
filePath: string,
options?: WatchOptions
): OptimizedFileWatcher {
return new OptimizedFileWatcher(filePath, options);
}

View file

@ -1,16 +1,18 @@
import * as fs from 'fs';
import { OptimizedFileWatcher } from './optimized-file-watcher.js';
import { streamNotifier } from './stream-notifier.js';
import { Response } from 'express';
interface StreamClient {
response: import('express').Response;
response: Response;
startTime: number;
}
interface WatcherInfo {
clients: Set<StreamClient>;
watcher?: OptimizedFileWatcher;
watcher?: fs.FSWatcher;
lastOffset: number;
lastSize: number;
lastMtime: number;
lineBuffer: string;
notificationListener?: (update: { sessionId: string; data: string; timestamp: number }) => void;
}
@ -28,7 +30,7 @@ export class StreamWatcher {
/**
* Add a client to watch a stream file
*/
addClient(sessionId: string, streamPath: string, response: import('express').Response): void {
addClient(sessionId: string, streamPath: string, response: Response): void {
const startTime = Date.now() / 1000;
const client: StreamClient = { response, startTime };
@ -39,6 +41,8 @@ export class StreamWatcher {
watcherInfo = {
clients: new Set(),
lastOffset: 0,
lastSize: 0,
lastMtime: 0,
lineBuffer: '',
};
this.activeWatchers.set(sessionId, watcherInfo);
@ -46,10 +50,12 @@ export class StreamWatcher {
// Send existing content first
this.sendExistingContent(streamPath, client);
// Get current file size
// Get current file size and stats
if (fs.existsSync(streamPath)) {
const stats = fs.statSync(streamPath);
watcherInfo.lastOffset = stats.size;
watcherInfo.lastSize = stats.size;
watcherInfo.lastMtime = stats.mtimeMs;
}
// Start watching for new content
@ -92,7 +98,7 @@ export class StreamWatcher {
if (watcherInfo.clients.size === 0) {
console.log(`[STREAM] No more clients for session ${sessionId}, stopping watcher`);
if (watcherInfo.watcher) {
watcherInfo.watcher.stop();
watcherInfo.watcher.close();
}
// Remove notification listener
if (watcherInfo.notificationListener) {
@ -226,46 +232,52 @@ export class StreamWatcher {
} else {
console.log(`[STREAM] Using file watcher for session ${sessionId} (cross-process)`);
// Use optimized file watcher for cross-process scenarios
watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true });
// Use standard fs.watch with stat checking for cross-process scenarios
watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => {
if (eventType === 'change') {
try {
// Check if file actually changed by comparing stats
const stats = fs.statSync(streamPath);
watcherInfo.watcher.on('change', (stats) => {
try {
if (stats.size > watcherInfo.lastOffset) {
// Read only new data
const fd = fs.openSync(streamPath, 'r');
const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset);
fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset);
fs.closeSync(fd);
// Only process if size increased (append-only file)
if (stats.size > watcherInfo.lastSize || stats.mtimeMs > watcherInfo.lastMtime) {
watcherInfo.lastSize = stats.size;
watcherInfo.lastMtime = stats.mtimeMs;
// Update offset
watcherInfo.lastOffset = stats.size;
// Read only new data
if (stats.size > watcherInfo.lastOffset) {
const fd = fs.openSync(streamPath, 'r');
const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset);
fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset);
fs.closeSync(fd);
// Process new data
const newData = buffer.toString('utf8');
watcherInfo.lineBuffer += newData;
// Update offset
watcherInfo.lastOffset = stats.size;
// Process complete lines
const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || '';
// Process new data
const newData = buffer.toString('utf8');
watcherInfo.lineBuffer += newData;
for (const line of lines) {
if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo);
// Process complete lines
const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo);
}
}
}
}
} catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
}
} catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
}
});
watcherInfo.watcher.on('error', (error) => {
console.error(`[STREAM] File watcher error for session ${sessionId}:`, error);
});
// Start the watcher
watcherInfo.watcher.start();
}
}
@ -339,7 +351,7 @@ export class StreamWatcher {
private cleanup(): void {
for (const [_sessionId, watcherInfo] of this.activeWatchers) {
if (watcherInfo.watcher) {
watcherInfo.watcher.stop();
watcherInfo.watcher.close();
}
if (watcherInfo.notificationListener) {
streamNotifier.removeListener('stream-update', watcherInfo.notificationListener);