fix: Prevent duplicate notifications and improve latency

- Only use file watcher if no direct notifications are available
- Remove unnecessary deduplication logic
- Clean up logging for direct notifications
- Wait 100ms to detect if we're getting direct notifications before
  starting file watcher (for cross-process scenarios)

This should eliminate duplicate broadcasts and improve latency for
server-created sessions while maintaining compatibility with fwd.ts

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-22 20:36:30 +02:00
parent 76512c19c4
commit 52feefddf2
2 changed files with 101 additions and 57 deletions

56
web/scripts/latency-measure.js Executable file
View file

@ -0,0 +1,56 @@
#!/usr/bin/env node
/**
* Precise latency measurement for terminal rendering
*
* This script outputs timestamped markers to measure actual end-to-end latency
*/
const MARKER = '█';
const INTERVAL = 100; // 100ms intervals for easier measurement
console.log('Latency measurement test - watch for delay between timestamp and display\n');
let count = 0;
const startTime = Date.now();
const interval = setInterval(() => {
count++;
const now = Date.now();
const elapsed = now - startTime;
const timestamp = new Date(now).toISOString().split('T')[1].slice(0, -1);
// Output with high-precision timestamp
console.log(`[${count.toString().padStart(3, '0')}] ${timestamp} ${MARKER.repeat(10)} MARKER`);
// Flush stdout to ensure immediate output
if (process.stdout.isTTY) {
process.stdout.write('');
}
if (count >= 50) { // 5 seconds of data
clearInterval(interval);
console.log('\nTest complete. Compare the timestamps with when you see them appear.');
console.log('The difference is your end-to-end latency.');
process.exit(0);
}
}, INTERVAL);
// Also test immediate response to keypress
if (process.stdin.isTTY) {
process.stdin.setRawMode(true);
process.stdin.on('data', (data) => {
const key = data.toString();
const now = Date.now();
const timestamp = new Date(now).toISOString().split('T')[1].slice(0, -1);
if (key === '\x03') { // Ctrl+C
clearInterval(interval);
process.exit(0);
}
console.log(`[KEY] ${timestamp} You pressed: ${key.charCodeAt(0)} ${MARKER.repeat(20)}`);
});
}
console.log('Press any key to test input latency (Ctrl+C to exit)\n');

View file

@ -13,8 +13,6 @@ interface WatcherInfo {
lastOffset: number; lastOffset: number;
lineBuffer: string; lineBuffer: string;
notificationListener?: (update: { sessionId: string; data: string; timestamp: number }) => void; notificationListener?: (update: { sessionId: string; data: string; timestamp: number }) => void;
lastBroadcastTime: number;
recentBroadcasts: Set<string>;
} }
export class StreamWatcher { export class StreamWatcher {
@ -42,8 +40,6 @@ export class StreamWatcher {
clients: new Set(), clients: new Set(),
lastOffset: 0, lastOffset: 0,
lineBuffer: '', lineBuffer: '',
lastBroadcastTime: 0,
recentBroadcasts: new Set(),
}; };
this.activeWatchers.set(sessionId, watcherInfo); this.activeWatchers.set(sessionId, watcherInfo);
@ -211,8 +207,11 @@ export class StreamWatcher {
*/ */
private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void { private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void {
// First, set up direct notification listener for lowest latency // First, set up direct notification listener for lowest latency
let hasDirectNotifications = false;
watcherInfo.notificationListener = (update) => { watcherInfo.notificationListener = (update) => {
if (update.sessionId === sessionId) { if (update.sessionId === sessionId) {
hasDirectNotifications = true;
// Process the notification data directly // Process the notification data directly
const lines = update.data.split('\n').filter((line) => line.trim()); const lines = update.data.split('\n').filter((line) => line.trim());
for (const line of lines) { for (const line of lines) {
@ -222,7 +221,14 @@ export class StreamWatcher {
}; };
streamNotifier.on('stream-update', watcherInfo.notificationListener); streamNotifier.on('stream-update', watcherInfo.notificationListener);
// Also use optimized file watcher as fallback (for cross-process scenarios) // Only use file watcher if we're not getting direct notifications
// Give it a moment to see if we get direct notifications
setTimeout(() => {
if (!hasDirectNotifications) {
console.log(
`[STREAM] No direct notifications for session ${sessionId}, using file watcher`
);
// Use optimized file watcher as fallback (for cross-process scenarios)
watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true }); watcherInfo.watcher = new OptimizedFileWatcher(streamPath, { persistent: true });
watcherInfo.watcher.on('change', (stats) => { watcherInfo.watcher.on('change', (stats) => {
@ -245,7 +251,6 @@ export class StreamWatcher {
const lines = watcherInfo.lineBuffer.split('\n'); const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || ''; watcherInfo.lineBuffer = lines.pop() || '';
console.log(`[STREAM] New data from file watcher: ${newData}`);
for (const line of lines) { for (const line of lines) {
if (line.trim()) { if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo); this.broadcastLine(sessionId, line, watcherInfo);
@ -263,33 +268,16 @@ export class StreamWatcher {
// Start the watcher // Start the watcher
watcherInfo.watcher.start(); watcherInfo.watcher.start();
}
}, 100); // Wait 100ms to see if we get direct notifications
console.log( console.log(`[STREAM] Started watching for session ${sessionId}`);
`[STREAM] Started optimized watching with direct notifications for session ${sessionId}`
);
} }
/** /**
* Broadcast a line to all clients * Broadcast a line to all clients
*/ */
private broadcastLine(sessionId: string, line: string, watcherInfo: WatcherInfo): void { private broadcastLine(sessionId: string, line: string, watcherInfo: WatcherInfo): void {
// Deduplication: check if we've broadcast this line very recently
const now = Date.now();
const lineHash = `${line.substring(0, 100)}_${line.length}`; // Simple hash
if (watcherInfo.recentBroadcasts.has(lineHash) && now - watcherInfo.lastBroadcastTime < 50) {
// Skip duplicate within 50ms window
return;
}
// Clean up old broadcasts
if (now - watcherInfo.lastBroadcastTime > 100) {
watcherInfo.recentBroadcasts.clear();
}
watcherInfo.recentBroadcasts.add(lineHash);
watcherInfo.lastBroadcastTime = now;
let eventData: string | null = null; let eventData: string | null = null;
try { try {