From fd7a874ee523b3a904977cacd01c6522aab11d5f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 20 Jun 2025 21:12:37 +0200 Subject: [PATCH] native OS event APIs (epoll/kqueue) for true event-driven I/O --- linux/pkg/session/eventloop.go | 193 ++++++++++++++++++ linux/pkg/session/eventloop_darwin.go | 269 ++++++++++++++++++++++++++ linux/pkg/session/eventloop_linux.go | 266 +++++++++++++++++++++++++ linux/pkg/session/eventloop_other.go | 148 ++++++++++++++ linux/pkg/session/pty.go | 163 +++++++++++++++- 5 files changed, 1034 insertions(+), 5 deletions(-) create mode 100644 linux/pkg/session/eventloop.go create mode 100644 linux/pkg/session/eventloop_darwin.go create mode 100644 linux/pkg/session/eventloop_linux.go create mode 100644 linux/pkg/session/eventloop_other.go diff --git a/linux/pkg/session/eventloop.go b/linux/pkg/session/eventloop.go new file mode 100644 index 00000000..b945e1c8 --- /dev/null +++ b/linux/pkg/session/eventloop.go @@ -0,0 +1,193 @@ +package session + +import ( + "fmt" + "io" + "os" + "syscall" +) + +// EventType represents the type of event +type EventType uint32 + +const ( + EventRead EventType = 1 << 0 + EventWrite EventType = 1 << 1 + EventError EventType = 1 << 2 + EventHup EventType = 1 << 3 +) + +// Event represents an I/O event +type Event struct { + FD int + Events EventType + Data interface{} // User data associated with the FD +} + +// EventHandler is called when an event occurs +type EventHandler func(event Event) + +// EventLoop provides platform-independent event-driven I/O +type EventLoop interface { + // Add registers a file descriptor for event monitoring + Add(fd int, events EventType, data interface{}) error + + // Remove unregisters a file descriptor + Remove(fd int) error + + // Modify changes the events to monitor for a file descriptor + Modify(fd int, events EventType) error + + // Run starts the event loop, blocking until Stop is called + Run(handler EventHandler) error + + // RunOnce processes events once with optional timeout (-1 for blocking) + RunOnce(handler EventHandler, timeoutMs int) error + + // Stop terminates the event loop + Stop() error + + // Close releases all resources + Close() error +} + +// NewEventLoop creates a platform-specific event loop +func NewEventLoop() (EventLoop, error) { + return newPlatformEventLoop() +} + +// PTYEventHandler handles PTY I/O events using the event loop +type PTYEventHandler struct { + pty *PTY + eventLoop EventLoop + outputBuffer []byte + handlers map[int]func(Event) +} + +// NewPTYEventHandler creates a new event-driven PTY handler +func NewPTYEventHandler(pty *PTY) (*PTYEventHandler, error) { + eventLoop, err := NewEventLoop() + if err != nil { + return nil, fmt.Errorf("failed to create event loop: %w", err) + } + + handler := &PTYEventHandler{ + pty: pty, + eventLoop: eventLoop, + outputBuffer: make([]byte, 4096), + handlers: make(map[int]func(Event)), + } + + // Register PTY for read events + ptyFD := int(pty.pty.Fd()) + if err := eventLoop.Add(ptyFD, EventRead|EventHup, "pty"); err != nil { + eventLoop.Close() + return nil, fmt.Errorf("failed to add PTY to event loop: %w", err) + } + + // Set up handlers + handler.handlers[ptyFD] = handler.handlePTYEvent + + return handler, nil +} + +// Run starts the event-driven I/O loop +func (h *PTYEventHandler) Run() error { + return h.eventLoop.Run(func(event Event) { + if handler, ok := h.handlers[event.FD]; ok { + handler(event) + } + }) +} + +// handlePTYEvent processes PTY read events +func (h *PTYEventHandler) handlePTYEvent(event Event) { + if event.Events&EventRead != 0 { + // Data available for reading + for { + n, err := h.pty.pty.Read(h.outputBuffer) + if n > 0 { + // Write to stream immediately + if err := h.pty.streamWriter.WriteOutput(h.outputBuffer[:n]); err != nil { + debugLog("[ERROR] Failed to write PTY output: %v", err) + } + } + + if err != nil { + if err == io.EOF || err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { + // No more data available + break + } + // Real error + debugLog("[ERROR] PTY read error: %v", err) + h.eventLoop.Stop() + break + } + + // Continue reading if we filled the buffer + if n < len(h.outputBuffer) { + break + } + } + } + + if event.Events&EventHup != 0 { + // PTY closed + debugLog("[DEBUG] PTY closed (HUP event)") + h.eventLoop.Stop() + } +} + +// AddStdinPipe adds stdin pipe monitoring to the event loop +func (h *PTYEventHandler) AddStdinPipe(stdinPipe *os.File) error { + stdinFD := int(stdinPipe.Fd()) + + // Set non-blocking mode + if err := syscall.SetNonblock(stdinFD, true); err != nil { + return fmt.Errorf("failed to set stdin non-blocking: %w", err) + } + + // Add to event loop + if err := h.eventLoop.Add(stdinFD, EventRead, "stdin"); err != nil { + return fmt.Errorf("failed to add stdin to event loop: %w", err) + } + + // Set up handler + h.handlers[stdinFD] = h.handleStdinEvent + + return nil +} + +// handleStdinEvent processes stdin input events +func (h *PTYEventHandler) handleStdinEvent(event Event) { + if event.Events&EventRead != 0 { + buf := make([]byte, 1024) + n, err := syscall.Read(event.FD, buf) + if n > 0 { + // Write to PTY + if _, err := h.pty.pty.Write(buf[:n]); err != nil { + debugLog("[ERROR] Failed to write to PTY: %v", err) + } + + // Also write to asciinema stream + if err := h.pty.streamWriter.WriteInput(buf[:n]); err != nil { + debugLog("[ERROR] Failed to write input to stream: %v", err) + } + } + + if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK { + debugLog("[ERROR] Stdin read error: %v", err) + h.eventLoop.Remove(event.FD) + } + } +} + +// Stop stops the event loop +func (h *PTYEventHandler) Stop() error { + return h.eventLoop.Stop() +} + +// Close cleans up resources +func (h *PTYEventHandler) Close() error { + return h.eventLoop.Close() +} \ No newline at end of file diff --git a/linux/pkg/session/eventloop_darwin.go b/linux/pkg/session/eventloop_darwin.go new file mode 100644 index 00000000..bc9fcf28 --- /dev/null +++ b/linux/pkg/session/eventloop_darwin.go @@ -0,0 +1,269 @@ +//go:build darwin || freebsd || openbsd || netbsd +// +build darwin freebsd openbsd netbsd + +package session + +import ( + "fmt" + "sync" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +// kqueueEventLoop implements EventLoop using kqueue (macOS/BSD) +type kqueueEventLoop struct { + kq int + mu sync.Mutex + running bool + stopChan chan struct{} + fdData map[int]interface{} +} + +func newPlatformEventLoop() (EventLoop, error) { + kq, err := unix.Kqueue() + if err != nil { + return nil, fmt.Errorf("failed to create kqueue: %w", err) + } + + return &kqueueEventLoop{ + kq: kq, + stopChan: make(chan struct{}), + fdData: make(map[int]interface{}), + }, nil +} + +func (e *kqueueEventLoop) Add(fd int, events EventType, data interface{}) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.fdData[fd] = data + + var kevents []unix.Kevent_t + + if events&EventRead != 0 { + kevents = append(kevents, unix.Kevent_t{ + Ident: uint64(fd), + Filter: unix.EVFILT_READ, + Flags: unix.EV_ADD | unix.EV_ENABLE, + }) + } + + if events&EventWrite != 0 { + kevents = append(kevents, unix.Kevent_t{ + Ident: uint64(fd), + Filter: unix.EVFILT_WRITE, + Flags: unix.EV_ADD | unix.EV_ENABLE, + }) + } + + if len(kevents) > 0 { + _, err := unix.Kevent(e.kq, kevents, nil, nil) + if err != nil { + delete(e.fdData, fd) + return fmt.Errorf("failed to add fd %d to kqueue: %w", fd, err) + } + } + + // Set non-blocking mode + if err := unix.SetNonblock(fd, true); err != nil { + // Not fatal, but log it + debugLog("[WARN] Failed to set non-blocking mode on fd %d: %v", fd, err) + } + + return nil +} + +func (e *kqueueEventLoop) Remove(fd int) error { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.fdData, fd) + + // Remove both read and write filters + kevents := []unix.Kevent_t{ + { + Ident: uint64(fd), + Filter: unix.EVFILT_READ, + Flags: unix.EV_DELETE, + }, + { + Ident: uint64(fd), + Filter: unix.EVFILT_WRITE, + Flags: unix.EV_DELETE, + }, + } + + _, err := unix.Kevent(e.kq, kevents, nil, nil) + if err != nil && err != syscall.ENOENT { + return fmt.Errorf("failed to remove fd %d from kqueue: %w", fd, err) + } + + return nil +} + +func (e *kqueueEventLoop) Modify(fd int, events EventType) error { + // For kqueue, we need to remove and re-add + if err := e.Remove(fd); err != nil { + return err + } + + e.mu.Lock() + data := e.fdData[fd] + e.mu.Unlock() + + return e.Add(fd, events, data) +} + +func (e *kqueueEventLoop) Run(handler EventHandler) error { + e.mu.Lock() + if e.running { + e.mu.Unlock() + return fmt.Errorf("event loop already running") + } + e.running = true + e.mu.Unlock() + + defer func() { + e.mu.Lock() + e.running = false + e.mu.Unlock() + }() + + events := make([]unix.Kevent_t, 128) + + for { + select { + case <-e.stopChan: + return nil + default: + } + + // Wait for events with 100ms timeout to check for stop + n, err := unix.Kevent(e.kq, nil, events, &unix.Timespec{ + Sec: 0, + Nsec: 100 * 1000 * 1000, // 100ms + }) + + if err != nil { + if err == unix.EINTR { + continue + } + return fmt.Errorf("kevent wait failed: %w", err) + } + + // Process events + for i := 0; i < n; i++ { + event := &events[i] + fd := int(event.Ident) + + e.mu.Lock() + data := e.fdData[fd] + e.mu.Unlock() + + var eventType EventType + + // Convert kqueue events to our EventType + if event.Filter == unix.EVFILT_READ { + eventType |= EventRead + } + if event.Filter == unix.EVFILT_WRITE { + eventType |= EventWrite + } + if event.Flags&unix.EV_EOF != 0 { + eventType |= EventHup + } + if event.Flags&unix.EV_ERROR != 0 { + eventType |= EventError + } + + handler(Event{ + FD: fd, + Events: eventType, + Data: data, + }) + } + } +} + +func (e *kqueueEventLoop) RunOnce(handler EventHandler, timeoutMs int) error { + events := make([]unix.Kevent_t, 128) + + var timeout *unix.Timespec + if timeoutMs >= 0 { + timeout = &unix.Timespec{ + Sec: int64(timeoutMs / 1000), + Nsec: int64((timeoutMs % 1000) * 1000 * 1000), + } + } + + n, err := unix.Kevent(e.kq, nil, events, timeout) + if err != nil { + if err == unix.EINTR { + return nil + } + return fmt.Errorf("kevent wait failed: %w", err) + } + + // Process events + for i := 0; i < n; i++ { + event := &events[i] + fd := int(event.Ident) + + e.mu.Lock() + data := e.fdData[fd] + e.mu.Unlock() + + var eventType EventType + + if event.Filter == unix.EVFILT_READ { + eventType |= EventRead + } + if event.Filter == unix.EVFILT_WRITE { + eventType |= EventWrite + } + if event.Flags&unix.EV_EOF != 0 { + eventType |= EventHup + } + if event.Flags&unix.EV_ERROR != 0 { + eventType |= EventError + } + + handler(Event{ + FD: fd, + Events: eventType, + Data: data, + }) + } + + return nil +} + +func (e *kqueueEventLoop) Stop() error { + close(e.stopChan) + + // Create a new stop channel for future runs + e.stopChan = make(chan struct{}) + + return nil +} + +func (e *kqueueEventLoop) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.running { + e.Stop() + // Give it a moment to stop + time.Sleep(10 * time.Millisecond) + } + + if e.kq >= 0 { + err := unix.Close(e.kq) + e.kq = -1 + return err + } + + return nil +} \ No newline at end of file diff --git a/linux/pkg/session/eventloop_linux.go b/linux/pkg/session/eventloop_linux.go new file mode 100644 index 00000000..73cf0c74 --- /dev/null +++ b/linux/pkg/session/eventloop_linux.go @@ -0,0 +1,266 @@ +//go:build linux +// +build linux + +package session + +import ( + "fmt" + "sync" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +// epollEventLoop implements EventLoop using epoll (Linux) +type epollEventLoop struct { + epfd int + mu sync.Mutex + running bool + stopChan chan struct{} + fdData map[int]interface{} +} + +func newPlatformEventLoop() (EventLoop, error) { + epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) + if err != nil { + return nil, fmt.Errorf("failed to create epoll: %w", err) + } + + return &epollEventLoop{ + epfd: epfd, + stopChan: make(chan struct{}), + fdData: make(map[int]interface{}), + }, nil +} + +func (e *epollEventLoop) Add(fd int, events EventType, data interface{}) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.fdData[fd] = data + + var epollEvents uint32 + if events&EventRead != 0 { + epollEvents |= unix.EPOLLIN | unix.EPOLLPRI + } + if events&EventWrite != 0 { + epollEvents |= unix.EPOLLOUT + } + if events&EventError != 0 { + epollEvents |= unix.EPOLLERR + } + if events&EventHup != 0 { + epollEvents |= unix.EPOLLHUP | unix.EPOLLRDHUP + } + + // Use edge-triggered mode for better performance + epollEvents |= unix.EPOLLET + + event := unix.EpollEvent{ + Events: epollEvents, + Fd: int32(fd), + } + + if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &event); err != nil { + delete(e.fdData, fd) + return fmt.Errorf("failed to add fd %d to epoll: %w", fd, err) + } + + // Set non-blocking mode + if err := unix.SetNonblock(fd, true); err != nil { + // Not fatal, but log it + debugLog("[WARN] Failed to set non-blocking mode on fd %d: %v", fd, err) + } + + return nil +} + +func (e *epollEventLoop) Remove(fd int) error { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.fdData, fd) + + if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_DEL, fd, nil); err != nil { + if err != syscall.ENOENT { + return fmt.Errorf("failed to remove fd %d from epoll: %w", fd, err) + } + } + + return nil +} + +func (e *epollEventLoop) Modify(fd int, events EventType) error { + e.mu.Lock() + defer e.mu.Unlock() + + var epollEvents uint32 + if events&EventRead != 0 { + epollEvents |= unix.EPOLLIN | unix.EPOLLPRI + } + if events&EventWrite != 0 { + epollEvents |= unix.EPOLLOUT + } + if events&EventError != 0 { + epollEvents |= unix.EPOLLERR + } + if events&EventHup != 0 { + epollEvents |= unix.EPOLLHUP | unix.EPOLLRDHUP + } + + // Use edge-triggered mode + epollEvents |= unix.EPOLLET + + event := unix.EpollEvent{ + Events: epollEvents, + Fd: int32(fd), + } + + if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &event); err != nil { + return fmt.Errorf("failed to modify fd %d in epoll: %w", fd, err) + } + + return nil +} + +func (e *epollEventLoop) Run(handler EventHandler) error { + e.mu.Lock() + if e.running { + e.mu.Unlock() + return fmt.Errorf("event loop already running") + } + e.running = true + e.mu.Unlock() + + defer func() { + e.mu.Lock() + e.running = false + e.mu.Unlock() + }() + + events := make([]unix.EpollEvent, 128) + + for { + select { + case <-e.stopChan: + return nil + default: + } + + // Wait for events with 100ms timeout to check for stop + n, err := unix.EpollWait(e.epfd, events, 100) + + if err != nil { + if err == unix.EINTR { + continue + } + return fmt.Errorf("epoll wait failed: %w", err) + } + + // Process events + for i := 0; i < n; i++ { + event := &events[i] + fd := int(event.Fd) + + e.mu.Lock() + data := e.fdData[fd] + e.mu.Unlock() + + var eventType EventType + + // Convert epoll events to our EventType + if event.Events&(unix.EPOLLIN|unix.EPOLLPRI) != 0 { + eventType |= EventRead + } + if event.Events&unix.EPOLLOUT != 0 { + eventType |= EventWrite + } + if event.Events&(unix.EPOLLHUP|unix.EPOLLRDHUP) != 0 { + eventType |= EventHup + } + if event.Events&unix.EPOLLERR != 0 { + eventType |= EventError + } + + handler(Event{ + FD: fd, + Events: eventType, + Data: data, + }) + } + } +} + +func (e *epollEventLoop) RunOnce(handler EventHandler, timeoutMs int) error { + events := make([]unix.EpollEvent, 128) + + n, err := unix.EpollWait(e.epfd, events, timeoutMs) + if err != nil { + if err == unix.EINTR { + return nil + } + return fmt.Errorf("epoll wait failed: %w", err) + } + + // Process events + for i := 0; i < n; i++ { + event := &events[i] + fd := int(event.Fd) + + e.mu.Lock() + data := e.fdData[fd] + e.mu.Unlock() + + var eventType EventType + + if event.Events&(unix.EPOLLIN|unix.EPOLLPRI) != 0 { + eventType |= EventRead + } + if event.Events&unix.EPOLLOUT != 0 { + eventType |= EventWrite + } + if event.Events&(unix.EPOLLHUP|unix.EPOLLRDHUP) != 0 { + eventType |= EventHup + } + if event.Events&unix.EPOLLERR != 0 { + eventType |= EventError + } + + handler(Event{ + FD: fd, + Events: eventType, + Data: data, + }) + } + + return nil +} + +func (e *epollEventLoop) Stop() error { + close(e.stopChan) + + // Create a new stop channel for future runs + e.stopChan = make(chan struct{}) + + return nil +} + +func (e *epollEventLoop) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.running { + e.Stop() + // Give it a moment to stop + time.Sleep(10 * time.Millisecond) + } + + if e.epfd >= 0 { + err := unix.Close(e.epfd) + e.epfd = -1 + return err + } + + return nil +} \ No newline at end of file diff --git a/linux/pkg/session/eventloop_other.go b/linux/pkg/session/eventloop_other.go new file mode 100644 index 00000000..e7da7735 --- /dev/null +++ b/linux/pkg/session/eventloop_other.go @@ -0,0 +1,148 @@ +//go:build !linux && !darwin && !freebsd && !openbsd && !netbsd +// +build !linux,!darwin,!freebsd,!openbsd,!netbsd + +package session + +import ( + "fmt" + "sync" + "time" +) + +// selectEventLoop implements EventLoop using select() as a fallback +type selectEventLoop struct { + mu sync.Mutex + running bool + stopChan chan struct{} + fds map[int]*fdInfo +} + +type fdInfo struct { + fd int + events EventType + data interface{} +} + +func newPlatformEventLoop() (EventLoop, error) { + return &selectEventLoop{ + stopChan: make(chan struct{}), + fds: make(map[int]*fdInfo), + }, nil +} + +func (e *selectEventLoop) Add(fd int, events EventType, data interface{}) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.fds[fd] = &fdInfo{ + fd: fd, + events: events, + data: data, + } + + return nil +} + +func (e *selectEventLoop) Remove(fd int) error { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.fds, fd) + return nil +} + +func (e *selectEventLoop) Modify(fd int, events EventType) error { + e.mu.Lock() + defer e.mu.Unlock() + + if info, ok := e.fds[fd]; ok { + info.events = events + } + + return nil +} + +func (e *selectEventLoop) Run(handler EventHandler) error { + e.mu.Lock() + if e.running { + e.mu.Unlock() + return fmt.Errorf("event loop already running") + } + e.running = true + e.mu.Unlock() + + defer func() { + e.mu.Lock() + e.running = false + e.mu.Unlock() + }() + + for { + select { + case <-e.stopChan: + return nil + default: + } + + // Use existing select-based polling as fallback + // This is not as efficient as epoll/kqueue but works everywhere + if err := e.RunOnce(handler, 10); err != nil { + return err + } + } +} + +func (e *selectEventLoop) RunOnce(handler EventHandler, timeoutMs int) error { + e.mu.Lock() + fdList := make([]int, 0, len(e.fds)) + for fd := range e.fds { + fdList = append(fdList, fd) + } + e.mu.Unlock() + + if len(fdList) == 0 { + time.Sleep(time.Duration(timeoutMs) * time.Millisecond) + return nil + } + + // Use the existing selectRead function + ready, err := selectRead(fdList, time.Duration(timeoutMs)*time.Millisecond) + if err != nil { + return err + } + + // Process ready file descriptors + for _, fd := range ready { + e.mu.Lock() + info, ok := e.fds[fd] + e.mu.Unlock() + + if ok { + handler(Event{ + FD: fd, + Events: EventRead, // select only supports read events + Data: info.data, + }) + } + } + + return nil +} + +func (e *selectEventLoop) Stop() error { + close(e.stopChan) + e.stopChan = make(chan struct{}) + return nil +} + +func (e *selectEventLoop) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.running { + e.Stop() + time.Sleep(10 * time.Millisecond) + } + + return nil +} \ No newline at end of file diff --git a/linux/pkg/session/pty.go b/linux/pkg/session/pty.go index 6e9ee23c..973b8a65 100644 --- a/linux/pkg/session/pty.go +++ b/linux/pkg/session/pty.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "os/signal" + "runtime" "strings" "sync" "syscall" @@ -18,9 +19,9 @@ import ( "golang.org/x/term" ) -// useSelectPolling determines whether to use select-based polling -// Enable this for better control FIFO integration -const useSelectPolling = true +// useEventDrivenIO determines whether to use native event-driven I/O +// This uses epoll on Linux and kqueue on macOS for zero-latency I/O +const useEventDrivenIO = true // isShellBuiltin checks if a command is a shell builtin func isShellBuiltin(cmd string) bool { @@ -278,6 +279,153 @@ func (p *PTY) Pid() int { return 0 } +// runEventDriven runs the PTY using native event-driven I/O (epoll/kqueue) +func (p *PTY) runEventDriven() error { + debugLog("[DEBUG] PTY.runEventDriven: Starting event-driven I/O for session %s", p.session.ID[:8]) + + // Create event loop + eventLoop, err := NewEventLoop() + if err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to create event loop: %v", err) + // Fall back to polling + return p.pollWithSelect() + } + defer eventLoop.Close() + + // Set PTY to non-blocking mode + if err := unix.SetNonblock(int(p.pty.Fd()), true); err != nil { + log.Printf("[WARN] PTY.runEventDriven: Failed to set PTY non-blocking: %v", err) + } + + // Add PTY to event loop for reading + ptyFD := int(p.pty.Fd()) + if err := eventLoop.Add(ptyFD, EventRead|EventHup, "pty"); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to add PTY to event loop: %v", err) + return fmt.Errorf("failed to add PTY to event loop: %w", err) + } + + // Open stdin pipe + stdinPipe, err := os.OpenFile(p.session.StdinPath(), os.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to open stdin pipe: %v", err) + return fmt.Errorf("failed to open stdin pipe: %w", err) + } + defer stdinPipe.Close() + + // Add stdin pipe to event loop + stdinFD := int(stdinPipe.Fd()) + if err := eventLoop.Add(stdinFD, EventRead, "stdin"); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to add stdin to event loop: %v", err) + return fmt.Errorf("failed to add stdin to event loop: %w", err) + } + + // Track process exit + exitCh := make(chan error, 1) + go func() { + waitErr := p.cmd.Wait() + + if waitErr != nil { + if exitError, ok := waitErr.(*exec.ExitError); ok { + if ws, ok := exitError.Sys().(syscall.WaitStatus); ok { + exitCode := ws.ExitStatus() + p.session.info.ExitCode = &exitCode + } + } + } else { + exitCode := 0 + p.session.info.ExitCode = &exitCode + } + + p.session.UpdateStatus() + + // Close the stream writer to finalize the recording + if err := p.streamWriter.Close(); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to close stream writer: %v", err) + } + + eventLoop.Stop() + exitCh <- waitErr + }() + + // Buffers for I/O + ptyBuf := make([]byte, 4096) + stdinBuf := make([]byte, 1024) + + debugLog("[DEBUG] PTY.runEventDriven: Starting event loop") + + // Run the event loop + err = eventLoop.Run(func(event Event) { + switch event.Data.(string) { + case "pty": + if event.Events&EventRead != 0 { + // Read all available data + for { + n, err := syscall.Read(event.FD, ptyBuf) + if n > 0 { + if err := p.streamWriter.WriteOutput(ptyBuf[:n]); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to write output: %v", err) + } + } + + if err != nil { + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { + // No more data available + break + } + if err != io.EOF { + log.Printf("[ERROR] PTY.runEventDriven: PTY read error: %v", err) + } + eventLoop.Stop() + break + } + + // If we read less than buffer size, no more data + if n < len(ptyBuf) { + break + } + } + } + + if event.Events&EventHup != 0 { + debugLog("[DEBUG] PTY.runEventDriven: PTY closed (HUP)") + eventLoop.Stop() + } + + case "stdin": + if event.Events&EventRead != 0 { + // Read from stdin pipe + n, err := syscall.Read(event.FD, stdinBuf) + if n > 0 { + if _, err := p.pty.Write(stdinBuf[:n]); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to write to PTY: %v", err) + } + + if err := p.streamWriter.WriteInput(stdinBuf[:n]); err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Failed to write input to stream: %v", err) + } + } + + if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK { + if err != io.EOF { + log.Printf("[ERROR] PTY.runEventDriven: Stdin read error: %v", err) + } + eventLoop.Remove(event.FD) + } + } + } + }) + + if err != nil { + log.Printf("[ERROR] PTY.runEventDriven: Event loop error: %v", err) + } + + // Wait for process exit + result := <-exitCh + + debugLog("[DEBUG] PTY.runEventDriven: Completed with result: %v", result) + return result +} + func (p *PTY) Run() error { defer func() { if err := p.Close(); err != nil { @@ -352,8 +500,13 @@ func (p *PTY) Run() error { } }() - // Use select-based polling if available - if useSelectPolling { + // Use event-driven I/O if available + if useEventDrivenIO { + return p.runEventDriven() + } + + // Use select-based polling as fallback + if runtime.GOOS == "linux" || runtime.GOOS == "darwin" { return p.pollWithSelect() }