From 8558c481a04d4cd52fe1d36f60cd3fb64a87c80b Mon Sep 17 00:00:00 2001 From: Mario Zechner Date: Fri, 20 Jun 2025 12:43:09 +0200 Subject: [PATCH] feat: implement distributed buffer aggregation for HQ mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Enhanced /buffers WebSocket endpoint to aggregate updates from all remotes - Added remote WebSocket connection management with proper Bearer auth - Implemented connection pooling and automatic reconnection - Forward binary buffer messages transparently from remotes to clients - Track subscriptions per remote and handle cleanup properly - Support both local and remote sessions through unified interface This enables real-time terminal viewing across distributed VibeTunnel instances. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- web/src/hq-utils.ts | 46 +++++++++--- web/src/remote-registry.ts | 2 +- web/src/server.ts | 149 ++++++++++++++++++++++++++++++++++++- 3 files changed, 180 insertions(+), 17 deletions(-) diff --git a/web/src/hq-utils.ts b/web/src/hq-utils.ts index c8cf928d..9eb707f9 100644 --- a/web/src/hq-utils.ts +++ b/web/src/hq-utils.ts @@ -14,8 +14,8 @@ export function createSessionProxyMiddleware( return next(); } - // Extract session ID from various possible locations - const sessionId = req.params.sessionId || (req.query.sessionId as string); + // Extract session ID from params + const sessionId = req.params.sessionId; if (!sessionId) { return next(); } @@ -46,8 +46,7 @@ export function createSessionProxyMiddleware( signal: AbortSignal.timeout(30000), // 30 second timeout }); - // Forward the response - const data = await response.text(); + // Set status code res.status(response.status); // Copy headers @@ -57,14 +56,37 @@ export function createSessionProxyMiddleware( } }); - // Send response - try { - // Try to parse as JSON - const jsonData = JSON.parse(data); - res.json(jsonData); - } catch { - // Send as text if not JSON - res.send(data); + // Check content type to determine how to forward response + const contentType = response.headers.get('content-type') || ''; + if (contentType.includes('application/octet-stream')) { + // Binary data - forward as buffer + const buffer = await response.arrayBuffer(); + res.send(Buffer.from(buffer)); + } else if (contentType.includes('text/event-stream')) { + // SSE - forward as stream + const reader = response.body?.getReader(); + if (reader) { + const decoder = new TextDecoder(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + res.write(decoder.decode(value, { stream: true })); + } + } finally { + reader.releaseLock(); + } + } + res.end(); + } else { + // Text or JSON - forward as before + const data = await response.text(); + try { + const jsonData = JSON.parse(data); + res.json(jsonData); + } catch { + res.send(data); + } } } catch (error) { console.error(`Failed to proxy request to remote ${remote.name}:`, error); diff --git a/web/src/remote-registry.ts b/web/src/remote-registry.ts index ea85dcf8..e8ddfc6d 100644 --- a/web/src/remote-registry.ts +++ b/web/src/remote-registry.ts @@ -70,7 +70,7 @@ export class RemoteRegistry { return Array.from(this.remotes.values()).find((r) => r.url === url); } - getAllRemotes(): RemoteServer[] { + getRemotes(): RemoteServer[] { return Array.from(this.remotes.values()); } diff --git a/web/src/server.ts b/web/src/server.ts index d1527850..de9465bf 100644 --- a/web/src/server.ts +++ b/web/src/server.ts @@ -7,7 +7,7 @@ import * as os from 'os'; import { PtyService, PtyError } from './pty/index.js'; import { TerminalManager } from './terminal-manager.js'; import { StreamWatcher } from './stream-watcher.js'; -import { RemoteRegistry } from './remote-registry.js'; +import { RemoteRegistry, RemoteServer } from './remote-registry.js'; import { HQClient } from './hq-client.js'; import { createSessionProxyMiddleware } from './hq-utils.js'; @@ -300,7 +300,7 @@ app.get('/api/remotes', (req, res) => { return res.status(403).json({ error: 'This endpoint is only available in HQ mode' }); } - const remotes = remoteRegistry.getAllRemotes(); + const remotes = remoteRegistry.getRemotes(); res.json(remotes); }); @@ -345,7 +345,7 @@ app.get('/api/sessions', async (req, res) => { // If in HQ mode, fetch sessions from all remotes if (isHQMode && remoteRegistry) { - const allRemotes = remoteRegistry.getAllRemotes(); + const allRemotes = remoteRegistry.getRemotes(); // Fetch sessions from each remote in parallel const remoteSessionPromises = allRemotes.map(async (remote) => { @@ -517,7 +517,7 @@ app.post('/api/cleanup-exited', async (req, res) => { // If in HQ mode, clean up sessions on all remotes if (isHQMode && remoteRegistry) { - const allRemotes = remoteRegistry.getAllRemotes(); + const allRemotes = remoteRegistry.getRemotes(); // Clean up on each remote in parallel const remoteCleanupPromises = allRemotes.map(async (remote) => { @@ -1059,6 +1059,8 @@ const BUFFER_MAGIC_BYTE = 0xbf; // Handle buffer WebSocket connections function handleBufferWebSocket(ws: WebSocket) { const subscriptions = new Map void>(); + const remoteConnections = new Map(); // remoteId -> WebSocket connection + const remoteSubscriptions = new Map>(); // remoteId -> Set of sessionIds let pingInterval: NodeJS.Timeout | null = null; let lastPong = Date.now(); @@ -1076,6 +1078,86 @@ function handleBufferWebSocket(ws: WebSocket) { ws.send(JSON.stringify({ type: 'ping' })); }, 10000); // Ping every 10 seconds + // Helper function to establish remote WebSocket connection + const connectToRemote = async (remote: RemoteServer): Promise => { + // Check if we already have a connection + const existingConnection = remoteConnections.get(remote.id); + if (existingConnection && existingConnection.readyState === WebSocket.OPEN) { + return existingConnection; + } + + // Create new connection + const remoteWsUrl = `${remote.url.replace(/^https:/, 'wss:').replace(/^http:/, 'ws:')}/buffers`; + const headers = { + Authorization: `Bearer ${remote.token}`, + }; + + console.log(`[BUFFER WS] Connecting to remote ${remote.name} at ${remoteWsUrl}`); + + const remoteWs = new WebSocket(remoteWsUrl, { headers }); + remoteConnections.set(remote.id, remoteWs); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Remote connection timeout')); + remoteWs.close(); + }, 10000); + + remoteWs.on('open', () => { + clearTimeout(timeout); + console.log(`[BUFFER WS] Connected to remote ${remote.name}`); + + // Initialize subscription tracking for this remote + if (!remoteSubscriptions.has(remote.id)) { + remoteSubscriptions.set(remote.id, new Set()); + } + + resolve(remoteWs); + }); + + remoteWs.on('error', (error) => { + clearTimeout(timeout); + console.error(`[BUFFER WS] Remote ${remote.name} connection error:`, error); + reject(error); + }); + + remoteWs.on('close', () => { + console.log(`[BUFFER WS] Remote ${remote.name} connection closed`); + remoteConnections.delete(remote.id); + + // Attempt to reconnect if we still have subscriptions + const subs = remoteSubscriptions.get(remote.id); + if (subs && subs.size > 0) { + setTimeout(() => { + console.log(`[BUFFER WS] Attempting to reconnect to remote ${remote.name}`); + connectToRemote(remote).catch(console.error); + }, 5000); + } + }); + + // Forward messages from remote to client + remoteWs.on('message', (data) => { + if (ws.readyState === WebSocket.OPEN) { + // Check if it's a binary buffer message or JSON + if (Buffer.isBuffer(data) && data.length > 0 && data[0] === BUFFER_MAGIC_BYTE) { + // Forward binary buffer as-is + ws.send(data); + } else { + // Handle JSON messages (like ping) + try { + const message = JSON.parse(data.toString()); + if (message.type === 'ping') { + remoteWs.send(JSON.stringify({ type: 'pong' })); + } + } catch (_e) { + // Not JSON, ignore + } + } + } + }); + }); + }; + // Handle incoming messages ws.on('message', async (data) => { try { @@ -1097,6 +1179,60 @@ function handleBufferWebSocket(ws: WebSocket) { console.log(`[BUFFER WS] Subscribing to session ${sessionId}`); + // Check if this is a remote session + if (isHQMode && remoteRegistry) { + const remote = remoteRegistry.getRemoteBySessionId(sessionId); + if (remote) { + console.log(`[BUFFER WS] Session ${sessionId} belongs to remote ${remote.name}`); + + try { + // Connect to remote if not already connected + const remoteWs = await connectToRemote(remote); + + // Subscribe to the session on the remote + remoteWs.send(JSON.stringify({ type: 'subscribe', sessionId })); + + // Track this subscription + const remoteSubs = remoteSubscriptions.get(remote.id) || new Set(); + remoteSubs.add(sessionId); + remoteSubscriptions.set(remote.id, remoteSubs); + + // Create unsubscribe function + const unsubscribe = () => { + const subs = remoteSubscriptions.get(remote.id); + if (subs) { + subs.delete(sessionId); + if (subs.size === 0) { + // No more subscriptions for this remote, close connection + const conn = remoteConnections.get(remote.id); + if (conn) { + conn.close(); + remoteConnections.delete(remote.id); + } + remoteSubscriptions.delete(remote.id); + } else if (remoteWs.readyState === WebSocket.OPEN) { + // Unsubscribe from this specific session + remoteWs.send(JSON.stringify({ type: 'unsubscribe', sessionId })); + } + } + }; + + subscriptions.set(sessionId, unsubscribe); + return; + } catch (error) { + console.error( + `[BUFFER WS] Failed to connect to remote for session ${sessionId}:`, + error + ); + ws.send( + JSON.stringify({ type: 'error', message: 'Failed to connect to remote server' }) + ); + return; + } + } + } + + // Local session - use terminalManager try { // Subscribe to buffer changes const unsubscribe = await terminalManager.subscribeToBufferChanges( @@ -1157,6 +1293,11 @@ function handleBufferWebSocket(ws: WebSocket) { subscriptions.forEach((unsubscribe) => unsubscribe()); subscriptions.clear(); + // Close all remote connections + remoteConnections.forEach((remoteWs) => remoteWs.close()); + remoteConnections.clear(); + remoteSubscriptions.clear(); + // Clear ping interval if (pingInterval) { clearInterval(pingInterval);