mirror of
https://github.com/samsonjs/vibetunnel.git
synced 2026-03-27 09:45:53 +00:00
Improve asciicast clear offset caching (#333)
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
parent
e8191181c9
commit
192fd7e941
7 changed files with 151 additions and 26 deletions
|
|
@ -432,8 +432,8 @@ export class DirectKeyboardManager {
|
|||
key: string,
|
||||
isModifier?: boolean,
|
||||
isSpecial?: boolean,
|
||||
isToggle?: boolean,
|
||||
pasteText?: string
|
||||
_isToggle?: boolean,
|
||||
_pasteText?: string
|
||||
): Promise<void> => {
|
||||
if (!this.inputManager) {
|
||||
logger.error('No input manager found');
|
||||
|
|
@ -737,7 +737,9 @@ export class DirectKeyboardManager {
|
|||
// Restore all original styles
|
||||
Object.entries(originalStyles).forEach(([key, value]) => {
|
||||
if (value !== undefined) {
|
||||
(this.hiddenInput!.style as any)[key] = value;
|
||||
if (this.hiddenInput?.style) {
|
||||
(this.hiddenInput.style as unknown as Record<string, string>)[key] = value;
|
||||
}
|
||||
}
|
||||
});
|
||||
this.hiddenInput.placeholder = '';
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ export class TerminalQuickKeys extends LitElement {
|
|||
}
|
||||
}
|
||||
|
||||
private handlePasteImmediate(e: Event) {
|
||||
private handlePasteImmediate(_e: Event) {
|
||||
console.log('[QuickKeys] Paste button touched - delegating to paste handler');
|
||||
|
||||
// Always delegate to the main paste handler in direct-keyboard-manager
|
||||
|
|
|
|||
|
|
@ -246,6 +246,7 @@ export class PtyManager extends EventEmitter {
|
|||
startedAt: new Date().toISOString(),
|
||||
initialCols: cols,
|
||||
initialRows: rows,
|
||||
lastClearOffset: 0,
|
||||
version: VERSION,
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -448,7 +448,7 @@ export async function createApp(): Promise<AppInstance> {
|
|||
logger.debug('Initialized terminal manager');
|
||||
|
||||
// Initialize stream watcher for file-based streaming
|
||||
const streamWatcher = new StreamWatcher();
|
||||
const streamWatcher = new StreamWatcher(sessionManager);
|
||||
logger.debug('Initialized stream watcher');
|
||||
|
||||
// Initialize activity monitor
|
||||
|
|
|
|||
|
|
@ -1,11 +1,16 @@
|
|||
import chalk from 'chalk';
|
||||
import type { Response } from 'express';
|
||||
import * as fs from 'fs';
|
||||
import type { SessionManager } from '../pty/session-manager.js';
|
||||
import type { AsciinemaHeader } from '../pty/types.js';
|
||||
import { createLogger } from '../utils/logger.js';
|
||||
|
||||
const logger = createLogger('stream-watcher');
|
||||
|
||||
// Constants
|
||||
const HEADER_READ_BUFFER_SIZE = 4096;
|
||||
const CLEAR_SEQUENCE = '\x1b[3J';
|
||||
|
||||
interface StreamClient {
|
||||
response: Response;
|
||||
startTime: number;
|
||||
|
|
@ -39,6 +44,15 @@ function isExitEvent(event: AsciinemaEvent): event is AsciinemaExitEvent {
|
|||
return Array.isArray(event) && event[0] === 'exit';
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if an output event contains a terminal clear sequence
|
||||
* @param event - The asciinema event to check
|
||||
* @returns true if the event contains a clear sequence
|
||||
*/
|
||||
function containsClearSequence(event: AsciinemaEvent): boolean {
|
||||
return isOutputEvent(event) && event[2].includes(CLEAR_SEQUENCE);
|
||||
}
|
||||
|
||||
interface WatcherInfo {
|
||||
clients: Set<StreamClient>;
|
||||
watcher?: fs.FSWatcher;
|
||||
|
|
@ -50,8 +64,10 @@ interface WatcherInfo {
|
|||
|
||||
export class StreamWatcher {
|
||||
private activeWatchers: Map<string, WatcherInfo> = new Map();
|
||||
private sessionManager: SessionManager;
|
||||
|
||||
constructor() {
|
||||
constructor(sessionManager: SessionManager) {
|
||||
this.sessionManager = sessionManager;
|
||||
// Clean up notification listeners on exit
|
||||
process.on('beforeExit', () => {
|
||||
this.cleanup();
|
||||
|
|
@ -82,7 +98,7 @@ export class StreamWatcher {
|
|||
this.activeWatchers.set(sessionId, watcherInfo);
|
||||
|
||||
// Send existing content first
|
||||
this.sendExistingContent(streamPath, client);
|
||||
this.sendExistingContent(sessionId, streamPath, client);
|
||||
|
||||
// Get current file size and stats
|
||||
if (fs.existsSync(streamPath)) {
|
||||
|
|
@ -99,7 +115,7 @@ export class StreamWatcher {
|
|||
this.startWatching(sessionId, streamPath, watcherInfo);
|
||||
} else {
|
||||
// Send existing content to new client
|
||||
this.sendExistingContent(streamPath, client);
|
||||
this.sendExistingContent(sessionId, streamPath, client);
|
||||
}
|
||||
|
||||
// Add client to set
|
||||
|
|
@ -150,23 +166,91 @@ export class StreamWatcher {
|
|||
/**
|
||||
* Send existing content to a client
|
||||
*/
|
||||
private sendExistingContent(streamPath: string, client: StreamClient): void {
|
||||
private sendExistingContent(sessionId: string, streamPath: string, client: StreamClient): void {
|
||||
try {
|
||||
// First pass: analyze the stream to find the last clear and track resize events
|
||||
const analysisStream = fs.createReadStream(streamPath, { encoding: 'utf8' });
|
||||
// Load existing session info or use defaults, but don't save incomplete session data
|
||||
const sessionInfo = this.sessionManager.loadSessionInfo(sessionId);
|
||||
|
||||
// Validate offset to ensure we don't read beyond file size
|
||||
let startOffset = sessionInfo?.lastClearOffset ?? 0;
|
||||
if (fs.existsSync(streamPath)) {
|
||||
const stats = fs.statSync(streamPath);
|
||||
startOffset = Math.min(startOffset, stats.size);
|
||||
}
|
||||
|
||||
// Read header line separately (first line of file)
|
||||
// We need to track byte position separately from string length due to UTF-8 encoding
|
||||
let header: AsciinemaHeader | null = null;
|
||||
let fd: number | null = null;
|
||||
try {
|
||||
fd = fs.openSync(streamPath, 'r');
|
||||
const buf = Buffer.alloc(HEADER_READ_BUFFER_SIZE);
|
||||
let data = '';
|
||||
|
||||
// Important: Use filePosition (bytes) not data.length (characters) for fs.readSync
|
||||
// UTF-8 strings have character count != byte count for multi-byte characters
|
||||
let filePosition = 0; // Track actual byte position in file
|
||||
let bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition);
|
||||
|
||||
while (!data.includes('\n') && bytesRead > 0) {
|
||||
data += buf.toString('utf8', 0, bytesRead);
|
||||
|
||||
// Increment by actual bytes read, not string characters
|
||||
// This ensures correct file positioning for subsequent reads
|
||||
filePosition += bytesRead;
|
||||
|
||||
if (!data.includes('\n')) {
|
||||
// Use filePosition (byte offset) not data.length (character count)
|
||||
bytesRead = fs.readSync(fd, buf, 0, buf.length, filePosition);
|
||||
}
|
||||
}
|
||||
|
||||
const idx = data.indexOf('\n');
|
||||
if (idx !== -1) {
|
||||
header = JSON.parse(data.slice(0, idx));
|
||||
}
|
||||
} catch (e) {
|
||||
logger.debug(`failed to read asciinema header for session ${sessionId}: ${e}`);
|
||||
} finally {
|
||||
// Ensure file descriptor is always closed to prevent leaks
|
||||
// This executes even if an exception occurs during read operations
|
||||
if (fd !== null) {
|
||||
try {
|
||||
fs.closeSync(fd);
|
||||
} catch (closeError) {
|
||||
logger.debug(`failed to close file descriptor: ${closeError}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Analyze the stream starting from stored offset to find the most recent clear sequence
|
||||
// This allows us to prune old terminal content and only send what's currently visible
|
||||
const analysisStream = fs.createReadStream(streamPath, {
|
||||
encoding: 'utf8',
|
||||
start: startOffset,
|
||||
});
|
||||
let lineBuffer = '';
|
||||
const events: AsciinemaEvent[] = [];
|
||||
let lastClearIndex = -1;
|
||||
let lastResizeBeforeClear: AsciinemaResizeEvent | null = null;
|
||||
let currentResize: AsciinemaResizeEvent | null = null;
|
||||
let header: AsciinemaHeader | null = null;
|
||||
|
||||
// Track byte offset in the file for accurate position tracking
|
||||
// This is crucial for UTF-8 encoded files where character count != byte count
|
||||
let fileOffset = startOffset;
|
||||
let lastClearOffset = startOffset;
|
||||
|
||||
analysisStream.on('data', (chunk: string | Buffer) => {
|
||||
lineBuffer += chunk.toString();
|
||||
const lines = lineBuffer.split('\n');
|
||||
lineBuffer = lines.pop() || ''; // Keep incomplete line for next chunk
|
||||
let index = lineBuffer.indexOf('\n');
|
||||
while (index !== -1) {
|
||||
const line = lineBuffer.slice(0, index);
|
||||
lineBuffer = lineBuffer.slice(index + 1);
|
||||
|
||||
// Calculate byte length of the line plus newline character
|
||||
// Buffer.byteLength correctly handles multi-byte UTF-8 characters
|
||||
fileOffset += Buffer.byteLength(line, 'utf8') + 1;
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.trim()) {
|
||||
try {
|
||||
const parsed = JSON.parse(line);
|
||||
|
|
@ -185,9 +269,10 @@ export class StreamWatcher {
|
|||
}
|
||||
|
||||
// Check for clear sequence in output events
|
||||
if (isOutputEvent(event) && event[2].includes('\x1b[3J')) {
|
||||
if (containsClearSequence(event)) {
|
||||
lastClearIndex = events.length;
|
||||
lastResizeBeforeClear = currentResize;
|
||||
lastClearOffset = fileOffset;
|
||||
logger.debug(
|
||||
`found clear sequence at event index ${lastClearIndex}, current resize: ${currentResize ? currentResize[2] : 'none'}`
|
||||
);
|
||||
|
|
@ -200,6 +285,7 @@ export class StreamWatcher {
|
|||
logger.debug(`skipping invalid JSON line during analysis: ${e}`);
|
||||
}
|
||||
}
|
||||
index = lineBuffer.indexOf('\n');
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -208,6 +294,7 @@ export class StreamWatcher {
|
|||
if (lineBuffer.trim()) {
|
||||
try {
|
||||
const parsed = JSON.parse(lineBuffer);
|
||||
fileOffset += Buffer.byteLength(lineBuffer, 'utf8');
|
||||
if (Array.isArray(parsed)) {
|
||||
if (parsed[0] === 'exit') {
|
||||
events.push(parsed as AsciinemaExitEvent);
|
||||
|
|
@ -217,9 +304,10 @@ export class StreamWatcher {
|
|||
if (isResizeEvent(event)) {
|
||||
currentResize = event;
|
||||
}
|
||||
if (isOutputEvent(event) && event[2].includes('\x1b[3J')) {
|
||||
if (containsClearSequence(event)) {
|
||||
lastClearIndex = events.length;
|
||||
lastResizeBeforeClear = currentResize;
|
||||
lastClearOffset = fileOffset;
|
||||
logger.debug(
|
||||
`found clear sequence at event index ${lastClearIndex} (last event)`
|
||||
);
|
||||
|
|
@ -239,8 +327,16 @@ export class StreamWatcher {
|
|||
// Start from after the last clear
|
||||
startIndex = lastClearIndex + 1;
|
||||
logger.log(
|
||||
chalk.green(`pruning stream: skipping ${lastClearIndex + 1} events before last clear`)
|
||||
chalk.green(
|
||||
`pruning stream: skipping ${lastClearIndex + 1} events before last clear at offset ${lastClearOffset}`
|
||||
)
|
||||
);
|
||||
|
||||
// Persist new clear offset to session only if session already exists
|
||||
if (sessionInfo) {
|
||||
sessionInfo.lastClearOffset = lastClearOffset;
|
||||
this.sessionManager.saveSessionInfo(sessionId, sessionInfo);
|
||||
}
|
||||
}
|
||||
|
||||
// Send header first - update dimensions if we have a resize
|
||||
|
|
@ -283,7 +379,7 @@ export class StreamWatcher {
|
|||
analysisStream.on('error', (error) => {
|
||||
logger.error('failed to analyze stream for pruning:', error);
|
||||
// Fall back to original implementation without pruning
|
||||
this.sendExistingContentWithoutPruning(streamPath, client);
|
||||
this.sendExistingContentWithoutPruning(sessionId, streamPath, client);
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('failed to create read stream:', error);
|
||||
|
|
@ -293,7 +389,11 @@ export class StreamWatcher {
|
|||
/**
|
||||
* Original implementation without pruning (fallback)
|
||||
*/
|
||||
private sendExistingContentWithoutPruning(streamPath: string, client: StreamClient): void {
|
||||
private sendExistingContentWithoutPruning(
|
||||
_sessionId: string,
|
||||
streamPath: string,
|
||||
client: StreamClient
|
||||
): void {
|
||||
try {
|
||||
const stream = fs.createReadStream(streamPath, { encoding: 'utf8' });
|
||||
let exitEventFound = false;
|
||||
|
|
|
|||
|
|
@ -22,6 +22,11 @@ export interface SessionInfo {
|
|||
pid?: number;
|
||||
initialCols?: number;
|
||||
initialRows?: number;
|
||||
/**
|
||||
* Byte offset of the last clear event in the session stdout file.
|
||||
* Used to quickly seek to the most recent content when replaying casts.
|
||||
*/
|
||||
lastClearOffset?: number;
|
||||
version?: string; // VibeTunnel version that created this session
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import * as fs from 'fs';
|
|||
import * as os from 'os';
|
||||
import * as path from 'path';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { SessionManager } from '../../server/pty/session-manager.js';
|
||||
import type { AsciinemaHeader } from '../../server/pty/types.js';
|
||||
import { StreamWatcher } from '../../server/services/stream-watcher.js';
|
||||
import {
|
||||
|
|
@ -35,7 +36,8 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
locals: {},
|
||||
};
|
||||
|
||||
streamWatcher = new StreamWatcher();
|
||||
const sessionManager = new SessionManager(tempDir);
|
||||
streamWatcher = new StreamWatcher(sessionManager);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
|
|
@ -80,7 +82,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
// Use reflection to call private method
|
||||
// biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing
|
||||
const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher);
|
||||
sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 });
|
||||
sendExistingContent('session1', filepath, {
|
||||
response: mockResponse,
|
||||
startTime: Date.now() / 1000,
|
||||
});
|
||||
|
||||
// Wait for async operations to complete
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
|
@ -117,7 +122,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing
|
||||
const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher);
|
||||
sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 });
|
||||
sendExistingContent('session2', filepath, {
|
||||
response: mockResponse,
|
||||
startTime: Date.now() / 1000,
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
|
|
@ -138,7 +146,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing
|
||||
const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher);
|
||||
sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 });
|
||||
sendExistingContent('session3', filepath, {
|
||||
response: mockResponse,
|
||||
startTime: Date.now() / 1000,
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
|
|
@ -157,7 +168,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing
|
||||
const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher);
|
||||
sendExistingContent(nonExistentPath, { response: mockResponse, startTime: Date.now() / 1000 });
|
||||
sendExistingContent('session4', nonExistentPath, {
|
||||
response: mockResponse,
|
||||
startTime: Date.now() / 1000,
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
|
|
@ -171,7 +185,10 @@ describe('StreamWatcher - Asciinema Stream Pruning', () => {
|
|||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: accessing private method for testing
|
||||
const sendExistingContent = (streamWatcher as any).sendExistingContent.bind(streamWatcher);
|
||||
sendExistingContent(filepath, { response: mockResponse, startTime: Date.now() / 1000 });
|
||||
sendExistingContent('session5', filepath, {
|
||||
response: mockResponse,
|
||||
startTime: Date.now() / 1000,
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue