diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..64cd343f --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,8 @@ +{ + "permissions": { + "allow": [ + "Bash(pnpm test:*)" + ], + "deny": [] + } +} \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index e07d95b3..bf0eded2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -78,7 +78,7 @@ In the `mac/` directory: ## Key Files Quick Reference -- API Documentation: `docs/API.md` - Architecture Details: `docs/ARCHITECTURE.md` +- API Specifications: `docs/spec.md` - Server Implementation Guide: `web/spec.md` -- Build Configuration: `web/package.json`, `mac/Package.swift` \ No newline at end of file +- Build Configuration: `web/package.json`, `mac/Package.swift` diff --git a/web/src/client/components/session-view/direct-keyboard-manager.ts b/web/src/client/components/session-view/direct-keyboard-manager.ts index d5840d03..8ae408d5 100644 --- a/web/src/client/components/session-view/direct-keyboard-manager.ts +++ b/web/src/client/components/session-view/direct-keyboard-manager.ts @@ -293,14 +293,14 @@ export class DirectKeyboardManager { } if (e.key === 'Enter' && this.inputManager) { - this.inputManager.sendInputText('enter'); + this.inputManager.sendInput('enter'); } else if (e.key === 'Backspace' && this.inputManager) { // Always send backspace to terminal - this.inputManager.sendInputText('backspace'); + this.inputManager.sendInput('backspace'); } else if (e.key === 'Tab' && this.inputManager) { - this.inputManager.sendInputText(e.shiftKey ? 'shift_tab' : 'tab'); + this.inputManager.sendInput(e.shiftKey ? 'shift_tab' : 'tab'); } else if (e.key === 'Escape' && this.inputManager) { - this.inputManager.sendInputText('escape'); + this.inputManager.sendInput('escape'); } }); @@ -416,6 +416,10 @@ export class DirectKeyboardManager { } handleQuickKeyPress = (key: string, isModifier?: boolean, isSpecial?: boolean): void => { + if (!this.inputManager) { + logger.error('No input manager found'); + return; + } if (isSpecial && key === 'Done') { // Dismiss the keyboard logger.log('Done button pressed - dismissing keyboard'); @@ -457,53 +461,53 @@ export class DirectKeyboardManager { } } return; - } else if (key === 'Ctrl+A' && this.inputManager) { + } else if (key === 'Ctrl+A') { // Send Ctrl+A (start of line) - this.inputManager.sendInputText('\x01'); - } else if (key === 'Ctrl+C' && this.inputManager) { + this.inputManager.sendControlSequence('\x01'); + } else if (key === 'Ctrl+C') { // Send Ctrl+C (interrupt signal) - this.inputManager.sendInputText('\x03'); - } else if (key === 'Ctrl+D' && this.inputManager) { + this.inputManager.sendControlSequence('\x03'); + } else if (key === 'Ctrl+D') { // Send Ctrl+D (EOF) - this.inputManager.sendInputText('\x04'); - } else if (key === 'Ctrl+E' && this.inputManager) { + this.inputManager.sendControlSequence('\x04'); + } else if (key === 'Ctrl+E') { // Send Ctrl+E (end of line) - this.inputManager.sendInputText('\x05'); - } else if (key === 'Ctrl+K' && this.inputManager) { + this.inputManager.sendControlSequence('\x05'); + } else if (key === 'Ctrl+K') { // Send Ctrl+K (kill to end of line) - this.inputManager.sendInputText('\x0b'); - } else if (key === 'Ctrl+L' && this.inputManager) { + this.inputManager.sendControlSequence('\x0b'); + } else if (key === 'Ctrl+L') { // Send Ctrl+L (clear screen) - this.inputManager.sendInputText('\x0c'); - } else if (key === 'Ctrl+R' && this.inputManager) { + this.inputManager.sendControlSequence('\x0c'); + } else if (key === 'Ctrl+R') { // Send Ctrl+R (reverse search) - this.inputManager.sendInputText('\x12'); - } else if (key === 'Ctrl+U' && this.inputManager) { + this.inputManager.sendControlSequence('\x12'); + } else if (key === 'Ctrl+U') { // Send Ctrl+U (clear line) - this.inputManager.sendInputText('\x15'); - } else if (key === 'Ctrl+W' && this.inputManager) { + this.inputManager.sendControlSequence('\x15'); + } else if (key === 'Ctrl+W') { // Send Ctrl+W (delete word) - this.inputManager.sendInputText('\x17'); - } else if (key === 'Ctrl+Z' && this.inputManager) { + this.inputManager.sendControlSequence('\x17'); + } else if (key === 'Ctrl+Z') { // Send Ctrl+Z (suspend signal) - this.inputManager.sendInputText('\x1a'); - } else if (key === 'Option' && this.inputManager) { + this.inputManager.sendControlSequence('\x1a'); + } else if (key === 'Option') { // Send ESC prefix for Option/Alt key - this.inputManager.sendInputText('\x1b'); + this.inputManager.sendControlSequence('\x1b'); } else if (key === 'Command') { // Command key doesn't have a direct terminal equivalent // Could potentially show a message or ignore return; - } else if (key === 'Delete' && this.inputManager) { + } else if (key === 'Delete') { // Send delete key - this.inputManager.sendInputText('delete'); - } else if (key.startsWith('F') && this.inputManager) { + this.inputManager.sendInput('delete'); + } else if (key.startsWith('F')) { // Handle function keys F1-F12 const fNum = Number.parseInt(key.substring(1)); if (fNum >= 1 && fNum <= 12) { - this.inputManager.sendInputText(`f${fNum}`); + this.inputManager.sendInput(`f${fNum}`); } - } else if (this.inputManager) { + } else { // Map key names to proper values let keyToSend = key; if (key === 'Tab') { @@ -529,7 +533,7 @@ export class DirectKeyboardManager { } // Send the key to terminal - this.inputManager.sendInputText(keyToSend.toLowerCase()); + this.inputManager.sendInput(keyToSend.toLowerCase()); } // Always keep focus on hidden input after any key press (except Done) diff --git a/web/src/client/components/session-view/input-manager.ts b/web/src/client/components/session-view/input-manager.ts index e951106f..d93e205f 100644 --- a/web/src/client/components/session-view/input-manager.ts +++ b/web/src/client/components/session-view/input-manager.ts @@ -6,6 +6,7 @@ */ import { authClient } from '../../services/auth-client.js'; +import { websocketInputClient } from '../../services/websocket-input-client.js'; import { createLogger } from '../../utils/logger.js'; import type { Session } from '../session-list.js'; @@ -18,9 +19,27 @@ export interface InputManagerCallbacks { export class InputManager { private session: Session | null = null; private callbacks: InputManagerCallbacks | null = null; + private useWebSocketInput = true; // Feature flag for WebSocket input setSession(session: Session | null): void { this.session = session; + + // Check URL parameter for WebSocket input feature flag + const urlParams = new URLSearchParams(window.location.search); + const socketInputParam = urlParams.get('socket_input'); + if (socketInputParam !== null) { + this.useWebSocketInput = socketInputParam === 'true'; + logger.log( + `WebSocket input ${this.useWebSocketInput ? 'enabled' : 'disabled'} via URL parameter` + ); + } + + // Connect to WebSocket when session is set (if feature enabled) + if (session && this.useWebSocketInput) { + websocketInputClient.connect(session).catch((error) => { + logger.debug('WebSocket connection failed, will use HTTP fallback:', error); + }); + } } setCallbacks(callbacks: InputManagerCallbacks): void { @@ -122,118 +141,32 @@ export class InputManager { await this.sendInput(inputText); } - async sendInputText(text: string): Promise { + private async sendInputInternal( + input: { text?: string; key?: string }, + errorContext: string + ): Promise { if (!this.session) return; try { - // Determine if we should send as key or text - const body = [ - 'enter', - 'escape', - 'backspace', - 'tab', - 'shift_tab', - 'arrow_up', - 'arrow_down', - 'arrow_left', - 'arrow_right', - 'ctrl_enter', - 'shift_enter', - 'page_up', - 'page_down', - 'home', - 'end', - 'delete', - 'f1', - 'f2', - 'f3', - 'f4', - 'f5', - 'f6', - 'f7', - 'f8', - 'f9', - 'f10', - 'f11', - 'f12', - ].includes(text) - ? { key: text } - : { text }; + // Try WebSocket first if feature enabled - non-blocking (connection should already be established) + if (this.useWebSocketInput) { + const sentViaWebSocket = websocketInputClient.sendInput(input); - const response = await fetch(`/api/sessions/${this.session.id}/input`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - ...authClient.getAuthHeader(), - }, - body: JSON.stringify(body), - }); - - if (!response.ok) { - logger.error('failed to send input to session', { status: response.status }); - - // Check if session has exited (400 response) - if (response.status === 400) { - // Update session status to exited - if (this.session) { - this.session.status = 'exited'; - // Trigger UI update through callbacks - if (this.callbacks) { - this.callbacks.requestUpdate(); - } - } + if (sentViaWebSocket) { + // Successfully sent via WebSocket, no need for HTTP fallback + return; } } - } catch (error) { - logger.error('error sending input', error); - } - } - - private async sendInput(inputText: string): Promise { - try { - // Determine if we should send as key or text - const body = [ - 'enter', - 'escape', - 'backspace', - 'tab', - 'shift_tab', - 'arrow_up', - 'arrow_down', - 'arrow_left', - 'arrow_right', - 'ctrl_enter', - 'shift_enter', - 'page_up', - 'page_down', - 'home', - 'end', - 'delete', - 'f1', - 'f2', - 'f3', - 'f4', - 'f5', - 'f6', - 'f7', - 'f8', - 'f9', - 'f10', - 'f11', - 'f12', - ].includes(inputText) - ? { key: inputText } - : { text: inputText }; - - if (!this.session) return; + // Fallback to HTTP if WebSocket failed + logger.debug('WebSocket unavailable, falling back to HTTP'); const response = await fetch(`/api/sessions/${this.session.id}/input`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...authClient.getAuthHeader(), }, - body: JSON.stringify(body), + body: JSON.stringify(input), }); if (!response.ok) { @@ -248,14 +181,63 @@ export class InputManager { } } } else { - logger.error('failed to send input to session', { status: response.status }); + logger.error(`failed to ${errorContext}`, { status: response.status }); } } } catch (error) { - logger.error('error sending input', error); + logger.error(`error ${errorContext}`, error); } } + async sendInputText(text: string): Promise { + // sendInputText is used for pasted content - always treat as literal text + // Never interpret pasted text as special keys to avoid ambiguity + await this.sendInputInternal({ text }, 'send input to session'); + } + + async sendControlSequence(controlChar: string): Promise { + // sendControlSequence is for control characters - always send as literal text + // Control characters like '\x12' (Ctrl+R) should be sent directly + await this.sendInputInternal({ text: controlChar }, 'send control sequence to session'); + } + + async sendInput(inputText: string): Promise { + // Determine if we should send as key or text + const specialKeys = [ + 'enter', + 'escape', + 'backspace', + 'tab', + 'shift_tab', + 'arrow_up', + 'arrow_down', + 'arrow_left', + 'arrow_right', + 'ctrl_enter', + 'shift_enter', + 'page_up', + 'page_down', + 'home', + 'end', + 'delete', + 'f1', + 'f2', + 'f3', + 'f4', + 'f5', + 'f6', + 'f7', + 'f8', + 'f9', + 'f10', + 'f11', + 'f12', + ]; + + const input = specialKeys.includes(inputText) ? { key: inputText } : { text: inputText }; + await this.sendInputInternal(input, 'send input to session'); + } + isKeyboardShortcut(e: KeyboardEvent): boolean { // Check if we're typing in an input field or editor const target = e.target as HTMLElement; @@ -314,6 +296,11 @@ export class InputManager { } cleanup(): void { + // Disconnect WebSocket if feature was enabled + if (this.useWebSocketInput) { + websocketInputClient.disconnect(); + } + // Clear references to prevent memory leaks this.session = null; this.callbacks = null; diff --git a/web/src/client/components/session-view/mobile-input-manager.ts b/web/src/client/components/session-view/mobile-input-manager.ts index 73ee95d0..e2f8c34a 100644 --- a/web/src/client/components/session-view/mobile-input-manager.ts +++ b/web/src/client/components/session-view/mobile-input-manager.ts @@ -96,7 +96,8 @@ export class MobileInputManager { // Add enter key at the end to execute the command if (this.inputManager) { await this.inputManager.sendInputText(textToSend); - await this.inputManager.sendInputText('enter'); + // Use sendInput (not sendInputText) for special keys like 'enter' + await this.inputManager.sendInput('enter'); } // Clear the reactive property diff --git a/web/src/client/services/websocket-input-client.ts b/web/src/client/services/websocket-input-client.ts new file mode 100644 index 00000000..9ba04c84 --- /dev/null +++ b/web/src/client/services/websocket-input-client.ts @@ -0,0 +1,214 @@ +/** + * WebSocket Input Client for VibeTunnel + * + * Provides low-latency input transmission via WebSocket connection + * with automatic reconnection and fallback to HTTP. + * + * Optimized for absolute minimal latency: + * - Fire-and-forget input (no ACKs) + * - Raw text transmission (no JSON overhead) + * - Single persistent connection per session + */ + +import type { Session } from '../../shared/types.js'; +import { createLogger } from '../utils/logger.js'; + +const logger = createLogger('websocket-input-client'); + +export class WebSocketInputClient { + private ws: WebSocket | null = null; + private session: Session | null = null; + private reconnectTimeout: NodeJS.Timeout | null = null; + private connectionPromise: Promise | null = null; + private isConnecting = false; + + // Configuration + private readonly RECONNECT_DELAY = 1000; + private readonly MAX_RECONNECT_DELAY = 5000; + + constructor() { + this.cleanup = this.cleanup.bind(this); + window.addEventListener('beforeunload', this.cleanup); + } + + /** + * Connect to WebSocket server for a session + */ + async connect(session: Session): Promise { + // If already connected to this session and WebSocket is open, no-op + if (this.session?.id === session.id && this.ws?.readyState === WebSocket.OPEN) { + logger.debug(`Already connected to session ${session.id}`); + return; + } + + // If we're connecting to a different session, disconnect first + if (this.session?.id !== session.id) { + logger.debug(`Switching from session ${this.session?.id} to ${session.id}`); + this.disconnect(); + } + + this.session = session; + logger.debug(`Connecting to WebSocket for session ${session.id}`); + + // If currently connecting to this session, wait for it + if (this.connectionPromise) { + return this.connectionPromise; + } + + this.connectionPromise = this.establishConnection(); + try { + await this.connectionPromise; + } finally { + this.connectionPromise = null; + } + } + + private async establishConnection(): Promise { + if (!this.session) { + throw new Error('No session provided'); + } + + this.isConnecting = true; + + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = window.location.host; + const sessionId = this.session.id; + + // Get auth token from localStorage or use a development token + const authToken = + localStorage.getItem('vibetunnel_auth_token') || + localStorage.getItem('auth_token') || + `dev-token-${Date.now()}`; + + const wsUrl = `${protocol}//${host}/ws/input?sessionId=${sessionId}&token=${encodeURIComponent(authToken)}`; + + try { + logger.log(`Connecting to WebSocket: ${wsUrl}`); + this.ws = new WebSocket(wsUrl); + + this.ws.onopen = () => { + logger.log('WebSocket connected successfully'); + this.isConnecting = false; + }; + + this.ws.onclose = (event) => { + logger.log(`WebSocket closed: code=${event.code}, reason=${event.reason}`); + this.isConnecting = false; + this.ws = null; + this.scheduleReconnect(); + }; + + this.ws.onerror = (error) => { + logger.error('WebSocket error:', error); + this.isConnecting = false; + }; + + // Wait for connection to establish + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('WebSocket connection timeout')); + }, 5000); + + this.ws?.addEventListener('open', () => { + clearTimeout(timeout); + resolve(); + }); + + this.ws?.addEventListener('error', () => { + clearTimeout(timeout); + reject(new Error('WebSocket connection failed')); + }); + }); + } catch (error) { + logger.error('Failed to establish WebSocket connection:', error); + this.isConnecting = false; + throw error; + } + } + + /** + * Send input via WebSocket - fire and forget for minimal latency + * Returns true if sent via WebSocket, false if should fallback to HTTP + */ + sendInput(input: { text?: string; key?: string }): boolean { + if (!this.session || !this.ws || this.ws.readyState !== WebSocket.OPEN) { + return false; // Fallback to HTTP + } + + try { + // Ultra-minimal: send raw input with special key markers + let rawInput: string; + + if (input.key) { + // Special keys: wrap in null bytes to distinguish from literal text + rawInput = `\x00${input.key}\x00`; + logger.debug(`Sending special key: "${input.key}" as: ${JSON.stringify(rawInput)}`); + } else if (input.text) { + // Regular text: send as-is + rawInput = input.text; + logger.debug(`Sending text: ${JSON.stringify(rawInput)}`); + } else { + return false; + } + + this.ws.send(rawInput); + logger.debug('Sent raw input via WebSocket:', JSON.stringify(rawInput)); + return true; + } catch (error) { + logger.error('Failed to send via WebSocket:', error); + return false; // Fallback to HTTP + } + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) { + return; // Already scheduled + } + + const delay = Math.min(this.RECONNECT_DELAY * 2, this.MAX_RECONNECT_DELAY); + + logger.log(`Scheduling reconnect in ${delay}ms`); + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + if (this.session) { + this.connect(this.session).catch((error) => { + logger.error('Reconnection failed:', error); + }); + } + }, delay); + } + + /** + * Check if WebSocket is connected and ready + */ + isConnected(): boolean { + return this.ws?.readyState === WebSocket.OPEN; + } + + /** + * Disconnect and cleanup + */ + disconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + + if (this.ws) { + this.ws.close(); + this.ws = null; + } + + this.session = null; + this.isConnecting = false; + } + + private cleanup(): void { + this.disconnect(); + window.removeEventListener('beforeunload', this.cleanup); + } +} + +// Singleton instance +export const websocketInputClient = new WebSocketInputClient(); diff --git a/web/src/server/pty/pty-manager.ts b/web/src/server/pty/pty-manager.ts index 0b8730f6..6a9748ee 100644 --- a/web/src/server/pty/pty-manager.ts +++ b/web/src/server/pty/pty-manager.ts @@ -638,8 +638,14 @@ export class PtyManager extends EventEmitter { let dataToSend = ''; if (input.text !== undefined) { dataToSend = input.text; + logger.debug( + `Received text input: ${JSON.stringify(input.text)} -> sending: ${JSON.stringify(dataToSend)}` + ); } else if (input.key !== undefined) { dataToSend = this.convertSpecialKey(input.key); + logger.debug( + `Received special key: "${input.key}" -> converted to: ${JSON.stringify(dataToSend)}` + ); } else { throw new PtyError('No text or key specified in input', 'INVALID_INPUT'); } diff --git a/web/src/server/routes/websocket-input.ts b/web/src/server/routes/websocket-input.ts new file mode 100644 index 00000000..6d7ef906 --- /dev/null +++ b/web/src/server/routes/websocket-input.ts @@ -0,0 +1,198 @@ +/** + * WebSocket Input Handler for VibeTunnel + * + * Handles WebSocket connections for low-latency input transmission. + * Optimized for speed: + * - Fire-and-forget input (no ACKs) + * - Minimal message parsing + * - Direct PTY forwarding + */ + +import type { WebSocket as WSWebSocket } from 'ws'; +import type { SessionInput, SpecialKey } from '../../shared/types.js'; +import type { PtyManager } from '../pty/index.js'; +import type { ActivityMonitor } from '../services/activity-monitor.js'; +import type { AuthService } from '../services/auth-service.js'; +import type { RemoteRegistry } from '../services/remote-registry.js'; +import type { TerminalManager } from '../services/terminal-manager.js'; +import { createLogger } from '../utils/logger.js'; + +const logger = createLogger('websocket-input'); + +interface WebSocketInputHandlerOptions { + ptyManager: PtyManager; + terminalManager: TerminalManager; + activityMonitor: ActivityMonitor; + remoteRegistry: RemoteRegistry | null; + authService: AuthService; + isHQMode: boolean; +} + +export class WebSocketInputHandler { + private ptyManager: PtyManager; + private terminalManager: TerminalManager; + private activityMonitor: ActivityMonitor; + private remoteRegistry: RemoteRegistry | null; + private authService: AuthService; + private isHQMode: boolean; + private remoteConnections: Map = new Map(); + + constructor(options: WebSocketInputHandlerOptions) { + this.ptyManager = options.ptyManager; + this.terminalManager = options.terminalManager; + this.activityMonitor = options.activityMonitor; + this.remoteRegistry = options.remoteRegistry; + this.authService = options.authService; + this.isHQMode = options.isHQMode; + } + + private async connectToRemote( + remoteUrl: string, + sessionId: string, + token: string + ): Promise { + const wsUrl = remoteUrl.replace(/^https?:/, (match) => (match === 'https:' ? 'wss:' : 'ws:')); + const fullUrl = `${wsUrl}/ws/input?sessionId=${sessionId}&token=${encodeURIComponent(token)}`; + + logger.log(`Establishing proxy connection to remote: ${fullUrl}`); + + const remoteWs = new WebSocket(fullUrl); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + remoteWs.close(); + reject(new Error('Remote WebSocket connection timeout')); + }, 5000); + + remoteWs.addEventListener('open', () => { + clearTimeout(timeout); + logger.log(`Remote WebSocket proxy established for session ${sessionId}`); + resolve(remoteWs); + }); + + remoteWs.addEventListener('error', (error) => { + clearTimeout(timeout); + logger.error(`Remote WebSocket error for session ${sessionId}:`, error); + reject(error); + }); + }); + } + + async handleConnection(ws: WSWebSocket, sessionId: string, userId: string): Promise { + logger.log(`WebSocket input connection established for session ${sessionId}, user ${userId}`); + + // Check if this is a remote session in HQ mode + let remoteWs: WebSocket | null = null; + if (this.isHQMode && this.remoteRegistry) { + const remote = this.remoteRegistry.getRemoteBySessionId(sessionId); + if (remote) { + logger.log( + `Session ${sessionId} is on remote ${remote.name}, establishing proxy connection` + ); + + try { + remoteWs = await this.connectToRemote(remote.url, sessionId, remote.token); + this.remoteConnections.set(sessionId, remoteWs); + + // Set up remote connection error handling + remoteWs.addEventListener('close', () => { + logger.log(`Remote WebSocket closed for session ${sessionId}`); + this.remoteConnections.delete(sessionId); + ws.close(); // Close client connection when remote closes + }); + + remoteWs.addEventListener('error', (error) => { + logger.error(`Remote WebSocket error for session ${sessionId}:`, error); + this.remoteConnections.delete(sessionId); + ws.close(); // Close client connection on remote error + }); + } catch (error) { + logger.error( + `Failed to establish proxy connection to remote for session ${sessionId}:`, + error + ); + ws.close(); + return; + } + } + } + + ws.on('message', (data) => { + try { + // If we have a remote connection, just forward the raw data + if (remoteWs && remoteWs.readyState === WebSocket.OPEN) { + // Convert ws library's RawData to something native WebSocket can send + if (data instanceof Buffer) { + remoteWs.send(data); + } else if (Array.isArray(data)) { + // Concatenate buffer array + remoteWs.send(Buffer.concat(data)); + } else { + // ArrayBuffer or other types + remoteWs.send(data); + } + return; + } + + // Otherwise, handle local session + // Ultra-minimal: expect raw text input directly + const inputReceived = data.toString(); + + if (!inputReceived) { + return; // Ignore empty messages + } + + // Parse input with special key marker detection + // Special keys are wrapped in null bytes: "\x00enter\x00" + // Regular text (including literal "enter") is sent as-is + try { + let input: SessionInput; + + // Debug logging to see what we're receiving + logger.debug( + `Raw WebSocket input: ${JSON.stringify(inputReceived)} (length: ${inputReceived.length})` + ); + + if ( + inputReceived.startsWith('\x00') && + inputReceived.endsWith('\x00') && + inputReceived.length > 2 + ) { + // Special key wrapped in null bytes + const keyName = inputReceived.slice(1, -1); // Remove null byte markers + logger.debug(`Detected special key: "${keyName}"`); + input = { key: keyName as SpecialKey }; + logger.debug(`Mapped to special key: ${JSON.stringify(input)}`); + } else { + // Regular text (including literal words like "enter", "escape", etc.) + input = { text: inputReceived }; + logger.debug(`Regular text input: ${JSON.stringify(input)}`); + } + + logger.debug(`Sending to PTY manager: ${JSON.stringify(input)}`); + this.ptyManager.sendInput(sessionId, input); + } catch (error) { + logger.warn(`Failed to send input to session ${sessionId}:`, error); + // Don't close connection on input errors, just log + } + } catch (error) { + logger.error('Error processing WebSocket input message:', error); + // Don't close connection on errors, just ignore + } + }); + + ws.on('close', () => { + logger.log(`WebSocket input connection closed for session ${sessionId}`); + + // Clean up remote connection if exists + if (remoteWs) { + remoteWs.close(); + this.remoteConnections.delete(sessionId); + } + }); + + ws.on('error', (error) => { + logger.error(`WebSocket input error for session ${sessionId}:`, error); + }); + } +} diff --git a/web/src/server/server.ts b/web/src/server/server.ts index 02d3ad82..1e5a8201 100644 --- a/web/src/server/server.ts +++ b/web/src/server/server.ts @@ -2,6 +2,7 @@ import chalk from 'chalk'; import type { Response as ExpressResponse } from 'express'; import express from 'express'; import * as fs from 'fs'; +import type * as http from 'http'; import { createServer } from 'http'; import * as os from 'os'; import * as path from 'path'; @@ -16,6 +17,7 @@ import { createLogRoutes } from './routes/logs.js'; import { createPushRoutes } from './routes/push.js'; import { createRemoteRoutes } from './routes/remotes.js'; import { createSessionRoutes } from './routes/sessions.js'; +import { WebSocketInputHandler } from './routes/websocket-input.js'; import { ActivityMonitor } from './services/activity-monitor.js'; import { AuthService } from './services/auth-service.js'; import { BellEventHandler } from './services/bell-event-handler.js'; @@ -30,6 +32,14 @@ import { closeLogger, createLogger, initLogger, setDebugMode } from './utils/log import { VapidManager } from './utils/vapid-manager.js'; import { getVersionInfo, printVersionBanner } from './version.js'; +// Extended WebSocket request with authentication and routing info +interface WebSocketRequest extends http.IncomingMessage { + pathname?: string; + searchParams?: URLSearchParams; + userId?: string; + authMethod?: string; +} + const logger = createLogger('server'); // Global shutdown state management @@ -443,6 +453,10 @@ export async function createApp(): Promise { logger.debug(`Generated bearer token for remote server: ${config.remoteName}`); } + // Initialize authentication service + const authService = new AuthService(); + logger.debug('Initialized authentication service'); + // Initialize buffer aggregator bufferAggregator = new BufferAggregator({ terminalManager, @@ -451,9 +465,16 @@ export async function createApp(): Promise { }); logger.debug('Initialized buffer aggregator'); - // Initialize authentication service - const authService = new AuthService(); - logger.debug('Initialized authentication service'); + // Initialize WebSocket input handler + const websocketInputHandler = new WebSocketInputHandler({ + ptyManager, + terminalManager, + activityMonitor, + remoteRegistry, + authService, + isHQMode: config.isHQMode, + }); + logger.debug('Initialized WebSocket input handler'); // Set up authentication const authMiddleware = createAuthMiddleware({ @@ -565,18 +586,26 @@ export async function createApp(): Promise { // Parse the URL to extract path and query parameters const parsedUrl = new URL(request.url || '', `http://${request.headers.host || 'localhost'}`); - // Only handle /buffers path - if (parsedUrl.pathname !== '/buffers') { + // Handle both /buffers and /ws/input paths + if (parsedUrl.pathname !== '/buffers' && parsedUrl.pathname !== '/ws/input') { socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); socket.destroy(); return; } - // Check authentication - const isAuthenticated = await new Promise((resolve) => { + // Check authentication and capture user info + const authResult = await new Promise<{ + authenticated: boolean; + userId?: string; + authMethod?: string; + }>((resolve) => { // Track if promise has been resolved to prevent multiple resolutions let resolved = false; - const safeResolve = (value: boolean) => { + const safeResolve = (value: { + authenticated: boolean; + userId?: string; + authMethod?: string; + }) => { if (!resolved) { resolved = true; resolve(value); @@ -616,7 +645,7 @@ export async function createApp(): Promise { // Only consider it a failure if it's an error status code if (code >= 400) { authFailed = true; - safeResolve(false); + safeResolve({ authenticated: false }); } return { json: () => {}, @@ -632,13 +661,18 @@ export async function createApp(): Promise { const next = (error?: unknown) => { // Authentication succeeds if next() is called without error and no auth failure was recorded - safeResolve(!error && !authFailed); + const authenticated = !error && !authFailed; + safeResolve({ + authenticated, + userId: req.userId, + authMethod: req.authMethod, + }); }; // Add a timeout to prevent indefinite hanging const timeoutId = setTimeout(() => { logger.error('WebSocket auth timeout - auth middleware did not complete in time'); - safeResolve(false); + safeResolve({ authenticated: false }); }, 5000); // 5 second timeout // Call authMiddleware and handle potential async errors @@ -649,11 +683,11 @@ export async function createApp(): Promise { .catch((error) => { clearTimeout(timeoutId); logger.error('Auth middleware error:', error); - safeResolve(false); + safeResolve({ authenticated: false }); }); }); - if (!isAuthenticated) { + if (!authResult.authenticated) { logger.debug('WebSocket connection rejected: unauthorized'); socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); @@ -662,16 +696,46 @@ export async function createApp(): Promise { // Handle the upgrade wss.handleUpgrade(request, socket, head, (ws) => { - wss.emit('connection', ws, request); + // Add path and auth information to the request for routing + const wsRequest = request as WebSocketRequest; + wsRequest.pathname = parsedUrl.pathname; + wsRequest.searchParams = parsedUrl.searchParams; + wsRequest.userId = authResult.userId; + wsRequest.authMethod = authResult.authMethod; + wss.emit('connection', ws, wsRequest); }); }); - // WebSocket endpoint for buffer updates - wss.on('connection', (ws, _req) => { - if (bufferAggregator) { - bufferAggregator.handleClientConnection(ws); + // WebSocket connection router + wss.on('connection', (ws, req) => { + const wsReq = req as WebSocketRequest; + const pathname = wsReq.pathname; + const searchParams = wsReq.searchParams; + + if (pathname === '/buffers') { + // Handle buffer updates WebSocket + if (bufferAggregator) { + bufferAggregator.handleClientConnection(ws); + } else { + logger.error('BufferAggregator not initialized for WebSocket connection'); + ws.close(); + } + } else if (pathname === '/ws/input') { + // Handle input WebSocket + const sessionId = searchParams?.get('sessionId'); + + if (!sessionId) { + logger.error('WebSocket input connection missing sessionId parameter'); + ws.close(); + return; + } + + // Extract user ID from the authenticated request + const userId = wsReq.userId || 'unknown'; + + websocketInputHandler.handleConnection(ws, sessionId, userId); } else { - logger.error('BufferAggregator not initialized for WebSocket connection'); + logger.error(`Unknown WebSocket path: ${pathname}`); ws.close(); } }); @@ -852,8 +916,8 @@ export async function startVibeTunnelServer() { config, } = appInstance; - // Update debug mode based on config - if (config.debug) { + // Update debug mode based on config or environment variable + if (config.debug || process.env.DEBUG === 'true') { setDebugMode(true); logger.log(chalk.gray('Debug logging enabled')); } diff --git a/web/src/test/unit/websocket-input-handler.test.ts b/web/src/test/unit/websocket-input-handler.test.ts new file mode 100644 index 00000000..7626b3e0 --- /dev/null +++ b/web/src/test/unit/websocket-input-handler.test.ts @@ -0,0 +1,589 @@ +/** + * Tests for WebSocket Input Handler + * + * This tests the low-latency WebSocket input protocol for VibeTunnel, + * focusing on special key handling, text input, and HQ mode support. + */ + +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { WebSocket as WSWebSocket } from 'ws'; +import type { PtyManager } from '../../server/pty/pty-manager'; +import { WebSocketInputHandler } from '../../server/routes/websocket-input'; +import type { ActivityMonitor } from '../../server/services/activity-monitor'; +import type { AuthService } from '../../server/services/auth-service'; +import type { RemoteRegistry } from '../../server/services/remote-registry'; +import type { TerminalManager } from '../../server/services/terminal-manager'; +import type { SpecialKey } from '../../shared/types'; + +// Type definitions for mock objects +type MockEventListener = (...args: unknown[]) => void; + +// Mock WebSocket +const mockWebSocket = () => { + const listeners: Record = {}; + const ws = { + readyState: 1, // OPEN + on: vi.fn((event: string, listener: MockEventListener) => { + if (!listeners[event]) listeners[event] = []; + listeners[event].push(listener); + }), + off: vi.fn(), + close: vi.fn(), + send: vi.fn(), + // Helper to emit events + emit: (event: string, ...args: unknown[]) => { + if (listeners[event]) { + listeners[event].forEach((listener) => listener(...args)); + } + }, + }; + return ws as unknown as WSWebSocket; +}; + +// Mock remote WebSocket for HQ mode testing +const mockRemoteWebSocket = () => { + const listeners: Record = {}; + const remoteWs = { + readyState: 1, // OPEN + addEventListener: vi.fn((event: string, listener: MockEventListener) => { + if (!listeners[event]) listeners[event] = []; + listeners[event].push(listener); + }), + close: vi.fn(), + send: vi.fn(), + // Helper to emit events + emit: (event: string, ...args: unknown[]) => { + if (listeners[event]) { + listeners[event].forEach((listener) => listener(...args)); + } + }, + }; + + // Mock global WebSocket constructor for remote connections + global.WebSocket = vi.fn(() => { + // Immediately emit 'open' event to simulate successful connection + setTimeout(() => { + remoteWs.emit('open'); + }, 0); + return remoteWs; + }) as unknown as new ( + url: string + ) => WebSocket; + + // Mock WebSocket constants + (global.WebSocket as unknown as { OPEN: number }).OPEN = 1; + + return remoteWs; +}; + +describe('WebSocketInputHandler', () => { + let handler: WebSocketInputHandler; + let mockPtyManager: PtyManager; + let mockTerminalManager: TerminalManager; + let mockActivityMonitor: ActivityMonitor; + let mockAuthService: AuthService; + let mockRemoteRegistry: RemoteRegistry | null; + + beforeAll(() => { + // Create mocks + mockPtyManager = { + sendInput: vi.fn(), + } as unknown as PtyManager; + + mockTerminalManager = {} as unknown as TerminalManager; + mockActivityMonitor = {} as unknown as ActivityMonitor; + mockAuthService = {} as unknown as AuthService; + }); + + beforeEach(() => { + vi.clearAllMocks(); + mockRemoteRegistry = null; + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + afterAll(() => { + vi.restoreAllMocks(); + }); + + describe('Local Session Input Handling', () => { + beforeEach(() => { + handler = new WebSocketInputHandler({ + ptyManager: mockPtyManager, + terminalManager: mockTerminalManager, + activityMonitor: mockActivityMonitor, + remoteRegistry: mockRemoteRegistry, + authService: mockAuthService, + isHQMode: false, + }); + }); + + it('should handle regular text input', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-1'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send regular text + const inputText = 'hello world'; + ws.emit('message', Buffer.from(inputText)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: inputText, + }); + }); + + it('should handle text containing key names without special treatment', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-2'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Text that contains key names but isn't wrapped in null bytes + const inputText = 'i enter the world and press escape to exit'; + ws.emit('message', Buffer.from(inputText)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: inputText, + }); + }); + + it('should handle special keys wrapped in null bytes', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-3'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Test various special keys + const specialKeys: SpecialKey[] = [ + 'enter', + 'escape', + 'backspace', + 'tab', + 'arrow_up', + 'arrow_down', + 'arrow_left', + 'arrow_right', + 'ctrl_enter', + 'shift_enter', + 'page_up', + 'page_down', + 'home', + 'end', + 'delete', + 'f1', + 'f2', + 'f3', + 'f4', + 'f5', + 'f6', + 'f7', + 'f8', + 'f9', + 'f10', + 'f11', + 'f12', + ]; + + for (const key of specialKeys) { + const wrappedKey = `\x00${key}\x00`; + ws.emit('message', Buffer.from(wrappedKey)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + key: key, + }); + } + + expect(mockPtyManager.sendInput).toHaveBeenCalledTimes(specialKeys.length); + }); + + it('should treat unknown special keys as text', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-4'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Unknown key wrapped in null bytes should be treated as text + const unknownKey = '\x00unknown_key\x00'; + ws.emit('message', Buffer.from(unknownKey)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + key: 'unknown_key', + }); + }); + + it('should handle malformed special key markers', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-5'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Test malformed markers + const malformedInputs = [ + '\x00enter', // Missing closing null byte + 'escape\x00', // Missing opening null byte + '\x00\x00', // Empty key name + '\x00', // Just opening null byte + '\x00enter\x00extra', // Extra content after + ]; + + for (const input of malformedInputs) { + ws.emit('message', Buffer.from(input)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: input, + }); + } + }); + + it('should ignore empty messages', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-6'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send empty message + ws.emit('message', Buffer.from('')); + + expect(mockPtyManager.sendInput).not.toHaveBeenCalled(); + }); + + it('should handle rapid input without losing messages', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-7'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send multiple rapid inputs + const inputs = ['a', 'b', 'c', '\x00enter\x00', 'hello', '\x00escape\x00']; + + for (const input of inputs) { + ws.emit('message', Buffer.from(input)); + } + + expect(mockPtyManager.sendInput).toHaveBeenCalledTimes(6); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(1, sessionId, { text: 'a' }); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(2, sessionId, { text: 'b' }); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(3, sessionId, { text: 'c' }); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(4, sessionId, { key: 'enter' }); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(5, sessionId, { text: 'hello' }); + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(6, sessionId, { key: 'escape' }); + }); + + it('should handle binary data gracefully', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-8'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send binary data that doesn't match special key pattern + const binaryData = Buffer.from([0xff, 0xfe, 0xfd]); + ws.emit('message', binaryData); + + // Should be converted to string and treated as text + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: binaryData.toString(), + }); + }); + }); + + describe('HQ Mode Remote Session Handling', () => { + let mockRemoteRegistry: RemoteRegistry; + let mockRemoteWs: ReturnType; + + beforeEach(() => { + mockRemoteWs = mockRemoteWebSocket(); + + mockRemoteRegistry = { + getRemoteBySessionId: vi.fn(), + } as unknown as RemoteRegistry; + + handler = new WebSocketInputHandler({ + ptyManager: mockPtyManager, + terminalManager: mockTerminalManager, + activityMonitor: mockActivityMonitor, + remoteRegistry: mockRemoteRegistry, + authService: mockAuthService, + isHQMode: true, + }); + }); + + it('should proxy raw WebSocket data to remote for remote sessions', async () => { + const ws = mockWebSocket(); + const sessionId = 'remote-session-1'; + const userId = 'test-user'; + + // Mock remote registration + const mockRemote = { + name: 'remote-server', + url: 'https://remote.example.com', + token: 'remote-token', + }; + vi.mocked(mockRemoteRegistry.getRemoteBySessionId).mockReturnValue(mockRemote); + + await handler.handleConnection(ws, sessionId, userId); + + // Send input that should be proxied + const inputData = Buffer.from('test input'); + ws.emit('message', inputData); + + // Should forward to remote WebSocket, not to ptyManager + expect(mockRemoteWs.send).toHaveBeenCalledWith(inputData); + expect(mockPtyManager.sendInput).not.toHaveBeenCalled(); + }); + + it('should handle local sessions normally in HQ mode', async () => { + const ws = mockWebSocket(); + const sessionId = 'local-session-1'; + const userId = 'test-user'; + + // Mock no remote registration (local session) + vi.mocked(mockRemoteRegistry.getRemoteBySessionId).mockReturnValue(null); + + await handler.handleConnection(ws, sessionId, userId); + + // Send regular input + const inputText = 'local input'; + ws.emit('message', Buffer.from(inputText)); + + // Should use ptyManager for local sessions + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: inputText, + }); + expect(mockRemoteWs.send).not.toHaveBeenCalled(); + }); + + it('should handle remote connection failures gracefully', async () => { + const ws = mockWebSocket(); + const sessionId = 'remote-session-error'; + const userId = 'test-user'; + + const mockRemote = { + name: 'remote-server', + url: 'https://remote.example.com', + token: 'remote-token', + }; + vi.mocked(mockRemoteRegistry.getRemoteBySessionId).mockReturnValue(mockRemote); + + // Override the global WebSocket mock to emit error instead of open + global.WebSocket = vi.fn(() => { + setTimeout(() => { + mockRemoteWs.emit('error', new Error('Connection failed')); + }, 0); + return mockRemoteWs; + }) as unknown as new ( + url: string + ) => WebSocket; + + await handler.handleConnection(ws, sessionId, userId); + + // Should close the client connection + expect(ws.close).toHaveBeenCalled(); + }); + + it('should close client connection when remote connection closes', async () => { + const ws = mockWebSocket(); + const sessionId = 'remote-session-close'; + const userId = 'test-user'; + + const mockRemote = { + name: 'remote-server', + url: 'https://remote.example.com', + token: 'remote-token', + }; + vi.mocked(mockRemoteRegistry.getRemoteBySessionId).mockReturnValue(mockRemote); + + await handler.handleConnection(ws, sessionId, userId); + + // Simulate remote close + mockRemoteWs.emit('close'); + + expect(ws.close).toHaveBeenCalled(); + }); + + it('should handle different buffer types for remote forwarding', async () => { + const ws = mockWebSocket(); + const sessionId = 'remote-session-buffers'; + const userId = 'test-user'; + + const mockRemote = { + name: 'remote-server', + url: 'https://remote.example.com', + token: 'remote-token', + }; + vi.mocked(mockRemoteRegistry.getRemoteBySessionId).mockReturnValue(mockRemote); + + await handler.handleConnection(ws, sessionId, userId); + + // Test Buffer + const bufferData = Buffer.from('buffer data'); + ws.emit('message', bufferData); + expect(mockRemoteWs.send).toHaveBeenCalledWith(bufferData); + + // Test Buffer array + const bufferArray = [Buffer.from('part1'), Buffer.from('part2')]; + ws.emit('message', bufferArray); + expect(mockRemoteWs.send).toHaveBeenCalledWith(Buffer.concat(bufferArray)); + + // Test other data types (ArrayBuffer, etc.) + const arrayBuffer = new ArrayBuffer(8); + ws.emit('message', arrayBuffer); + expect(mockRemoteWs.send).toHaveBeenCalledWith(arrayBuffer); + }); + }); + + describe('Connection Lifecycle', () => { + beforeEach(() => { + handler = new WebSocketInputHandler({ + ptyManager: mockPtyManager, + terminalManager: mockTerminalManager, + activityMonitor: mockActivityMonitor, + remoteRegistry: null, + authService: mockAuthService, + isHQMode: false, + }); + }); + + it('should handle connection close event', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-close'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Simulate close event + ws.emit('close'); + + // Should not throw any errors + expect(true).toBe(true); + }); + + it('should handle connection error event', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-error'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Simulate error event + ws.emit('error', new Error('Connection error')); + + // Should not throw any errors + expect(true).toBe(true); + }); + + it('should handle ptyManager.sendInput errors gracefully', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-pty-error'; + const userId = 'test-user'; + + // Mock ptyManager to throw error + vi.mocked(mockPtyManager.sendInput).mockImplementation(() => { + throw new Error('PTY error'); + }); + + await handler.handleConnection(ws, sessionId, userId); + + // Send input that will cause error + ws.emit('message', Buffer.from('test input')); + + // Should not crash the connection + expect(mockPtyManager.sendInput).toHaveBeenCalled(); + }); + }); + + describe('Performance and Edge Cases', () => { + beforeEach(() => { + handler = new WebSocketInputHandler({ + ptyManager: mockPtyManager, + terminalManager: mockTerminalManager, + activityMonitor: mockActivityMonitor, + remoteRegistry: null, + authService: mockAuthService, + isHQMode: false, + }); + }); + + it('should handle large input messages', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-large'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send large text input + const largeText = 'x'.repeat(100000); + ws.emit('message', Buffer.from(largeText)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: largeText, + }); + }); + + it('should handle null bytes in regular text', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-nulls'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Text with embedded null bytes (not at start/end) + const textWithNulls = 'hello\x00world\x00test'; + ws.emit('message', Buffer.from(textWithNulls)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: textWithNulls, + }); + }); + + it('should handle Unicode characters correctly', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-unicode'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Unicode text + const unicodeText = 'πŸš€ Hello δΈ–η•Œ 🌍'; + ws.emit('message', Buffer.from(unicodeText)); + + expect(mockPtyManager.sendInput).toHaveBeenCalledWith(sessionId, { + text: unicodeText, + }); + }); + + it('should handle concurrent messages from same connection', async () => { + const ws = mockWebSocket(); + const sessionId = 'test-session-concurrent'; + const userId = 'test-user'; + + await handler.handleConnection(ws, sessionId, userId); + + // Send many messages in quick succession + const messages = Array.from({ length: 100 }, (_, i) => `msg${i}`); + + messages.forEach((msg) => { + ws.emit('message', Buffer.from(msg)); + }); + + expect(mockPtyManager.sendInput).toHaveBeenCalledTimes(100); + + // Verify all messages were processed + messages.forEach((msg, i) => { + expect(mockPtyManager.sendInput).toHaveBeenNthCalledWith(i + 1, sessionId, { + text: msg, + }); + }); + }); + }); +});