mirror of
https://github.com/samsonjs/vibetunnel.git
synced 2026-04-27 15:17:38 +00:00
Implement control pipe for external PTY session communication
- Add control-pipe field to session types for external command communication - Create ResizeControlMessage and KillControlMessage interfaces - Update PtyManager to send control messages via pipe for external sessions - Enhance fwd.ts to create and monitor control pipe for resize/kill commands - Support real-time resize operations for external sessions via IPC - Add proper cleanup of control pipes on session exit This enables full PTY control (resize, kill) for sessions created by external tools like fwd.ts through a file-based IPC mechanism. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
0fcddd2194
commit
6c055013af
3 changed files with 170 additions and 16 deletions
109
web/src/fwd.ts
109
web/src/fwd.ts
|
|
@ -46,13 +46,13 @@ async function main() {
|
||||||
|
|
||||||
const monitorOnly = args[0] === '--monitor-only';
|
const monitorOnly = args[0] === '--monitor-only';
|
||||||
const command = monitorOnly ? args.slice(1) : args;
|
const command = monitorOnly ? args.slice(1) : args;
|
||||||
|
|
||||||
if (command.length === 0) {
|
if (command.length === 0) {
|
||||||
console.error('Error: No command specified');
|
console.error('Error: No command specified');
|
||||||
showUsage();
|
showUsage();
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
const cwd = process.cwd();
|
const cwd = process.cwd();
|
||||||
|
|
||||||
console.log(`Starting command: ${command.join(' ')}`);
|
console.log(`Starting command: ${command.join(' ')}`);
|
||||||
|
|
@ -92,6 +92,95 @@ async function main() {
|
||||||
console.log(`Status: ${session.status}`);
|
console.log(`Status: ${session.status}`);
|
||||||
console.log(`Stream output: ${session['stream-out']}`);
|
console.log(`Stream output: ${session['stream-out']}`);
|
||||||
console.log(`Input pipe: ${session.stdin}`);
|
console.log(`Input pipe: ${session.stdin}`);
|
||||||
|
|
||||||
|
// Set up control pipe for external commands (resize, etc.)
|
||||||
|
const controlPipePath = path.join(path.dirname(session.stdin), 'control-pipe');
|
||||||
|
try {
|
||||||
|
// Create control pipe as a regular file (not FIFO) for simplicity
|
||||||
|
if (!fs.existsSync(controlPipePath)) {
|
||||||
|
fs.writeFileSync(controlPipePath, '');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update session info to include control pipe
|
||||||
|
const sessionInfoPath = path.join(path.dirname(session.stdin), 'session.json');
|
||||||
|
if (fs.existsSync(sessionInfoPath)) {
|
||||||
|
const sessionInfo = JSON.parse(fs.readFileSync(sessionInfoPath, 'utf8'));
|
||||||
|
sessionInfo['control-pipe'] = controlPipePath;
|
||||||
|
fs.writeFileSync(sessionInfoPath, JSON.stringify(sessionInfo, null, 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Control pipe: ${controlPipePath}`);
|
||||||
|
|
||||||
|
// Watch for control messages
|
||||||
|
let lastControlPosition = 0;
|
||||||
|
const watchControl = () => {
|
||||||
|
try {
|
||||||
|
if (fs.existsSync(controlPipePath)) {
|
||||||
|
const stats = fs.statSync(controlPipePath);
|
||||||
|
if (stats.size > lastControlPosition) {
|
||||||
|
const fd = fs.openSync(controlPipePath, 'r');
|
||||||
|
const buffer = Buffer.allocUnsafe(stats.size - lastControlPosition);
|
||||||
|
fs.readSync(fd, buffer, 0, buffer.length, lastControlPosition);
|
||||||
|
fs.closeSync(fd);
|
||||||
|
const newData = buffer.toString('utf8');
|
||||||
|
|
||||||
|
const lines = newData.split('\n').filter((line) => line.trim());
|
||||||
|
for (const line of lines) {
|
||||||
|
try {
|
||||||
|
const message = JSON.parse(line);
|
||||||
|
handleControlMessage(message);
|
||||||
|
} catch (_e) {
|
||||||
|
console.warn('Invalid control message:', line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lastControlPosition = stats.size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (_error) {
|
||||||
|
// Control file might be temporarily unavailable
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check for control messages every 200ms
|
||||||
|
const controlInterval = setInterval(watchControl, 200);
|
||||||
|
|
||||||
|
// Handle control messages
|
||||||
|
const handleControlMessage = (message: Record<string, unknown>) => {
|
||||||
|
if (message.cmd === 'resize' && message.cols && message.rows) {
|
||||||
|
console.log(`Received resize command: ${message.cols}x${message.rows}`);
|
||||||
|
// Get current session from PTY service and resize if possible
|
||||||
|
try {
|
||||||
|
ptyService.resizeSession(result.sessionId, message.cols, message.rows);
|
||||||
|
} catch (error) {
|
||||||
|
console.warn('Failed to resize session:', error);
|
||||||
|
}
|
||||||
|
} else if (message.cmd === 'kill') {
|
||||||
|
console.log(`Received kill command: ${message.signal || 'SIGTERM'}`);
|
||||||
|
// The session monitoring will detect the exit and handle cleanup
|
||||||
|
try {
|
||||||
|
ptyService.killSession(result.sessionId, message.signal || 'SIGTERM');
|
||||||
|
} catch (error) {
|
||||||
|
console.warn('Failed to kill session:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Clean up control interval on exit
|
||||||
|
process.on('exit', () => {
|
||||||
|
clearInterval(controlInterval);
|
||||||
|
try {
|
||||||
|
if (fs.existsSync(controlPipePath)) {
|
||||||
|
fs.unlinkSync(controlPipePath);
|
||||||
|
}
|
||||||
|
} catch (_e) {
|
||||||
|
// Ignore cleanup errors
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
console.warn('Failed to set up control pipe:', error);
|
||||||
|
}
|
||||||
|
|
||||||
if (monitorOnly) {
|
if (monitorOnly) {
|
||||||
console.log(`Monitor-only mode enabled\n`);
|
console.log(`Monitor-only mode enabled\n`);
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -117,7 +206,7 @@ async function main() {
|
||||||
// Stream PTY output to stdout
|
// Stream PTY output to stdout
|
||||||
const streamOutput = session['stream-out'];
|
const streamOutput = session['stream-out'];
|
||||||
console.log(`Waiting for output stream file: ${streamOutput}`);
|
console.log(`Waiting for output stream file: ${streamOutput}`);
|
||||||
|
|
||||||
// Wait for the stream file to be created
|
// Wait for the stream file to be created
|
||||||
const waitForStreamFile = async (maxWait = 5000) => {
|
const waitForStreamFile = async (maxWait = 5000) => {
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
|
|
@ -125,7 +214,7 @@ async function main() {
|
||||||
if (streamOutput && fs.existsSync(streamOutput)) {
|
if (streamOutput && fs.existsSync(streamOutput)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
await new Promise(resolve => setTimeout(resolve, 100));
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
@ -135,13 +224,13 @@ async function main() {
|
||||||
console.log('Warning: Stream output file not found, proceeding without real-time output');
|
console.log('Warning: Stream output file not found, proceeding without real-time output');
|
||||||
} else {
|
} else {
|
||||||
console.log('Stream file found, starting output monitoring...');
|
console.log('Stream file found, starting output monitoring...');
|
||||||
|
|
||||||
let lastPosition = 0;
|
let lastPosition = 0;
|
||||||
|
|
||||||
const readNewData = () => {
|
const readNewData = () => {
|
||||||
try {
|
try {
|
||||||
if (!streamOutput || !fs.existsSync(streamOutput)) return;
|
if (!streamOutput || !fs.existsSync(streamOutput)) return;
|
||||||
|
|
||||||
const stats = fs.statSync(streamOutput);
|
const stats = fs.statSync(streamOutput);
|
||||||
if (stats.size > lastPosition) {
|
if (stats.size > lastPosition) {
|
||||||
const fd = fs.openSync(streamOutput, 'r');
|
const fd = fs.openSync(streamOutput, 'r');
|
||||||
|
|
@ -149,7 +238,7 @@ async function main() {
|
||||||
fs.readSync(fd, buffer, 0, buffer.length, lastPosition);
|
fs.readSync(fd, buffer, 0, buffer.length, lastPosition);
|
||||||
fs.closeSync(fd);
|
fs.closeSync(fd);
|
||||||
const chunk = buffer.toString('utf8');
|
const chunk = buffer.toString('utf8');
|
||||||
|
|
||||||
// Parse asciinema format and extract text content
|
// Parse asciinema format and extract text content
|
||||||
const lines = chunk.split('\n');
|
const lines = chunk.split('\n');
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
|
|
@ -165,7 +254,7 @@ async function main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPosition = stats.size;
|
lastPosition = stats.size;
|
||||||
}
|
}
|
||||||
} catch (_error) {
|
} catch (_error) {
|
||||||
|
|
@ -175,7 +264,7 @@ async function main() {
|
||||||
|
|
||||||
// Start monitoring
|
// Start monitoring
|
||||||
const streamInterval = setInterval(readNewData, 50);
|
const streamInterval = setInterval(readNewData, 50);
|
||||||
|
|
||||||
// Clean up on exit
|
// Clean up on exit
|
||||||
process.on('exit', () => {
|
process.on('exit', () => {
|
||||||
clearInterval(streamInterval);
|
clearInterval(streamInterval);
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ import {
|
||||||
SessionInput,
|
SessionInput,
|
||||||
SpecialKey,
|
SpecialKey,
|
||||||
PtyError,
|
PtyError,
|
||||||
|
ResizeControlMessage,
|
||||||
|
KillControlMessage,
|
||||||
} from './types.js';
|
} from './types.js';
|
||||||
import { AsciinemaWriter } from './AsciinemaWriter.js';
|
import { AsciinemaWriter } from './AsciinemaWriter.js';
|
||||||
import { SessionManager } from './SessionManager.js';
|
import { SessionManager } from './SessionManager.js';
|
||||||
|
|
@ -311,6 +313,31 @@ export class PtyManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a control message to an external session
|
||||||
|
*/
|
||||||
|
private sendControlMessage(
|
||||||
|
sessionId: string,
|
||||||
|
message: ResizeControlMessage | KillControlMessage
|
||||||
|
): boolean {
|
||||||
|
const diskSession = this.sessionManager.getSession(sessionId);
|
||||||
|
if (!diskSession || !diskSession['control-pipe']) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const controlPipe = diskSession['control-pipe'];
|
||||||
|
try {
|
||||||
|
if (fs.existsSync(controlPipe)) {
|
||||||
|
const messageStr = JSON.stringify(message) + '\n';
|
||||||
|
fs.writeFileSync(controlPipe, messageStr);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.warn(`Failed to send control message to session ${sessionId}:`, error);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert special key names to escape sequences
|
* Convert special key names to escape sequences
|
||||||
*/
|
*/
|
||||||
|
|
@ -353,11 +380,19 @@ export class PtyManager {
|
||||||
memorySession.ptyProcess.resize(cols, rows);
|
memorySession.ptyProcess.resize(cols, rows);
|
||||||
memorySession.asciinemaWriter?.writeResize(cols, rows);
|
memorySession.asciinemaWriter?.writeResize(cols, rows);
|
||||||
} else {
|
} else {
|
||||||
// For external sessions, we can't directly resize the PTY
|
// For external sessions, try to send resize via control pipe
|
||||||
// but we don't throw an error - the session should handle SIGWINCH automatically
|
const resizeMessage: ResizeControlMessage = {
|
||||||
console.log(
|
cmd: 'resize',
|
||||||
`Cannot resize external session ${sessionId} directly, PTY should handle SIGWINCH automatically`
|
cols,
|
||||||
);
|
rows,
|
||||||
|
};
|
||||||
|
|
||||||
|
const sent = this.sendControlMessage(sessionId, resizeMessage);
|
||||||
|
if (sent) {
|
||||||
|
console.log(`Sent resize command to external session ${sessionId}: ${cols}x${rows}`);
|
||||||
|
} else {
|
||||||
|
console.log(`Cannot resize external session ${sessionId} - no control pipe available`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
throw new PtyError(
|
throw new PtyError(
|
||||||
|
|
@ -397,7 +432,20 @@ export class PtyManager {
|
||||||
// Start with SIGTERM and escalate if needed
|
// Start with SIGTERM and escalate if needed
|
||||||
await this.killSessionWithEscalation(sessionId, memorySession);
|
await this.killSessionWithEscalation(sessionId, memorySession);
|
||||||
} else {
|
} else {
|
||||||
// For external sessions, kill by PID
|
// For external sessions, try control pipe first, then fall back to PID
|
||||||
|
const killMessage: KillControlMessage = {
|
||||||
|
cmd: 'kill',
|
||||||
|
signal,
|
||||||
|
};
|
||||||
|
|
||||||
|
const sentControl = this.sendControlMessage(sessionId, killMessage);
|
||||||
|
if (sentControl) {
|
||||||
|
console.log(`Sent kill command via control pipe to session ${sessionId}`);
|
||||||
|
// Wait a bit for the control message to be processed
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if process is still running, if so, use direct PID kill
|
||||||
if (diskSession.pid && ProcessUtils.isProcessRunning(diskSession.pid)) {
|
if (diskSession.pid && ProcessUtils.isProcessRunning(diskSession.pid)) {
|
||||||
console.log(
|
console.log(
|
||||||
`Killing external session ${sessionId} (PID: ${diskSession.pid}) with ${signal}...`
|
`Killing external session ${sessionId} (PID: ${diskSession.pid}) with ${signal}...`
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ export interface SessionEntryWithId {
|
||||||
'stream-out': string;
|
'stream-out': string;
|
||||||
stdin: string;
|
stdin: string;
|
||||||
'notification-stream': string;
|
'notification-stream': string;
|
||||||
|
'control-pipe'?: string;
|
||||||
waiting: boolean;
|
waiting: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -73,6 +74,22 @@ export interface AsciinemaTheme {
|
||||||
|
|
||||||
export type AsciinemaEventType = 'o' | 'i' | 'r' | 'm'; // output, input, resize, marker
|
export type AsciinemaEventType = 'o' | 'i' | 'r' | 'm'; // output, input, resize, marker
|
||||||
|
|
||||||
|
export interface ControlMessage {
|
||||||
|
cmd: string;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ResizeControlMessage extends ControlMessage {
|
||||||
|
cmd: 'resize';
|
||||||
|
cols: number;
|
||||||
|
rows: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface KillControlMessage extends ControlMessage {
|
||||||
|
cmd: 'kill';
|
||||||
|
signal?: string | number;
|
||||||
|
}
|
||||||
|
|
||||||
export interface AsciinemaEvent {
|
export interface AsciinemaEvent {
|
||||||
time: number;
|
time: number;
|
||||||
type: AsciinemaEventType;
|
type: AsciinemaEventType;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue