feat: implement distributed buffer aggregation for HQ mode

- Enhanced /buffers WebSocket endpoint to aggregate updates from all remotes
- Added remote WebSocket connection management with proper Bearer auth
- Implemented connection pooling and automatic reconnection
- Forward binary buffer messages transparently from remotes to clients
- Track subscriptions per remote and handle cleanup properly
- Support both local and remote sessions through unified interface

This enables real-time terminal viewing across distributed VibeTunnel instances.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-20 12:43:09 +02:00
parent 31d9e3e839
commit 8558c481a0
3 changed files with 180 additions and 17 deletions

View file

@ -14,8 +14,8 @@ export function createSessionProxyMiddleware(
return next(); return next();
} }
// Extract session ID from various possible locations // Extract session ID from params
const sessionId = req.params.sessionId || (req.query.sessionId as string); const sessionId = req.params.sessionId;
if (!sessionId) { if (!sessionId) {
return next(); return next();
} }
@ -46,8 +46,7 @@ export function createSessionProxyMiddleware(
signal: AbortSignal.timeout(30000), // 30 second timeout signal: AbortSignal.timeout(30000), // 30 second timeout
}); });
// Forward the response // Set status code
const data = await response.text();
res.status(response.status); res.status(response.status);
// Copy headers // Copy headers
@ -57,14 +56,37 @@ export function createSessionProxyMiddleware(
} }
}); });
// Send response // Check content type to determine how to forward response
try { const contentType = response.headers.get('content-type') || '';
// Try to parse as JSON if (contentType.includes('application/octet-stream')) {
const jsonData = JSON.parse(data); // Binary data - forward as buffer
res.json(jsonData); const buffer = await response.arrayBuffer();
} catch { res.send(Buffer.from(buffer));
// Send as text if not JSON } else if (contentType.includes('text/event-stream')) {
res.send(data); // SSE - forward as stream
const reader = response.body?.getReader();
if (reader) {
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
res.write(decoder.decode(value, { stream: true }));
}
} finally {
reader.releaseLock();
}
}
res.end();
} else {
// Text or JSON - forward as before
const data = await response.text();
try {
const jsonData = JSON.parse(data);
res.json(jsonData);
} catch {
res.send(data);
}
} }
} catch (error) { } catch (error) {
console.error(`Failed to proxy request to remote ${remote.name}:`, error); console.error(`Failed to proxy request to remote ${remote.name}:`, error);

View file

@ -70,7 +70,7 @@ export class RemoteRegistry {
return Array.from(this.remotes.values()).find((r) => r.url === url); return Array.from(this.remotes.values()).find((r) => r.url === url);
} }
getAllRemotes(): RemoteServer[] { getRemotes(): RemoteServer[] {
return Array.from(this.remotes.values()); return Array.from(this.remotes.values());
} }

View file

@ -7,7 +7,7 @@ import * as os from 'os';
import { PtyService, PtyError } from './pty/index.js'; import { PtyService, PtyError } from './pty/index.js';
import { TerminalManager } from './terminal-manager.js'; import { TerminalManager } from './terminal-manager.js';
import { StreamWatcher } from './stream-watcher.js'; import { StreamWatcher } from './stream-watcher.js';
import { RemoteRegistry } from './remote-registry.js'; import { RemoteRegistry, RemoteServer } from './remote-registry.js';
import { HQClient } from './hq-client.js'; import { HQClient } from './hq-client.js';
import { createSessionProxyMiddleware } from './hq-utils.js'; import { createSessionProxyMiddleware } from './hq-utils.js';
@ -300,7 +300,7 @@ app.get('/api/remotes', (req, res) => {
return res.status(403).json({ error: 'This endpoint is only available in HQ mode' }); return res.status(403).json({ error: 'This endpoint is only available in HQ mode' });
} }
const remotes = remoteRegistry.getAllRemotes(); const remotes = remoteRegistry.getRemotes();
res.json(remotes); res.json(remotes);
}); });
@ -345,7 +345,7 @@ app.get('/api/sessions', async (req, res) => {
// If in HQ mode, fetch sessions from all remotes // If in HQ mode, fetch sessions from all remotes
if (isHQMode && remoteRegistry) { if (isHQMode && remoteRegistry) {
const allRemotes = remoteRegistry.getAllRemotes(); const allRemotes = remoteRegistry.getRemotes();
// Fetch sessions from each remote in parallel // Fetch sessions from each remote in parallel
const remoteSessionPromises = allRemotes.map(async (remote) => { const remoteSessionPromises = allRemotes.map(async (remote) => {
@ -517,7 +517,7 @@ app.post('/api/cleanup-exited', async (req, res) => {
// If in HQ mode, clean up sessions on all remotes // If in HQ mode, clean up sessions on all remotes
if (isHQMode && remoteRegistry) { if (isHQMode && remoteRegistry) {
const allRemotes = remoteRegistry.getAllRemotes(); const allRemotes = remoteRegistry.getRemotes();
// Clean up on each remote in parallel // Clean up on each remote in parallel
const remoteCleanupPromises = allRemotes.map(async (remote) => { const remoteCleanupPromises = allRemotes.map(async (remote) => {
@ -1059,6 +1059,8 @@ const BUFFER_MAGIC_BYTE = 0xbf;
// Handle buffer WebSocket connections // Handle buffer WebSocket connections
function handleBufferWebSocket(ws: WebSocket) { function handleBufferWebSocket(ws: WebSocket) {
const subscriptions = new Map<string, () => void>(); const subscriptions = new Map<string, () => void>();
const remoteConnections = new Map<string, WebSocket>(); // remoteId -> WebSocket connection
const remoteSubscriptions = new Map<string, Set<string>>(); // remoteId -> Set of sessionIds
let pingInterval: NodeJS.Timeout | null = null; let pingInterval: NodeJS.Timeout | null = null;
let lastPong = Date.now(); let lastPong = Date.now();
@ -1076,6 +1078,86 @@ function handleBufferWebSocket(ws: WebSocket) {
ws.send(JSON.stringify({ type: 'ping' })); ws.send(JSON.stringify({ type: 'ping' }));
}, 10000); // Ping every 10 seconds }, 10000); // Ping every 10 seconds
// Helper function to establish remote WebSocket connection
const connectToRemote = async (remote: RemoteServer): Promise<WebSocket> => {
// Check if we already have a connection
const existingConnection = remoteConnections.get(remote.id);
if (existingConnection && existingConnection.readyState === WebSocket.OPEN) {
return existingConnection;
}
// Create new connection
const remoteWsUrl = `${remote.url.replace(/^https:/, 'wss:').replace(/^http:/, 'ws:')}/buffers`;
const headers = {
Authorization: `Bearer ${remote.token}`,
};
console.log(`[BUFFER WS] Connecting to remote ${remote.name} at ${remoteWsUrl}`);
const remoteWs = new WebSocket(remoteWsUrl, { headers });
remoteConnections.set(remote.id, remoteWs);
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Remote connection timeout'));
remoteWs.close();
}, 10000);
remoteWs.on('open', () => {
clearTimeout(timeout);
console.log(`[BUFFER WS] Connected to remote ${remote.name}`);
// Initialize subscription tracking for this remote
if (!remoteSubscriptions.has(remote.id)) {
remoteSubscriptions.set(remote.id, new Set());
}
resolve(remoteWs);
});
remoteWs.on('error', (error) => {
clearTimeout(timeout);
console.error(`[BUFFER WS] Remote ${remote.name} connection error:`, error);
reject(error);
});
remoteWs.on('close', () => {
console.log(`[BUFFER WS] Remote ${remote.name} connection closed`);
remoteConnections.delete(remote.id);
// Attempt to reconnect if we still have subscriptions
const subs = remoteSubscriptions.get(remote.id);
if (subs && subs.size > 0) {
setTimeout(() => {
console.log(`[BUFFER WS] Attempting to reconnect to remote ${remote.name}`);
connectToRemote(remote).catch(console.error);
}, 5000);
}
});
// Forward messages from remote to client
remoteWs.on('message', (data) => {
if (ws.readyState === WebSocket.OPEN) {
// Check if it's a binary buffer message or JSON
if (Buffer.isBuffer(data) && data.length > 0 && data[0] === BUFFER_MAGIC_BYTE) {
// Forward binary buffer as-is
ws.send(data);
} else {
// Handle JSON messages (like ping)
try {
const message = JSON.parse(data.toString());
if (message.type === 'ping') {
remoteWs.send(JSON.stringify({ type: 'pong' }));
}
} catch (_e) {
// Not JSON, ignore
}
}
}
});
});
};
// Handle incoming messages // Handle incoming messages
ws.on('message', async (data) => { ws.on('message', async (data) => {
try { try {
@ -1097,6 +1179,60 @@ function handleBufferWebSocket(ws: WebSocket) {
console.log(`[BUFFER WS] Subscribing to session ${sessionId}`); console.log(`[BUFFER WS] Subscribing to session ${sessionId}`);
// Check if this is a remote session
if (isHQMode && remoteRegistry) {
const remote = remoteRegistry.getRemoteBySessionId(sessionId);
if (remote) {
console.log(`[BUFFER WS] Session ${sessionId} belongs to remote ${remote.name}`);
try {
// Connect to remote if not already connected
const remoteWs = await connectToRemote(remote);
// Subscribe to the session on the remote
remoteWs.send(JSON.stringify({ type: 'subscribe', sessionId }));
// Track this subscription
const remoteSubs = remoteSubscriptions.get(remote.id) || new Set();
remoteSubs.add(sessionId);
remoteSubscriptions.set(remote.id, remoteSubs);
// Create unsubscribe function
const unsubscribe = () => {
const subs = remoteSubscriptions.get(remote.id);
if (subs) {
subs.delete(sessionId);
if (subs.size === 0) {
// No more subscriptions for this remote, close connection
const conn = remoteConnections.get(remote.id);
if (conn) {
conn.close();
remoteConnections.delete(remote.id);
}
remoteSubscriptions.delete(remote.id);
} else if (remoteWs.readyState === WebSocket.OPEN) {
// Unsubscribe from this specific session
remoteWs.send(JSON.stringify({ type: 'unsubscribe', sessionId }));
}
}
};
subscriptions.set(sessionId, unsubscribe);
return;
} catch (error) {
console.error(
`[BUFFER WS] Failed to connect to remote for session ${sessionId}:`,
error
);
ws.send(
JSON.stringify({ type: 'error', message: 'Failed to connect to remote server' })
);
return;
}
}
}
// Local session - use terminalManager
try { try {
// Subscribe to buffer changes // Subscribe to buffer changes
const unsubscribe = await terminalManager.subscribeToBufferChanges( const unsubscribe = await terminalManager.subscribeToBufferChanges(
@ -1157,6 +1293,11 @@ function handleBufferWebSocket(ws: WebSocket) {
subscriptions.forEach((unsubscribe) => unsubscribe()); subscriptions.forEach((unsubscribe) => unsubscribe());
subscriptions.clear(); subscriptions.clear();
// Close all remote connections
remoteConnections.forEach((remoteWs) => remoteWs.close());
remoteConnections.clear();
remoteSubscriptions.clear();
// Clear ping interval // Clear ping interval
if (pingInterval) { if (pingInterval) {
clearInterval(pingInterval); clearInterval(pingInterval);