refactor: Remove unused stream-notifier system

The direct notification system wasn't being used since hasListeners()
was checking before any listeners were set up. All sessions were using
file watching anyway. Simplified the code by removing the unused
notification system and keeping only the optimized file watching.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-22 21:09:57 +02:00
parent 312786230b
commit 1bb15097a4
3 changed files with 35 additions and 150 deletions

View file

@ -8,7 +8,6 @@
import * as fs from 'fs';
import * as path from 'path';
import { AsciinemaHeader, AsciinemaEvent, PtyError } from './types.js';
import { streamNotifier } from '../services/stream-notifier.js';
export class AsciinemaWriter {
private writeStream: fs.WriteStream;
@ -16,17 +15,11 @@ export class AsciinemaWriter {
private utf8Buffer: Buffer = Buffer.alloc(0);
private headerWritten = false;
private fd: number | null = null;
private sessionId: string;
constructor(
private filePath: string,
private header: AsciinemaHeader
) {
// Extract session ID from file path
const pathParts = this.filePath.split(path.sep);
const controlIndex = pathParts.indexOf('control');
this.sessionId =
controlIndex >= 0 && pathParts[controlIndex + 1] ? pathParts[controlIndex + 1] : 'unknown';
this.startTime = new Date();
// Ensure directory exists
@ -83,11 +76,6 @@ export class AsciinemaWriter {
const headerJson = JSON.stringify(this.header);
this.writeStream.write(headerJson + '\n');
this.headerWritten = true;
// Notify about header
if (streamNotifier.hasListeners(this.sessionId)) {
streamNotifier.notifyStreamUpdate(this.sessionId, headerJson + '\n');
}
}
/**
@ -160,11 +148,6 @@ export class AsciinemaWriter {
writeRawJson(jsonValue: unknown): void {
const jsonString = JSON.stringify(jsonValue);
this.writeStream.write(jsonString + '\n');
// Direct notification for exit events
if (streamNotifier.hasListeners(this.sessionId)) {
streamNotifier.notifyStreamUpdate(this.sessionId, jsonString + '\n');
}
}
/**
@ -176,13 +159,7 @@ export class AsciinemaWriter {
const eventJson = JSON.stringify(eventArray);
this.writeStream.write(eventJson + '\n');
// Direct notification for lowest latency
if (streamNotifier.hasListeners(this.sessionId)) {
streamNotifier.notifyStreamUpdate(this.sessionId, eventJson + '\n');
}
// Force immediate disk write to trigger file watchers
// This is critical for real-time performance with forwarded sessions
if (this.fd !== null) {
try {
fs.fsyncSync(this.fd);

View file

@ -1,64 +0,0 @@
import { EventEmitter } from 'events';
interface StreamUpdate {
sessionId: string;
data: string;
timestamp: number;
}
/**
* Global event emitter for direct in-process notifications between
* AsciinemaWriter and StreamWatcher to bypass file watching latency
*/
export class StreamNotifier extends EventEmitter {
// Define event types for type safety
on(event: 'stream-update', listener: (update: StreamUpdate) => void): this;
on(event: string, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
emit(event: 'stream-update', update: StreamUpdate): boolean;
emit(event: string, ...args: any[]): boolean {
return super.emit(event, ...args);
}
private static instance: StreamNotifier;
private constructor() {
super();
// Increase max listeners as we might have many sessions
this.setMaxListeners(1000);
}
/**
* Get singleton instance
*/
static getInstance(): StreamNotifier {
if (!StreamNotifier.instance) {
StreamNotifier.instance = new StreamNotifier();
}
return StreamNotifier.instance;
}
/**
* Notify about new stream data
*/
notifyStreamUpdate(sessionId: string, data: string): void {
const update: StreamUpdate = {
sessionId,
data,
timestamp: Date.now(),
};
this.emit('stream-update', update);
}
/**
* Check if there are any listeners for a session
*/
hasListeners(_sessionId: string): boolean {
// Check if there are any listeners at all
return this.listenerCount('stream-update') > 0;
}
}
// Export singleton instance
export const streamNotifier = StreamNotifier.getInstance();

View file

@ -1,5 +1,4 @@
import * as fs from 'fs';
import { streamNotifier } from './stream-notifier.js';
import { Response } from 'express';
interface StreamClient {
@ -14,7 +13,6 @@ interface WatcherInfo {
lastSize: number;
lastMtime: number;
lineBuffer: string;
notificationListener?: (update: { sessionId: string; data: string; timestamp: number }) => void;
}
export class StreamWatcher {
@ -100,10 +98,6 @@ export class StreamWatcher {
if (watcherInfo.watcher) {
watcherInfo.watcher.close();
}
// Remove notification listener
if (watcherInfo.notificationListener) {
streamNotifier.removeListener('stream-update', watcherInfo.notificationListener);
}
this.activeWatchers.delete(sessionId);
}
}
@ -187,73 +181,54 @@ export class StreamWatcher {
* Start watching a file for changes
*/
private startWatching(sessionId: string, streamPath: string, watcherInfo: WatcherInfo): void {
// Check if we should use direct notifications or file watching
const hasListeners = streamNotifier.hasListeners(sessionId);
console.log(`[STREAM] Using file watcher for session ${sessionId}`);
if (hasListeners) {
console.log(`[STREAM] Using direct notifications for session ${sessionId}`);
// Use standard fs.watch with stat checking
watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => {
if (eventType === 'change') {
try {
// Check if file actually changed by comparing stats
const stats = fs.statSync(streamPath);
// Set up direct notification listener for lowest latency
watcherInfo.notificationListener = (update) => {
if (update.sessionId === sessionId) {
// Process the notification data directly
const lines = update.data.split('\n').filter((line) => line.trim());
for (const line of lines) {
this.broadcastLine(sessionId, line, watcherInfo);
}
}
};
streamNotifier.on('stream-update', watcherInfo.notificationListener);
} else {
console.log(`[STREAM] Using file watcher for session ${sessionId} (cross-process)`);
// Only process if size increased (append-only file)
if (stats.size > watcherInfo.lastSize || stats.mtimeMs > watcherInfo.lastMtime) {
watcherInfo.lastSize = stats.size;
watcherInfo.lastMtime = stats.mtimeMs;
// Use standard fs.watch with stat checking for cross-process scenarios
watcherInfo.watcher = fs.watch(streamPath, { persistent: true }, (eventType) => {
if (eventType === 'change') {
try {
// Check if file actually changed by comparing stats
const stats = fs.statSync(streamPath);
// Read only new data
if (stats.size > watcherInfo.lastOffset) {
const fd = fs.openSync(streamPath, 'r');
const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset);
fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset);
fs.closeSync(fd);
// Only process if size increased (append-only file)
if (stats.size > watcherInfo.lastSize || stats.mtimeMs > watcherInfo.lastMtime) {
watcherInfo.lastSize = stats.size;
watcherInfo.lastMtime = stats.mtimeMs;
// Update offset
watcherInfo.lastOffset = stats.size;
// Read only new data
if (stats.size > watcherInfo.lastOffset) {
const fd = fs.openSync(streamPath, 'r');
const buffer = Buffer.alloc(stats.size - watcherInfo.lastOffset);
fs.readSync(fd, buffer, 0, buffer.length, watcherInfo.lastOffset);
fs.closeSync(fd);
// Process new data
const newData = buffer.toString('utf8');
watcherInfo.lineBuffer += newData;
// Update offset
watcherInfo.lastOffset = stats.size;
// Process complete lines
const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || '';
// Process new data
const newData = buffer.toString('utf8');
watcherInfo.lineBuffer += newData;
// Process complete lines
const lines = watcherInfo.lineBuffer.split('\n');
watcherInfo.lineBuffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo);
}
for (const line of lines) {
if (line.trim()) {
this.broadcastLine(sessionId, line, watcherInfo);
}
}
}
} catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
}
} catch (error) {
console.error(`[STREAM] Error reading file changes:`, error);
}
});
}
});
watcherInfo.watcher.on('error', (error) => {
console.error(`[STREAM] File watcher error for session ${sessionId}:`, error);
});
}
watcherInfo.watcher.on('error', (error) => {
console.error(`[STREAM] File watcher error for session ${sessionId}:`, error);
});
}
/**
@ -328,9 +303,6 @@ export class StreamWatcher {
if (watcherInfo.watcher) {
watcherInfo.watcher.close();
}
if (watcherInfo.notificationListener) {
streamNotifier.removeListener('stream-update', watcherInfo.notificationListener);
}
}
this.activeWatchers.clear();
}