fix: Simplify notification logic - use direct or file, never both

- Check upfront if we have listeners for direct notifications
- Use EITHER direct notifications OR file watcher, not both
- This eliminates any possibility of duplicate broadcasts
- Server sessions get instant updates via direct notifications
- Forwarded sessions use optimized file watcher

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-22 20:41:13 +02:00
parent 52feefddf2
commit c43a549ed8

View file

@ -206,72 +206,67 @@ export class StreamWatcher {
* Start watching a file for changes * Start watching a file for changes
*/ */
private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void { private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void {
// First, set up direct notification listener for lowest latency // Check if we should use direct notifications or file watching
let hasDirectNotifications = false; const hasListeners = streamNotifier.hasListeners(sessionId);
watcherInfo.notificationListener = (update) => { if (hasListeners) {
if (update.sessionId === sessionId) { console.log(`[STREAM] Using direct notifications for session ${sessionId}`);
hasDirectNotifications = true;
// Process the notification data directly // Set up direct notification listener for lowest latency
const lines = update.data.split('\n').filter((line) => line.trim()); watcherInfo.notificationListener = (update) => {
for (const line of lines) { if (update.sessionId === sessionId) {
this.broadcastLine(sessionId, line, watcherInfo); // Process the notification data directly
const lines = update.data.split('\n').filter((line) => line.trim());
for (const line of lines) {
this.broadcastLine(sessionId, line, watcherInfo);
}
} }
} };
}; streamNotifier.on('stream-update', watcherInfo.notificationListener);
streamNotifier.on('stream-update', watcherInfo.notificationListener); } else {
console.log(`[STREAM] Using file watcher for session ${sessionId} (cross-process)`);
// Only use file watcher if we're not getting direct notifications // Use optimized file watcher for cross-process scenarios
// Give it a moment to see if we get direct notifications watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true });
setTimeout(() => {
if (!hasDirectNotifications) {
console.log(
`[STREAM] No direct notifications for session ${sessionId}, using file watcher`
);
// Use optimized file watcher as fallback (for cross-process scenarios)
watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true });
watcherInfo.watcher.on('change', (stats) => { watcherInfo.watcher.on('change', (stats) => {
try { try {
if (stats.size > watcherInfo.lastOffset) { if (stats.size > watcherInfo.lastOffset) {
// Read only new data // Read only new data
const fd = fs.openSync(streamPath, 'r'); const fd = fs.openSync(streamPath, 'r');
const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset); const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset);
fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset); fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset);
fs.closeSync(fd); fs.closeSync(fd);
// Update offset // Update offset
watcherInfo.lastOffset = stats.size; watcherInfo.lastOffset = stats.size;
// Process new data // Process new data
const newData = buffer.toString('utf8'); const newData = buffer.toString('utf8');
watcherInfo.lineBuffer += newData; watcherInfo.lineBuffer += newData;
// Process complete lines // Process complete lines
const lines = watcherInfo.lineBuffer.split('\n'); const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || ''; watcherInfo.lineBuffer = lines.pop() || '';
for (const line of lines) { for (const line of lines) {
if (line.trim()) { if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo); this.broadcastLine(sessionId, line, watcherInfo);
}
} }
} }
} catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
} }
}); } catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
}
});
watcherInfo.watcher.on('error', (error) => { watcherInfo.watcher.on('error', (error) => {
console.error(`[STREAM] File watcher error for session ${sessionId}:`, error); console.error(`[STREAM] File watcher error for session ${sessionId}:`, error);
}); });
// Start the watcher // Start the watcher
watcherInfo.watcher.start(); watcherInfo.watcher.start();
} }
}, 100); // Wait 100ms to see if we get direct notifications
console.log(`[STREAM] Started watching for session ${sessionId}`);
} }
/** /**