Refactor socket client API: Remove HTTP fallback and add type-safe message handling

- Remove HTTP fallback and PID file management from socket-api-client
- Delete ServerPidManager utility class
- Add type-safe sendMessage and sendMessageWithResponse methods to VibeTunnelSocketClient
- Add MessagePayloadMap for compile-time type safety of message payloads
- Refactor SocketApiClient to use clean API instead of brittle type casting
- Remove backwards compatibility code - only emit events with MessageType enum names
- Simplify message handling and response listeners
- Update buildMessage to properly handle CONTROL_CMD messages
This commit is contained in:
Peter Steinberger 2025-07-28 11:40:13 +02:00
parent 9d7fe36699
commit 2b20fa9555
4 changed files with 167 additions and 130 deletions

View file

@ -141,6 +141,13 @@ if [ -n "$VIBETUNNEL_PREFER_DERIVED_DATA" ] && [ -n "$VIBETUNNEL_BIN" ]; then
fi fi
fi fi
# Handle safe commands first that work both inside and outside sessions
# This must come BEFORE the session check to avoid the recursive session error
if [[ "$1" == "status" || "$1" == "version" || "$1" == "--version" ]]; then
# These commands can run safely inside or outside a session
exec "$VIBETUNNEL_BIN" "$@"
fi
# Check if we're already inside a VibeTunnel session # Check if we're already inside a VibeTunnel session
if [ -n "$VIBETUNNEL_SESSION_ID" ]; then if [ -n "$VIBETUNNEL_SESSION_ID" ]; then
# Special case: handle 'vt title' command inside a session # Special case: handle 'vt title' command inside a session
@ -159,6 +166,7 @@ if [ -n "$VIBETUNNEL_SESSION_ID" ]; then
exit 1 exit 1
fi fi
# For all other commands, block recursive sessions
echo "Error: Already inside a VibeTunnel session (ID: $VIBETUNNEL_SESSION_ID). Recursive VibeTunnel sessions are not supported." >&2 echo "Error: Already inside a VibeTunnel session (ID: $VIBETUNNEL_SESSION_ID). Recursive VibeTunnel sessions are not supported." >&2
echo "If you need to run commands, use them directly without the 'vt' prefix." >&2 echo "If you need to run commands, use them directly without the 'vt' prefix." >&2
exit 1 exit 1
@ -365,6 +373,7 @@ if [[ $# -eq 0 || "$1" == "--help" || "$1" == "-h" ]]; then
exit 0 exit 0
fi fi
# Handle 'vt title' command when not inside a session # Handle 'vt title' command when not inside a session
if [[ "$1" == "title" ]]; then if [[ "$1" == "title" ]]; then
echo "Error: 'vt title' can only be used inside a VibeTunnel session." >&2 echo "Error: 'vt title' can only be used inside a VibeTunnel session." >&2
@ -372,12 +381,6 @@ if [[ "$1" == "title" ]]; then
exit 1 exit 1
fi fi
# Handle 'vt status' command
if [[ "$1" == "status" ]]; then
# Use vibetunnel CLI to show status via socket
exec "$VIBETUNNEL_BIN" status
fi
# Handle 'vt follow' command # Handle 'vt follow' command
if [[ "$1" == "follow" ]]; then if [[ "$1" == "follow" ]]; then
# Detect if we're in a worktree # Detect if we're in a worktree

View file

@ -6,12 +6,20 @@ import { EventEmitter } from 'events';
import * as net from 'net'; import * as net from 'net';
import { createLogger } from '../utils/logger.js'; import { createLogger } from '../utils/logger.js';
import { import {
type ControlCommand,
type ErrorMessage, type ErrorMessage,
frameMessage,
type GitEventNotify,
type GitFollowRequest,
type KillCommand,
MessageBuilder, MessageBuilder,
MessageParser, MessageParser,
type MessagePayload,
MessageType, MessageType,
parsePayload, parsePayload,
type ResizeCommand,
type StatusUpdate, type StatusUpdate,
type UpdateTitleCommand,
} from './socket-protocol.js'; } from './socket-protocol.js';
const logger = createLogger('socket-client'); const logger = createLogger('socket-client');
@ -20,8 +28,8 @@ export interface SocketClientEvents {
connect: () => void; connect: () => void;
disconnect: (error?: Error) => void; disconnect: (error?: Error) => void;
error: (error: Error) => void; error: (error: Error) => void;
status: (status: StatusUpdate) => void; // Message-specific events are emitted using MessageType enum names
serverError: (error: ErrorMessage) => void; // e.g., 'STATUS_UPDATE', 'ERROR', 'HEARTBEAT', etc.
} }
/** /**
@ -156,23 +164,14 @@ export class VibeTunnelSocketClient extends EventEmitter {
try { try {
const data = parsePayload(type, payload); const data = parsePayload(type, payload);
switch (type) { // Emit event with message type enum name
case MessageType.STATUS_UPDATE: this.emit(MessageType[type], data);
this.emit('status', data as StatusUpdate);
break;
case MessageType.ERROR: // Handle heartbeat
this.emit('serverError', data as ErrorMessage); if (type === MessageType.HEARTBEAT) {
break; this.lastHeartbeat = Date.now();
// Echo heartbeat back
case MessageType.HEARTBEAT: this.sendHeartbeat();
this.lastHeartbeat = Date.now();
// Echo heartbeat back
this.sendHeartbeat();
break;
default:
logger.debug(`Received unexpected message type: ${type}`);
} }
} catch (error) { } catch (error) {
logger.error('Failed to parse message:', error); logger.error('Failed to parse message:', error);
@ -261,6 +260,104 @@ export class VibeTunnelSocketClient extends EventEmitter {
return this.send(MessageBuilder.status(app, status, extra)); return this.send(MessageBuilder.status(app, status, extra));
} }
/**
* Send a message with type-safe payload
*/
public sendMessage<T extends MessageType>(type: T, payload: MessagePayload<T>): boolean {
const message = this.buildMessage(type, payload);
return this.send(message);
}
/**
* Send a message and wait for a response
*/
public async sendMessageWithResponse<TRequest extends MessageType, TResponse extends MessageType>(
requestType: TRequest,
payload: MessagePayload<TRequest>,
responseType: TResponse,
timeout = 5000
): Promise<MessagePayload<TResponse>> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.off(MessageType[responseType], handleResponse);
this.off('error', handleError);
reject(new Error(`Request timeout waiting for ${MessageType[responseType]}`));
}, timeout);
const handleResponse = (data: MessagePayload<TResponse>) => {
clearTimeout(timer);
this.off('error', handleError);
resolve(data);
};
const handleError = (error: Error | ErrorMessage) => {
clearTimeout(timer);
this.off(MessageType[responseType], handleResponse);
if ('message' in error) {
reject(new Error(error.message));
} else {
reject(error);
}
};
// Listen for response
this.once(MessageType[responseType], handleResponse);
this.once('error', handleError);
const sent = this.sendMessage(requestType, payload);
if (!sent) {
clearTimeout(timer);
this.off(MessageType[responseType], handleResponse);
this.off('error', handleError);
reject(new Error('Failed to send message'));
}
});
}
/**
* Build a message buffer from type and payload
*/
private buildMessage<T extends MessageType>(type: T, payload: MessagePayload<T>): Buffer {
switch (type) {
case MessageType.STDIN_DATA:
return MessageBuilder.stdin(payload as string);
case MessageType.CONTROL_CMD: {
const cmd = payload as ControlCommand;
switch (cmd.cmd) {
case 'resize':
return MessageBuilder.resize((cmd as ResizeCommand).cols, (cmd as ResizeCommand).rows);
case 'kill':
return MessageBuilder.kill((cmd as KillCommand).signal);
case 'reset-size':
return MessageBuilder.resetSize();
case 'update-title':
return MessageBuilder.updateTitle((cmd as UpdateTitleCommand).title);
default:
// For generic control commands, use frameMessage directly
return frameMessage(MessageType.CONTROL_CMD, cmd);
}
}
case MessageType.STATUS_UPDATE: {
const statusPayload = payload as StatusUpdate;
return MessageBuilder.status(
statusPayload.app,
statusPayload.status,
statusPayload.extra as Record<string, unknown> | undefined
);
}
case MessageType.HEARTBEAT:
return MessageBuilder.heartbeat();
case MessageType.STATUS_REQUEST:
return MessageBuilder.statusRequest();
case MessageType.GIT_FOLLOW_REQUEST:
return MessageBuilder.gitFollowRequest(payload as GitFollowRequest);
case MessageType.GIT_EVENT_NOTIFY:
return MessageBuilder.gitEventNotify(payload as GitEventNotify);
default:
throw new Error(`Unsupported message type: ${type}`);
}
}
/** /**
* Send heartbeat * Send heartbeat
*/ */

View file

@ -136,6 +136,30 @@ export interface GitEventAck {
handled: boolean; handled: boolean;
} }
/**
* Type-safe mapping of message types to their payload types
*/
export type MessagePayloadMap = {
[MessageType.STDIN_DATA]: string;
[MessageType.CONTROL_CMD]: ControlCommand;
[MessageType.STATUS_UPDATE]: StatusUpdate;
[MessageType.HEARTBEAT]: Record<string, never>;
[MessageType.ERROR]: ErrorMessage;
[MessageType.STATUS_REQUEST]: StatusRequest;
[MessageType.STATUS_RESPONSE]: StatusResponse;
[MessageType.GIT_FOLLOW_REQUEST]: GitFollowRequest;
[MessageType.GIT_FOLLOW_RESPONSE]: GitFollowResponse;
[MessageType.GIT_EVENT_NOTIFY]: GitEventNotify;
[MessageType.GIT_EVENT_ACK]: GitEventAck;
};
/**
* Get the payload type for a given message type
*/
export type MessagePayload<T extends MessageType> = T extends keyof MessagePayloadMap
? MessagePayloadMap[T]
: never;
/** /**
* Frame a message for transmission * Frame a message for transmission
*/ */

View file

@ -12,6 +12,7 @@ import {
type GitEventNotify, type GitEventNotify,
type GitFollowRequest, type GitFollowRequest,
type GitFollowResponse, type GitFollowResponse,
type MessagePayload,
MessageType, MessageType,
} from './pty/socket-protocol.js'; } from './pty/socket-protocol.js';
import { createLogger } from './utils/logger.js'; import { createLogger } from './utils/logger.js';
@ -52,9 +53,9 @@ export class SocketApiClient {
/** /**
* Send a request and wait for response * Send a request and wait for response
*/ */
private async sendRequest<TRequest, TResponse>( private async sendRequest<TRequest extends MessageType, TResponse>(
type: MessageType, type: TRequest,
payload: TRequest, payload: MessagePayload<TRequest>,
responseType: MessageType, responseType: MessageType,
timeout = 5000 timeout = 5000
): Promise<TResponse> { ): Promise<TResponse> {
@ -64,106 +65,18 @@ export class SocketApiClient {
const client = new VibeTunnelSocketClient(this.controlSocketPath); const client = new VibeTunnelSocketClient(this.controlSocketPath);
return new Promise((resolve, reject) => { try {
const timer = setTimeout(() => { await client.connect();
client.disconnect(); const response = await client.sendMessageWithResponse(type, payload, responseType, timeout);
reject(new Error('Request timeout')); return response as TResponse;
}, timeout); } catch (error) {
if (error instanceof Error && error.message.includes('ENOENT')) {
let responseReceived = false; throw new Error('VibeTunnel server is not running');
}
client.on('error', (error) => { throw error;
clearTimeout(timer); } finally {
if (!responseReceived) { client.disconnect();
reject(error); }
}
});
// Handle the specific response type we're expecting
const handleMessage = (msgType: MessageType, data: unknown) => {
if (msgType === responseType) {
responseReceived = true;
clearTimeout(timer);
client.disconnect();
resolve(data as TResponse);
} else if (msgType === MessageType.ERROR) {
responseReceived = true;
clearTimeout(timer);
client.disconnect();
reject(new Error((data as { message?: string }).message || 'Server error'));
}
};
// Override the handleMessage method to intercept messages
(client as unknown as { handleMessage: typeof handleMessage }).handleMessage = handleMessage;
client
.connect()
.then(() => {
// Send the request
let message: unknown;
switch (type) {
case MessageType.STATUS_REQUEST:
message = (client as unknown as { send: (msg: unknown) => unknown }).send(
(
client as unknown as {
constructor: {
prototype: {
constructor: {
MessageBuilder: Record<string, (...args: unknown[]) => unknown>;
};
};
};
}
).constructor.prototype.constructor.MessageBuilder.statusRequest()
);
break;
case MessageType.GIT_FOLLOW_REQUEST:
message = (client as unknown as { send: (msg: unknown) => unknown }).send(
(
client as unknown as {
constructor: {
prototype: {
constructor: {
MessageBuilder: Record<string, (...args: unknown[]) => unknown>;
};
};
};
}
).constructor.prototype.constructor.MessageBuilder.gitFollowRequest(payload)
);
break;
case MessageType.GIT_EVENT_NOTIFY:
message = (client as unknown as { send: (msg: unknown) => unknown }).send(
(
client as unknown as {
constructor: {
prototype: {
constructor: {
MessageBuilder: Record<string, (...args: unknown[]) => unknown>;
};
};
};
}
).constructor.prototype.constructor.MessageBuilder.gitEventNotify(payload)
);
break;
default:
clearTimeout(timer);
reject(new Error(`Unsupported message type: ${type}`));
return;
}
if (!message) {
clearTimeout(timer);
reject(new Error('Failed to send request'));
}
})
.catch((error) => {
clearTimeout(timer);
reject(error);
});
});
} }
/** /**
@ -176,7 +89,7 @@ export class SocketApiClient {
try { try {
// Send STATUS_REQUEST and wait for STATUS_RESPONSE // Send STATUS_REQUEST and wait for STATUS_RESPONSE
const response = await this.sendRequest<Record<string, never>, ServerStatus>( const response = await this.sendRequest<MessageType.STATUS_REQUEST, ServerStatus>(
MessageType.STATUS_REQUEST, MessageType.STATUS_REQUEST,
{}, {},
MessageType.STATUS_RESPONSE MessageType.STATUS_RESPONSE
@ -192,7 +105,7 @@ export class SocketApiClient {
* Enable or disable Git follow mode * Enable or disable Git follow mode
*/ */
async setFollowMode(request: GitFollowRequest): Promise<GitFollowResponse> { async setFollowMode(request: GitFollowRequest): Promise<GitFollowResponse> {
return this.sendRequest<GitFollowRequest, GitFollowResponse>( return this.sendRequest<MessageType.GIT_FOLLOW_REQUEST, GitFollowResponse>(
MessageType.GIT_FOLLOW_REQUEST, MessageType.GIT_FOLLOW_REQUEST,
request, request,
MessageType.GIT_FOLLOW_RESPONSE MessageType.GIT_FOLLOW_RESPONSE
@ -203,7 +116,7 @@ export class SocketApiClient {
* Send Git event notification * Send Git event notification
*/ */
async sendGitEvent(event: GitEventNotify): Promise<GitEventAck> { async sendGitEvent(event: GitEventNotify): Promise<GitEventAck> {
return this.sendRequest<GitEventNotify, GitEventAck>( return this.sendRequest<MessageType.GIT_EVENT_NOTIFY, GitEventAck>(
MessageType.GIT_EVENT_NOTIFY, MessageType.GIT_EVENT_NOTIFY,
event, event,
MessageType.GIT_EVENT_ACK MessageType.GIT_EVENT_ACK