mirror of
https://github.com/samsonjs/vibetunnel.git
synced 2026-06-29 05:39:31 +00:00
native OS event APIs (epoll/kqueue) for true event-driven I/O
This commit is contained in:
parent
e32a127d30
commit
fd7a874ee5
5 changed files with 1034 additions and 5 deletions
193
linux/pkg/session/eventloop.go
Normal file
193
linux/pkg/session/eventloop.go
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// EventType represents the type of event
|
||||
type EventType uint32
|
||||
|
||||
const (
|
||||
EventRead EventType = 1 << 0
|
||||
EventWrite EventType = 1 << 1
|
||||
EventError EventType = 1 << 2
|
||||
EventHup EventType = 1 << 3
|
||||
)
|
||||
|
||||
// Event represents an I/O event
|
||||
type Event struct {
|
||||
FD int
|
||||
Events EventType
|
||||
Data interface{} // User data associated with the FD
|
||||
}
|
||||
|
||||
// EventHandler is called when an event occurs
|
||||
type EventHandler func(event Event)
|
||||
|
||||
// EventLoop provides platform-independent event-driven I/O
|
||||
type EventLoop interface {
|
||||
// Add registers a file descriptor for event monitoring
|
||||
Add(fd int, events EventType, data interface{}) error
|
||||
|
||||
// Remove unregisters a file descriptor
|
||||
Remove(fd int) error
|
||||
|
||||
// Modify changes the events to monitor for a file descriptor
|
||||
Modify(fd int, events EventType) error
|
||||
|
||||
// Run starts the event loop, blocking until Stop is called
|
||||
Run(handler EventHandler) error
|
||||
|
||||
// RunOnce processes events once with optional timeout (-1 for blocking)
|
||||
RunOnce(handler EventHandler, timeoutMs int) error
|
||||
|
||||
// Stop terminates the event loop
|
||||
Stop() error
|
||||
|
||||
// Close releases all resources
|
||||
Close() error
|
||||
}
|
||||
|
||||
// NewEventLoop creates a platform-specific event loop
|
||||
func NewEventLoop() (EventLoop, error) {
|
||||
return newPlatformEventLoop()
|
||||
}
|
||||
|
||||
// PTYEventHandler handles PTY I/O events using the event loop
|
||||
type PTYEventHandler struct {
|
||||
pty *PTY
|
||||
eventLoop EventLoop
|
||||
outputBuffer []byte
|
||||
handlers map[int]func(Event)
|
||||
}
|
||||
|
||||
// NewPTYEventHandler creates a new event-driven PTY handler
|
||||
func NewPTYEventHandler(pty *PTY) (*PTYEventHandler, error) {
|
||||
eventLoop, err := NewEventLoop()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create event loop: %w", err)
|
||||
}
|
||||
|
||||
handler := &PTYEventHandler{
|
||||
pty: pty,
|
||||
eventLoop: eventLoop,
|
||||
outputBuffer: make([]byte, 4096),
|
||||
handlers: make(map[int]func(Event)),
|
||||
}
|
||||
|
||||
// Register PTY for read events
|
||||
ptyFD := int(pty.pty.Fd())
|
||||
if err := eventLoop.Add(ptyFD, EventRead|EventHup, "pty"); err != nil {
|
||||
eventLoop.Close()
|
||||
return nil, fmt.Errorf("failed to add PTY to event loop: %w", err)
|
||||
}
|
||||
|
||||
// Set up handlers
|
||||
handler.handlers[ptyFD] = handler.handlePTYEvent
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// Run starts the event-driven I/O loop
|
||||
func (h *PTYEventHandler) Run() error {
|
||||
return h.eventLoop.Run(func(event Event) {
|
||||
if handler, ok := h.handlers[event.FD]; ok {
|
||||
handler(event)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// handlePTYEvent processes PTY read events
|
||||
func (h *PTYEventHandler) handlePTYEvent(event Event) {
|
||||
if event.Events&EventRead != 0 {
|
||||
// Data available for reading
|
||||
for {
|
||||
n, err := h.pty.pty.Read(h.outputBuffer)
|
||||
if n > 0 {
|
||||
// Write to stream immediately
|
||||
if err := h.pty.streamWriter.WriteOutput(h.outputBuffer[:n]); err != nil {
|
||||
debugLog("[ERROR] Failed to write PTY output: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == io.EOF || err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
|
||||
// No more data available
|
||||
break
|
||||
}
|
||||
// Real error
|
||||
debugLog("[ERROR] PTY read error: %v", err)
|
||||
h.eventLoop.Stop()
|
||||
break
|
||||
}
|
||||
|
||||
// Continue reading if we filled the buffer
|
||||
if n < len(h.outputBuffer) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if event.Events&EventHup != 0 {
|
||||
// PTY closed
|
||||
debugLog("[DEBUG] PTY closed (HUP event)")
|
||||
h.eventLoop.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// AddStdinPipe adds stdin pipe monitoring to the event loop
|
||||
func (h *PTYEventHandler) AddStdinPipe(stdinPipe *os.File) error {
|
||||
stdinFD := int(stdinPipe.Fd())
|
||||
|
||||
// Set non-blocking mode
|
||||
if err := syscall.SetNonblock(stdinFD, true); err != nil {
|
||||
return fmt.Errorf("failed to set stdin non-blocking: %w", err)
|
||||
}
|
||||
|
||||
// Add to event loop
|
||||
if err := h.eventLoop.Add(stdinFD, EventRead, "stdin"); err != nil {
|
||||
return fmt.Errorf("failed to add stdin to event loop: %w", err)
|
||||
}
|
||||
|
||||
// Set up handler
|
||||
h.handlers[stdinFD] = h.handleStdinEvent
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleStdinEvent processes stdin input events
|
||||
func (h *PTYEventHandler) handleStdinEvent(event Event) {
|
||||
if event.Events&EventRead != 0 {
|
||||
buf := make([]byte, 1024)
|
||||
n, err := syscall.Read(event.FD, buf)
|
||||
if n > 0 {
|
||||
// Write to PTY
|
||||
if _, err := h.pty.pty.Write(buf[:n]); err != nil {
|
||||
debugLog("[ERROR] Failed to write to PTY: %v", err)
|
||||
}
|
||||
|
||||
// Also write to asciinema stream
|
||||
if err := h.pty.streamWriter.WriteInput(buf[:n]); err != nil {
|
||||
debugLog("[ERROR] Failed to write input to stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
|
||||
debugLog("[ERROR] Stdin read error: %v", err)
|
||||
h.eventLoop.Remove(event.FD)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the event loop
|
||||
func (h *PTYEventHandler) Stop() error {
|
||||
return h.eventLoop.Stop()
|
||||
}
|
||||
|
||||
// Close cleans up resources
|
||||
func (h *PTYEventHandler) Close() error {
|
||||
return h.eventLoop.Close()
|
||||
}
|
||||
269
linux/pkg/session/eventloop_darwin.go
Normal file
269
linux/pkg/session/eventloop_darwin.go
Normal file
|
|
@ -0,0 +1,269 @@
|
|||
//go:build darwin || freebsd || openbsd || netbsd
|
||||
// +build darwin freebsd openbsd netbsd
|
||||
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// kqueueEventLoop implements EventLoop using kqueue (macOS/BSD)
|
||||
type kqueueEventLoop struct {
|
||||
kq int
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
stopChan chan struct{}
|
||||
fdData map[int]interface{}
|
||||
}
|
||||
|
||||
func newPlatformEventLoop() (EventLoop, error) {
|
||||
kq, err := unix.Kqueue()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create kqueue: %w", err)
|
||||
}
|
||||
|
||||
return &kqueueEventLoop{
|
||||
kq: kq,
|
||||
stopChan: make(chan struct{}),
|
||||
fdData: make(map[int]interface{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Add(fd int, events EventType, data interface{}) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.fdData[fd] = data
|
||||
|
||||
var kevents []unix.Kevent_t
|
||||
|
||||
if events&EventRead != 0 {
|
||||
kevents = append(kevents, unix.Kevent_t{
|
||||
Ident: uint64(fd),
|
||||
Filter: unix.EVFILT_READ,
|
||||
Flags: unix.EV_ADD | unix.EV_ENABLE,
|
||||
})
|
||||
}
|
||||
|
||||
if events&EventWrite != 0 {
|
||||
kevents = append(kevents, unix.Kevent_t{
|
||||
Ident: uint64(fd),
|
||||
Filter: unix.EVFILT_WRITE,
|
||||
Flags: unix.EV_ADD | unix.EV_ENABLE,
|
||||
})
|
||||
}
|
||||
|
||||
if len(kevents) > 0 {
|
||||
_, err := unix.Kevent(e.kq, kevents, nil, nil)
|
||||
if err != nil {
|
||||
delete(e.fdData, fd)
|
||||
return fmt.Errorf("failed to add fd %d to kqueue: %w", fd, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set non-blocking mode
|
||||
if err := unix.SetNonblock(fd, true); err != nil {
|
||||
// Not fatal, but log it
|
||||
debugLog("[WARN] Failed to set non-blocking mode on fd %d: %v", fd, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Remove(fd int) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
delete(e.fdData, fd)
|
||||
|
||||
// Remove both read and write filters
|
||||
kevents := []unix.Kevent_t{
|
||||
{
|
||||
Ident: uint64(fd),
|
||||
Filter: unix.EVFILT_READ,
|
||||
Flags: unix.EV_DELETE,
|
||||
},
|
||||
{
|
||||
Ident: uint64(fd),
|
||||
Filter: unix.EVFILT_WRITE,
|
||||
Flags: unix.EV_DELETE,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := unix.Kevent(e.kq, kevents, nil, nil)
|
||||
if err != nil && err != syscall.ENOENT {
|
||||
return fmt.Errorf("failed to remove fd %d from kqueue: %w", fd, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Modify(fd int, events EventType) error {
|
||||
// For kqueue, we need to remove and re-add
|
||||
if err := e.Remove(fd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
data := e.fdData[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
return e.Add(fd, events, data)
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Run(handler EventHandler) error {
|
||||
e.mu.Lock()
|
||||
if e.running {
|
||||
e.mu.Unlock()
|
||||
return fmt.Errorf("event loop already running")
|
||||
}
|
||||
e.running = true
|
||||
e.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
e.mu.Lock()
|
||||
e.running = false
|
||||
e.mu.Unlock()
|
||||
}()
|
||||
|
||||
events := make([]unix.Kevent_t, 128)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.stopChan:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Wait for events with 100ms timeout to check for stop
|
||||
n, err := unix.Kevent(e.kq, nil, events, &unix.Timespec{
|
||||
Sec: 0,
|
||||
Nsec: 100 * 1000 * 1000, // 100ms
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("kevent wait failed: %w", err)
|
||||
}
|
||||
|
||||
// Process events
|
||||
for i := 0; i < n; i++ {
|
||||
event := &events[i]
|
||||
fd := int(event.Ident)
|
||||
|
||||
e.mu.Lock()
|
||||
data := e.fdData[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
var eventType EventType
|
||||
|
||||
// Convert kqueue events to our EventType
|
||||
if event.Filter == unix.EVFILT_READ {
|
||||
eventType |= EventRead
|
||||
}
|
||||
if event.Filter == unix.EVFILT_WRITE {
|
||||
eventType |= EventWrite
|
||||
}
|
||||
if event.Flags&unix.EV_EOF != 0 {
|
||||
eventType |= EventHup
|
||||
}
|
||||
if event.Flags&unix.EV_ERROR != 0 {
|
||||
eventType |= EventError
|
||||
}
|
||||
|
||||
handler(Event{
|
||||
FD: fd,
|
||||
Events: eventType,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) RunOnce(handler EventHandler, timeoutMs int) error {
|
||||
events := make([]unix.Kevent_t, 128)
|
||||
|
||||
var timeout *unix.Timespec
|
||||
if timeoutMs >= 0 {
|
||||
timeout = &unix.Timespec{
|
||||
Sec: int64(timeoutMs / 1000),
|
||||
Nsec: int64((timeoutMs % 1000) * 1000 * 1000),
|
||||
}
|
||||
}
|
||||
|
||||
n, err := unix.Kevent(e.kq, nil, events, timeout)
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("kevent wait failed: %w", err)
|
||||
}
|
||||
|
||||
// Process events
|
||||
for i := 0; i < n; i++ {
|
||||
event := &events[i]
|
||||
fd := int(event.Ident)
|
||||
|
||||
e.mu.Lock()
|
||||
data := e.fdData[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
var eventType EventType
|
||||
|
||||
if event.Filter == unix.EVFILT_READ {
|
||||
eventType |= EventRead
|
||||
}
|
||||
if event.Filter == unix.EVFILT_WRITE {
|
||||
eventType |= EventWrite
|
||||
}
|
||||
if event.Flags&unix.EV_EOF != 0 {
|
||||
eventType |= EventHup
|
||||
}
|
||||
if event.Flags&unix.EV_ERROR != 0 {
|
||||
eventType |= EventError
|
||||
}
|
||||
|
||||
handler(Event{
|
||||
FD: fd,
|
||||
Events: eventType,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Stop() error {
|
||||
close(e.stopChan)
|
||||
|
||||
// Create a new stop channel for future runs
|
||||
e.stopChan = make(chan struct{})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *kqueueEventLoop) Close() error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.running {
|
||||
e.Stop()
|
||||
// Give it a moment to stop
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if e.kq >= 0 {
|
||||
err := unix.Close(e.kq)
|
||||
e.kq = -1
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
266
linux/pkg/session/eventloop_linux.go
Normal file
266
linux/pkg/session/eventloop_linux.go
Normal file
|
|
@ -0,0 +1,266 @@
|
|||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// epollEventLoop implements EventLoop using epoll (Linux)
|
||||
type epollEventLoop struct {
|
||||
epfd int
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
stopChan chan struct{}
|
||||
fdData map[int]interface{}
|
||||
}
|
||||
|
||||
func newPlatformEventLoop() (EventLoop, error) {
|
||||
epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create epoll: %w", err)
|
||||
}
|
||||
|
||||
return &epollEventLoop{
|
||||
epfd: epfd,
|
||||
stopChan: make(chan struct{}),
|
||||
fdData: make(map[int]interface{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Add(fd int, events EventType, data interface{}) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.fdData[fd] = data
|
||||
|
||||
var epollEvents uint32
|
||||
if events&EventRead != 0 {
|
||||
epollEvents |= unix.EPOLLIN | unix.EPOLLPRI
|
||||
}
|
||||
if events&EventWrite != 0 {
|
||||
epollEvents |= unix.EPOLLOUT
|
||||
}
|
||||
if events&EventError != 0 {
|
||||
epollEvents |= unix.EPOLLERR
|
||||
}
|
||||
if events&EventHup != 0 {
|
||||
epollEvents |= unix.EPOLLHUP | unix.EPOLLRDHUP
|
||||
}
|
||||
|
||||
// Use edge-triggered mode for better performance
|
||||
epollEvents |= unix.EPOLLET
|
||||
|
||||
event := unix.EpollEvent{
|
||||
Events: epollEvents,
|
||||
Fd: int32(fd),
|
||||
}
|
||||
|
||||
if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &event); err != nil {
|
||||
delete(e.fdData, fd)
|
||||
return fmt.Errorf("failed to add fd %d to epoll: %w", fd, err)
|
||||
}
|
||||
|
||||
// Set non-blocking mode
|
||||
if err := unix.SetNonblock(fd, true); err != nil {
|
||||
// Not fatal, but log it
|
||||
debugLog("[WARN] Failed to set non-blocking mode on fd %d: %v", fd, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Remove(fd int) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
delete(e.fdData, fd)
|
||||
|
||||
if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_DEL, fd, nil); err != nil {
|
||||
if err != syscall.ENOENT {
|
||||
return fmt.Errorf("failed to remove fd %d from epoll: %w", fd, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Modify(fd int, events EventType) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
var epollEvents uint32
|
||||
if events&EventRead != 0 {
|
||||
epollEvents |= unix.EPOLLIN | unix.EPOLLPRI
|
||||
}
|
||||
if events&EventWrite != 0 {
|
||||
epollEvents |= unix.EPOLLOUT
|
||||
}
|
||||
if events&EventError != 0 {
|
||||
epollEvents |= unix.EPOLLERR
|
||||
}
|
||||
if events&EventHup != 0 {
|
||||
epollEvents |= unix.EPOLLHUP | unix.EPOLLRDHUP
|
||||
}
|
||||
|
||||
// Use edge-triggered mode
|
||||
epollEvents |= unix.EPOLLET
|
||||
|
||||
event := unix.EpollEvent{
|
||||
Events: epollEvents,
|
||||
Fd: int32(fd),
|
||||
}
|
||||
|
||||
if err := unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &event); err != nil {
|
||||
return fmt.Errorf("failed to modify fd %d in epoll: %w", fd, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Run(handler EventHandler) error {
|
||||
e.mu.Lock()
|
||||
if e.running {
|
||||
e.mu.Unlock()
|
||||
return fmt.Errorf("event loop already running")
|
||||
}
|
||||
e.running = true
|
||||
e.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
e.mu.Lock()
|
||||
e.running = false
|
||||
e.mu.Unlock()
|
||||
}()
|
||||
|
||||
events := make([]unix.EpollEvent, 128)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.stopChan:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Wait for events with 100ms timeout to check for stop
|
||||
n, err := unix.EpollWait(e.epfd, events, 100)
|
||||
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("epoll wait failed: %w", err)
|
||||
}
|
||||
|
||||
// Process events
|
||||
for i := 0; i < n; i++ {
|
||||
event := &events[i]
|
||||
fd := int(event.Fd)
|
||||
|
||||
e.mu.Lock()
|
||||
data := e.fdData[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
var eventType EventType
|
||||
|
||||
// Convert epoll events to our EventType
|
||||
if event.Events&(unix.EPOLLIN|unix.EPOLLPRI) != 0 {
|
||||
eventType |= EventRead
|
||||
}
|
||||
if event.Events&unix.EPOLLOUT != 0 {
|
||||
eventType |= EventWrite
|
||||
}
|
||||
if event.Events&(unix.EPOLLHUP|unix.EPOLLRDHUP) != 0 {
|
||||
eventType |= EventHup
|
||||
}
|
||||
if event.Events&unix.EPOLLERR != 0 {
|
||||
eventType |= EventError
|
||||
}
|
||||
|
||||
handler(Event{
|
||||
FD: fd,
|
||||
Events: eventType,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) RunOnce(handler EventHandler, timeoutMs int) error {
|
||||
events := make([]unix.EpollEvent, 128)
|
||||
|
||||
n, err := unix.EpollWait(e.epfd, events, timeoutMs)
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("epoll wait failed: %w", err)
|
||||
}
|
||||
|
||||
// Process events
|
||||
for i := 0; i < n; i++ {
|
||||
event := &events[i]
|
||||
fd := int(event.Fd)
|
||||
|
||||
e.mu.Lock()
|
||||
data := e.fdData[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
var eventType EventType
|
||||
|
||||
if event.Events&(unix.EPOLLIN|unix.EPOLLPRI) != 0 {
|
||||
eventType |= EventRead
|
||||
}
|
||||
if event.Events&unix.EPOLLOUT != 0 {
|
||||
eventType |= EventWrite
|
||||
}
|
||||
if event.Events&(unix.EPOLLHUP|unix.EPOLLRDHUP) != 0 {
|
||||
eventType |= EventHup
|
||||
}
|
||||
if event.Events&unix.EPOLLERR != 0 {
|
||||
eventType |= EventError
|
||||
}
|
||||
|
||||
handler(Event{
|
||||
FD: fd,
|
||||
Events: eventType,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Stop() error {
|
||||
close(e.stopChan)
|
||||
|
||||
// Create a new stop channel for future runs
|
||||
e.stopChan = make(chan struct{})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *epollEventLoop) Close() error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.running {
|
||||
e.Stop()
|
||||
// Give it a moment to stop
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if e.epfd >= 0 {
|
||||
err := unix.Close(e.epfd)
|
||||
e.epfd = -1
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
148
linux/pkg/session/eventloop_other.go
Normal file
148
linux/pkg/session/eventloop_other.go
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
//go:build !linux && !darwin && !freebsd && !openbsd && !netbsd
|
||||
// +build !linux,!darwin,!freebsd,!openbsd,!netbsd
|
||||
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// selectEventLoop implements EventLoop using select() as a fallback
|
||||
type selectEventLoop struct {
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
stopChan chan struct{}
|
||||
fds map[int]*fdInfo
|
||||
}
|
||||
|
||||
type fdInfo struct {
|
||||
fd int
|
||||
events EventType
|
||||
data interface{}
|
||||
}
|
||||
|
||||
func newPlatformEventLoop() (EventLoop, error) {
|
||||
return &selectEventLoop{
|
||||
stopChan: make(chan struct{}),
|
||||
fds: make(map[int]*fdInfo),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Add(fd int, events EventType, data interface{}) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.fds[fd] = &fdInfo{
|
||||
fd: fd,
|
||||
events: events,
|
||||
data: data,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Remove(fd int) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
delete(e.fds, fd)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Modify(fd int, events EventType) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if info, ok := e.fds[fd]; ok {
|
||||
info.events = events
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Run(handler EventHandler) error {
|
||||
e.mu.Lock()
|
||||
if e.running {
|
||||
e.mu.Unlock()
|
||||
return fmt.Errorf("event loop already running")
|
||||
}
|
||||
e.running = true
|
||||
e.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
e.mu.Lock()
|
||||
e.running = false
|
||||
e.mu.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.stopChan:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Use existing select-based polling as fallback
|
||||
// This is not as efficient as epoll/kqueue but works everywhere
|
||||
if err := e.RunOnce(handler, 10); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) RunOnce(handler EventHandler, timeoutMs int) error {
|
||||
e.mu.Lock()
|
||||
fdList := make([]int, 0, len(e.fds))
|
||||
for fd := range e.fds {
|
||||
fdList = append(fdList, fd)
|
||||
}
|
||||
e.mu.Unlock()
|
||||
|
||||
if len(fdList) == 0 {
|
||||
time.Sleep(time.Duration(timeoutMs) * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Use the existing selectRead function
|
||||
ready, err := selectRead(fdList, time.Duration(timeoutMs)*time.Millisecond)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Process ready file descriptors
|
||||
for _, fd := range ready {
|
||||
e.mu.Lock()
|
||||
info, ok := e.fds[fd]
|
||||
e.mu.Unlock()
|
||||
|
||||
if ok {
|
||||
handler(Event{
|
||||
FD: fd,
|
||||
Events: EventRead, // select only supports read events
|
||||
Data: info.data,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Stop() error {
|
||||
close(e.stopChan)
|
||||
e.stopChan = make(chan struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *selectEventLoop) Close() error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.running {
|
||||
e.Stop()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
|
@ -18,9 +19,9 @@ import (
|
|||
"golang.org/x/term"
|
||||
)
|
||||
|
||||
// useSelectPolling determines whether to use select-based polling
|
||||
// Enable this for better control FIFO integration
|
||||
const useSelectPolling = true
|
||||
// useEventDrivenIO determines whether to use native event-driven I/O
|
||||
// This uses epoll on Linux and kqueue on macOS for zero-latency I/O
|
||||
const useEventDrivenIO = true
|
||||
|
||||
// isShellBuiltin checks if a command is a shell builtin
|
||||
func isShellBuiltin(cmd string) bool {
|
||||
|
|
@ -278,6 +279,153 @@ func (p *PTY) Pid() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
// runEventDriven runs the PTY using native event-driven I/O (epoll/kqueue)
|
||||
func (p *PTY) runEventDriven() error {
|
||||
debugLog("[DEBUG] PTY.runEventDriven: Starting event-driven I/O for session %s", p.session.ID[:8])
|
||||
|
||||
// Create event loop
|
||||
eventLoop, err := NewEventLoop()
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to create event loop: %v", err)
|
||||
// Fall back to polling
|
||||
return p.pollWithSelect()
|
||||
}
|
||||
defer eventLoop.Close()
|
||||
|
||||
// Set PTY to non-blocking mode
|
||||
if err := unix.SetNonblock(int(p.pty.Fd()), true); err != nil {
|
||||
log.Printf("[WARN] PTY.runEventDriven: Failed to set PTY non-blocking: %v", err)
|
||||
}
|
||||
|
||||
// Add PTY to event loop for reading
|
||||
ptyFD := int(p.pty.Fd())
|
||||
if err := eventLoop.Add(ptyFD, EventRead|EventHup, "pty"); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to add PTY to event loop: %v", err)
|
||||
return fmt.Errorf("failed to add PTY to event loop: %w", err)
|
||||
}
|
||||
|
||||
// Open stdin pipe
|
||||
stdinPipe, err := os.OpenFile(p.session.StdinPath(), os.O_RDONLY|syscall.O_NONBLOCK, 0)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to open stdin pipe: %v", err)
|
||||
return fmt.Errorf("failed to open stdin pipe: %w", err)
|
||||
}
|
||||
defer stdinPipe.Close()
|
||||
|
||||
// Add stdin pipe to event loop
|
||||
stdinFD := int(stdinPipe.Fd())
|
||||
if err := eventLoop.Add(stdinFD, EventRead, "stdin"); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to add stdin to event loop: %v", err)
|
||||
return fmt.Errorf("failed to add stdin to event loop: %w", err)
|
||||
}
|
||||
|
||||
// Track process exit
|
||||
exitCh := make(chan error, 1)
|
||||
go func() {
|
||||
waitErr := p.cmd.Wait()
|
||||
|
||||
if waitErr != nil {
|
||||
if exitError, ok := waitErr.(*exec.ExitError); ok {
|
||||
if ws, ok := exitError.Sys().(syscall.WaitStatus); ok {
|
||||
exitCode := ws.ExitStatus()
|
||||
p.session.info.ExitCode = &exitCode
|
||||
}
|
||||
}
|
||||
} else {
|
||||
exitCode := 0
|
||||
p.session.info.ExitCode = &exitCode
|
||||
}
|
||||
|
||||
p.session.UpdateStatus()
|
||||
|
||||
// Close the stream writer to finalize the recording
|
||||
if err := p.streamWriter.Close(); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to close stream writer: %v", err)
|
||||
}
|
||||
|
||||
eventLoop.Stop()
|
||||
exitCh <- waitErr
|
||||
}()
|
||||
|
||||
// Buffers for I/O
|
||||
ptyBuf := make([]byte, 4096)
|
||||
stdinBuf := make([]byte, 1024)
|
||||
|
||||
debugLog("[DEBUG] PTY.runEventDriven: Starting event loop")
|
||||
|
||||
// Run the event loop
|
||||
err = eventLoop.Run(func(event Event) {
|
||||
switch event.Data.(string) {
|
||||
case "pty":
|
||||
if event.Events&EventRead != 0 {
|
||||
// Read all available data
|
||||
for {
|
||||
n, err := syscall.Read(event.FD, ptyBuf)
|
||||
if n > 0 {
|
||||
if err := p.streamWriter.WriteOutput(ptyBuf[:n]); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to write output: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
|
||||
// No more data available
|
||||
break
|
||||
}
|
||||
if err != io.EOF {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: PTY read error: %v", err)
|
||||
}
|
||||
eventLoop.Stop()
|
||||
break
|
||||
}
|
||||
|
||||
// If we read less than buffer size, no more data
|
||||
if n < len(ptyBuf) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if event.Events&EventHup != 0 {
|
||||
debugLog("[DEBUG] PTY.runEventDriven: PTY closed (HUP)")
|
||||
eventLoop.Stop()
|
||||
}
|
||||
|
||||
case "stdin":
|
||||
if event.Events&EventRead != 0 {
|
||||
// Read from stdin pipe
|
||||
n, err := syscall.Read(event.FD, stdinBuf)
|
||||
if n > 0 {
|
||||
if _, err := p.pty.Write(stdinBuf[:n]); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to write to PTY: %v", err)
|
||||
}
|
||||
|
||||
if err := p.streamWriter.WriteInput(stdinBuf[:n]); err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Failed to write input to stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
|
||||
if err != io.EOF {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Stdin read error: %v", err)
|
||||
}
|
||||
eventLoop.Remove(event.FD)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] PTY.runEventDriven: Event loop error: %v", err)
|
||||
}
|
||||
|
||||
// Wait for process exit
|
||||
result := <-exitCh
|
||||
|
||||
debugLog("[DEBUG] PTY.runEventDriven: Completed with result: %v", result)
|
||||
return result
|
||||
}
|
||||
|
||||
func (p *PTY) Run() error {
|
||||
defer func() {
|
||||
if err := p.Close(); err != nil {
|
||||
|
|
@ -352,8 +500,13 @@ func (p *PTY) Run() error {
|
|||
}
|
||||
}()
|
||||
|
||||
// Use select-based polling if available
|
||||
if useSelectPolling {
|
||||
// Use event-driven I/O if available
|
||||
if useEventDrivenIO {
|
||||
return p.runEventDriven()
|
||||
}
|
||||
|
||||
// Use select-based polling as fallback
|
||||
if runtime.GOOS == "linux" || runtime.GOOS == "darwin" {
|
||||
return p.pollWithSelect()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue