diff --git a/linux/pkg/terminal/buffer_test.go b/linux/pkg/terminal/buffer_test.go new file mode 100644 index 00000000..15bfb19e --- /dev/null +++ b/linux/pkg/terminal/buffer_test.go @@ -0,0 +1,118 @@ +package terminal + +import ( + "testing" +) + +func TestTerminalBuffer(t *testing.T) { + // Create a 80x24 terminal buffer + buffer := NewTerminalBuffer(80, 24) + + // Test writing simple text + text := "Hello, World!" + n, err := buffer.Write([]byte(text)) + if err != nil { + t.Fatalf("Failed to write to buffer: %v", err) + } + if n != len(text) { + t.Errorf("Expected to write %d bytes, wrote %d", len(text), n) + } + + // Get snapshot + snapshot := buffer.GetSnapshot() + if snapshot.Cols != 80 || snapshot.Rows != 24 { + t.Errorf("Unexpected dimensions: %dx%d", snapshot.Cols, snapshot.Rows) + } + + // Check that text was written + firstLine := snapshot.Cells[0] + for i, ch := range text { + if i >= len(firstLine) { + break + } + if firstLine[i].Char != ch { + t.Errorf("Expected char %c at position %d, got %c", ch, i, firstLine[i].Char) + } + } + + // Test cursor movement + buffer.Write([]byte("\r\n")) + snapshot = buffer.GetSnapshot() + if snapshot.CursorY != 1 || snapshot.CursorX != 0 { + t.Errorf("Expected cursor at (0,1), got (%d,%d)", snapshot.CursorX, snapshot.CursorY) + } + + // Test ANSI escape sequences + buffer.Write([]byte("\x1b[2J")) // Clear screen + snapshot = buffer.GetSnapshot() + + // All cells should be spaces + for y := 0; y < snapshot.Rows; y++ { + for x := 0; x < snapshot.Cols; x++ { + if snapshot.Cells[y][x].Char != ' ' { + t.Errorf("Expected space at (%d,%d), got %c", x, y, snapshot.Cells[y][x].Char) + } + } + } + + // Test resize + buffer.Resize(120, 30) + snapshot = buffer.GetSnapshot() + if snapshot.Cols != 120 || snapshot.Rows != 30 { + t.Errorf("Resize failed: expected 120x30, got %dx%d", snapshot.Cols, snapshot.Rows) + } +} + +func TestAnsiParser(t *testing.T) { + parser := NewAnsiParser() + + var printedChars []rune + var executedBytes []byte + var csiCalls []string + + parser.OnPrint = func(r rune) { + printedChars = append(printedChars, r) + } + + parser.OnExecute = func(b byte) { + executedBytes = append(executedBytes, b) + } + + parser.OnCsi = func(params []int, intermediate []byte, final byte) { + csiCalls = append(csiCalls, string(final)) + } + + // Test simple text + parser.Parse([]byte("Hello")) + if string(printedChars) != "Hello" { + t.Errorf("Expected 'Hello', got '%s'", string(printedChars)) + } + + // Test control characters + printedChars = nil + parser.Parse([]byte("\r\n")) + if len(executedBytes) != 2 || executedBytes[0] != '\r' || executedBytes[1] != '\n' { + t.Errorf("Control characters not properly executed") + } + + // Test CSI sequence + parser.Parse([]byte("\x1b[2J")) + if len(csiCalls) != 1 || csiCalls[0] != "J" { + t.Errorf("CSI sequence not properly parsed") + } +} + +func TestBufferSerialization(t *testing.T) { + buffer := NewTerminalBuffer(2, 2) + buffer.Write([]byte("AB\r\nCD")) + + snapshot := buffer.GetSnapshot() + data := snapshot.SerializeToBinary() + + // Binary format should contain: + // - 5 uint32s for dimensions (20 bytes) + // - 4 cells with char data and attributes + if len(data) < 20 { + t.Errorf("Serialized data too short: %d bytes", len(data)) + } +} \ No newline at end of file diff --git a/linux/pkg/termsocket/manager.go b/linux/pkg/termsocket/manager.go index f2a1a193..6f5b513c 100644 --- a/linux/pkg/termsocket/manager.go +++ b/linux/pkg/termsocket/manager.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/fsnotify/fsnotify" "github.com/vibetunnel/linux/pkg/session" "github.com/vibetunnel/linux/pkg/terminal" ) @@ -28,6 +29,8 @@ type Manager struct { mu sync.RWMutex subscribers map[string][]chan *terminal.BufferSnapshot subMu sync.RWMutex + shutdownCh chan struct{} + wg sync.WaitGroup } // NewManager creates a new terminal socket manager @@ -36,6 +39,7 @@ func NewManager(sessionManager *session.Manager) *Manager { sessionManager: sessionManager, buffers: make(map[string]*SessionBuffer), subscribers: make(map[string][]chan *terminal.BufferSnapshot), + shutdownCh: make(chan struct{}), } } @@ -69,7 +73,11 @@ func (m *Manager) GetOrCreateBuffer(sessionID string) (*SessionBuffer, error) { m.buffers[sessionID] = sb // Start monitoring the session's output - go m.monitorSession(sessionID, sb) + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.monitorSession(sessionID, sb) + }() return sb, nil } @@ -140,33 +148,144 @@ func (m *Manager) SubscribeToBufferChanges(sessionID string, callback func(strin // monitorSession monitors a session's output and updates the terminal buffer func (m *Manager) monitorSession(sessionID string, sb *SessionBuffer) { - // This is a simplified version - in a real implementation, we would: - // 1. Set up a file watcher on the session's stream-out file - // 2. Parse new asciinema events as they arrive - // 3. Feed the output data to the terminal buffer - // 4. Notify subscribers of buffer changes + streamPath := sb.Session.StreamOutPath() + lastPos := int64(0) - // For now, we'll implement a basic polling approach + // Try to use file watching + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Printf("Failed to create file watcher, using polling: %v", err) + m.monitorSessionPolling(sessionID, sb) + return + } + defer watcher.Close() + + // Wait for stream file to exist + for i := 0; i < 50; i++ { // Wait up to 5 seconds + if _, err := os.Stat(streamPath); err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + // Add file to watcher + if err := watcher.Add(streamPath); err != nil { + log.Printf("Failed to watch file %s, using polling: %v", streamPath, err) + m.monitorSessionPolling(sessionID, sb) + return + } + + // Read initial content + if update, newPos, err := readStreamContent(streamPath, lastPos); err == nil && update != nil { + if len(update.OutputData) > 0 || update.Resize != nil { + sb.mu.Lock() + if len(update.OutputData) > 0 { + sb.Buffer.Write(update.OutputData) + } + if update.Resize != nil { + sb.Buffer.Resize(update.Resize.Width, update.Resize.Height) + } + snapshot := sb.Buffer.GetSnapshot() + sb.mu.Unlock() + m.notifySubscribers(sessionID, snapshot) + lastPos = newPos + } + } + + // Monitor for changes + sessionCheckTicker := time.NewTicker(5 * time.Second) + defer sessionCheckTicker.Stop() + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + + if event.Op&fsnotify.Write == fsnotify.Write { + // Read new content + update, newPos, err := readStreamContent(streamPath, lastPos) + if err != nil { + log.Printf("Error reading stream content: %v", err) + continue + } + + if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) { + // Update buffer + sb.mu.Lock() + if len(update.OutputData) > 0 { + sb.Buffer.Write(update.OutputData) + } + if update.Resize != nil { + sb.Buffer.Resize(update.Resize.Width, update.Resize.Height) + } + snapshot := sb.Buffer.GetSnapshot() + sb.mu.Unlock() + + // Notify subscribers + m.notifySubscribers(sessionID, snapshot) + } + + lastPos = newPos + } + + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Printf("File watcher error: %v", err) + + case <-sessionCheckTicker.C: + // Check if session is still alive + if !sb.Session.IsAlive() { + // Clean up when session ends + m.mu.Lock() + delete(m.buffers, sessionID) + m.mu.Unlock() + return + } + + case <-m.shutdownCh: + // Manager is shutting down + return + } + } +} + +// monitorSessionPolling is a fallback for when file watching isn't available +func (m *Manager) monitorSessionPolling(sessionID string, sb *SessionBuffer) { streamPath := sb.Session.StreamOutPath() lastPos := int64(0) for { + select { + case <-m.shutdownCh: + // Manager is shutting down + return + default: + } + // Check if session is still alive if !sb.Session.IsAlive() { break } // Read new content from stream file - data, newPos, err := readStreamContent(streamPath, lastPos) - if err != nil { + update, newPos, err := readStreamContent(streamPath, lastPos) + if err != nil && !os.IsNotExist(err) { log.Printf("Error reading stream content: %v", err) - continue } - if len(data) > 0 { + if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) { // Update buffer sb.mu.Lock() - sb.Buffer.Write(data) + if len(update.OutputData) > 0 { + sb.Buffer.Write(update.OutputData) + } + if update.Resize != nil { + sb.Buffer.Resize(update.Resize.Width, update.Resize.Height) + } snapshot := sb.Buffer.GetSnapshot() sb.mu.Unlock() @@ -177,8 +296,7 @@ func (m *Manager) monitorSession(sessionID string, sb *SessionBuffer) { lastPos = newPos // Small delay to prevent busy waiting - // In production, use file watching instead - <-time.After(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) } // Clean up when session ends @@ -202,8 +320,20 @@ func (m *Manager) notifySubscribers(sessionID string, snapshot *terminal.BufferS } } +// StreamUpdate represents an update from the stream file +type StreamUpdate struct { + OutputData []byte + Resize *ResizeEvent +} + +// ResizeEvent represents a terminal resize +type ResizeEvent struct { + Width int + Height int +} + // readStreamContent reads new content from an asciinema stream file -func readStreamContent(path string, lastPos int64) ([]byte, int64, error) { +func readStreamContent(path string, lastPos int64) (*StreamUpdate, int64, error) { file, err := os.Open(path) if err != nil { return nil, lastPos, err @@ -235,7 +365,9 @@ func readStreamContent(path string, lastPos int64) ([]byte, int64, error) { } // Parse asciinema events and extract output data - outputData := []byte{} + update := &StreamUpdate{ + OutputData: []byte{}, + } decoder := json.NewDecoder(bytes.NewReader(newContent[:n])) // Skip header if at beginning of file @@ -264,12 +396,52 @@ func readStreamContent(path string, lastPos int64) ([]byte, int64, error) { if eventType == "o" { // Output event data, ok := event[2].(string) if ok { - outputData = append(outputData, []byte(data)...) + update.OutputData = append(update.OutputData, []byte(data)...) + } + } else if eventType == "r" { // Resize event + // Resize events have format: [timestamp, "r", "WIDTHxHEIGHT"] + data, ok := event[2].(string) + if ok { + // Parse "WIDTHxHEIGHT" format + var width, height int + if _, err := fmt.Sscanf(data, "%dx%d", &width, &height); err == nil { + update.Resize = &ResizeEvent{ + Width: width, + Height: height, + } + } } } - // TODO: Handle resize events ("r" type) } } - return outputData, lastPos + int64(n), nil + return update, lastPos + int64(n), nil +} + +// Shutdown gracefully shuts down the manager +func (m *Manager) Shutdown() { + log.Println("Shutting down terminal buffer manager...") + + // Signal shutdown + close(m.shutdownCh) + + // Wait for all monitors to finish + m.wg.Wait() + + // Close all subscriber channels + m.subMu.Lock() + for _, subs := range m.subscribers { + for _, ch := range subs { + close(ch) + } + } + m.subscribers = make(map[string][]chan *terminal.BufferSnapshot) + m.subMu.Unlock() + + // Clear buffers + m.mu.Lock() + m.buffers = make(map[string]*SessionBuffer) + m.mu.Unlock() + + log.Println("Terminal buffer manager shutdown complete") }