Implement cross-platform control pipe and direct PTY access for fwd.ts

- Add platform detection for Windows vs Unix FIFO handling
- Implement polling fallback for Windows control pipes
- Add direct PTY process access for faster keyboard input
- Fix duplicate cleanup handlers and formatting issues

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Mario Zechner 2025-06-19 06:35:33 +02:00
parent eaf161706c
commit 64e55147e5

View file

@ -88,6 +88,21 @@ async function main() {
throw new Error('Session not found after creation');
}
// Get direct access to PTY process for faster input (if using node-pty)
let directPtyProcess: any = null;
if (ptyService.isUsingNodePty()) {
try {
const ptyManager = (ptyService as any).ptyManager;
const internalSession = ptyManager?.sessions?.get(result.sessionId);
directPtyProcess = internalSession?.ptyProcess;
if (directPtyProcess) {
console.log('Got direct PTY process access for faster input');
}
} catch (error) {
console.warn('Could not get direct PTY access, using fallback:', error);
}
}
console.log(`PID: ${session.pid}`);
console.log(`Status: ${session.status}`);
console.log(`Stream output: ${session['stream-out']}`);
@ -96,14 +111,31 @@ async function main() {
// Set up control FIFO for external commands (resize, etc.)
const controlPath = path.join(path.dirname(session.stdin), 'control');
try {
// Create control FIFO (like stdin)
// Create control pipe (FIFO on Unix, regular file on Windows)
const isWindows = process.platform === 'win32';
let useFifo = false;
if (!fs.existsSync(controlPath)) {
const { spawnSync } = require('child_process');
const result = spawnSync('mkfifo', [controlPath], { stdio: 'ignore' });
if (result.status !== 0) {
// Fallback to regular file if mkfifo fails
if (!isWindows) {
const { spawnSync } = require('child_process');
const result = spawnSync('mkfifo', [controlPath], { stdio: 'ignore' });
if (result.status === 0) {
useFifo = true;
}
}
if (!useFifo) {
// Fallback to regular file (Windows or if mkfifo fails)
fs.writeFileSync(controlPath, '');
}
} else {
// Check if existing file is a FIFO
try {
const stats = fs.statSync(controlPath);
useFifo = stats.isFIFO();
} catch (_e) {
useFifo = false;
}
}
// Update session info to include control pipe
@ -114,38 +146,105 @@ async function main() {
fs.writeFileSync(sessionInfoPath, JSON.stringify(sessionInfo, null, 2));
}
console.log(`Control FIFO: ${controlPath}`);
console.log(`Control ${useFifo ? 'FIFO' : 'file'}: ${controlPath}`);
// Open control FIFO for both read and write (like stdin) to keep it open
const controlFd = fs.openSync(controlPath, 'r+');
const controlStream = fs.createReadStream('', { fd: controlFd, encoding: 'utf8' });
if (useFifo) {
// Unix FIFO approach
const controlFd = fs.openSync(controlPath, 'r+');
const controlStream = fs.createReadStream('', { fd: controlFd, encoding: 'utf8' });
controlStream.on('data', (chunk: string | Buffer) => {
const data = chunk.toString('utf8');
const lines = data.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const message = JSON.parse(line);
handleControlMessage(message);
} catch (_e) {
console.warn('Invalid control message:', line);
controlStream.on('data', (chunk: string | Buffer) => {
const data = chunk.toString('utf8');
const lines = data.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const message = JSON.parse(line);
handleControlMessage(message);
} catch (_e) {
console.warn('Invalid control message:', line);
}
}
}
}
});
});
controlStream.on('error', (error) => {
console.warn('Control FIFO stream error:', error);
});
controlStream.on('error', (error) => {
console.warn('Control FIFO stream error:', error);
});
controlStream.on('end', () => {
console.log('Control FIFO stream ended');
});
controlStream.on('end', () => {
console.log('Control FIFO stream ended');
});
// Clean up control stream on exit
process.on('exit', () => {
try {
controlStream.destroy();
fs.closeSync(controlFd);
if (fs.existsSync(controlPath)) {
fs.unlinkSync(controlPath);
}
} catch (_e) {
// Ignore cleanup errors
}
});
} else {
// Windows/fallback polling approach
let lastControlPosition = 0;
const pollControl = () => {
try {
if (fs.existsSync(controlPath)) {
const stats = fs.statSync(controlPath);
if (stats.size > lastControlPosition) {
const fd = fs.openSync(controlPath, 'r');
const buffer = Buffer.allocUnsafe(stats.size - lastControlPosition);
fs.readSync(fd, buffer, 0, buffer.length, lastControlPosition);
fs.closeSync(fd);
const data = buffer.toString('utf8');
const lines = data.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const message = JSON.parse(line);
handleControlMessage(message);
} catch (_e) {
console.warn('Invalid control message:', line);
}
}
}
lastControlPosition = stats.size;
}
}
} catch (_error) {
// Control file might be temporarily unavailable
}
};
// Poll every 100ms on Windows
const controlInterval = setInterval(pollControl, 100);
// Clean up control polling on exit
process.on('exit', () => {
clearInterval(controlInterval);
try {
if (fs.existsSync(controlPath)) {
fs.unlinkSync(controlPath);
}
} catch (_e) {
// Ignore cleanup errors
}
});
}
// Handle control messages
const handleControlMessage = (message: Record<string, unknown>) => {
if (message.cmd === 'resize' && typeof message.cols === 'number' && typeof message.rows === 'number') {
if (
message.cmd === 'resize' &&
typeof message.cols === 'number' &&
typeof message.rows === 'number'
) {
console.log(`Received resize command: ${message.cols}x${message.rows}`);
// Get current session from PTY service and resize if possible
try {
@ -154,7 +253,10 @@ async function main() {
console.warn('Failed to resize session:', error);
}
} else if (message.cmd === 'kill') {
const signal = typeof message.signal === 'string' || typeof message.signal === 'number' ? message.signal : 'SIGTERM';
const signal =
typeof message.signal === 'string' || typeof message.signal === 'number'
? message.signal
: 'SIGTERM';
console.log(`Received kill command: ${signal}`);
// The session monitoring will detect the exit and handle cleanup
try {
@ -164,19 +266,6 @@ async function main() {
}
}
};
// Clean up control stream on exit
process.on('exit', () => {
try {
controlStream.destroy();
fs.closeSync(controlFd);
if (fs.existsSync(controlPath)) {
fs.unlinkSync(controlPath);
}
} catch (_e) {
// Ignore cleanup errors
}
});
} catch (error) {
console.warn('Failed to set up control pipe:', error);
}
@ -193,10 +282,16 @@ async function main() {
process.stdin.resume();
process.stdin.setEncoding('utf8');
// Forward stdin to PTY
// Forward stdin to PTY (use direct access for speed if available)
process.stdin.on('data', (data: string) => {
try {
ptyService.sendInput(result.sessionId, { text: data });
if (directPtyProcess) {
// Direct write to PTY for maximum speed
directPtyProcess.write(data);
} else {
// Fallback to PTY service
ptyService.sendInput(result.sessionId, { text: data });
}
} catch (error) {
console.error('Failed to send input:', error);
}