add monitor

This commit is contained in:
Peter Steinberger 2025-06-20 17:45:30 +02:00
parent d702d1c390
commit 4e55c98f10
2 changed files with 309 additions and 19 deletions

View file

@ -0,0 +1,118 @@
package terminal
import (
"testing"
)
func TestTerminalBuffer(t *testing.T) {
// Create a 80x24 terminal buffer
buffer := NewTerminalBuffer(80, 24)
// Test writing simple text
text := "Hello, World!"
n, err := buffer.Write([]byte(text))
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
if n != len(text) {
t.Errorf("Expected to write %d bytes, wrote %d", len(text), n)
}
// Get snapshot
snapshot := buffer.GetSnapshot()
if snapshot.Cols != 80 || snapshot.Rows != 24 {
t.Errorf("Unexpected dimensions: %dx%d", snapshot.Cols, snapshot.Rows)
}
// Check that text was written
firstLine := snapshot.Cells[0]
for i, ch := range text {
if i >= len(firstLine) {
break
}
if firstLine[i].Char != ch {
t.Errorf("Expected char %c at position %d, got %c", ch, i, firstLine[i].Char)
}
}
// Test cursor movement
buffer.Write([]byte("\r\n"))
snapshot = buffer.GetSnapshot()
if snapshot.CursorY != 1 || snapshot.CursorX != 0 {
t.Errorf("Expected cursor at (0,1), got (%d,%d)", snapshot.CursorX, snapshot.CursorY)
}
// Test ANSI escape sequences
buffer.Write([]byte("\x1b[2J")) // Clear screen
snapshot = buffer.GetSnapshot()
// All cells should be spaces
for y := 0; y < snapshot.Rows; y++ {
for x := 0; x < snapshot.Cols; x++ {
if snapshot.Cells[y][x].Char != ' ' {
t.Errorf("Expected space at (%d,%d), got %c", x, y, snapshot.Cells[y][x].Char)
}
}
}
// Test resize
buffer.Resize(120, 30)
snapshot = buffer.GetSnapshot()
if snapshot.Cols != 120 || snapshot.Rows != 30 {
t.Errorf("Resize failed: expected 120x30, got %dx%d", snapshot.Cols, snapshot.Rows)
}
}
func TestAnsiParser(t *testing.T) {
parser := NewAnsiParser()
var printedChars []rune
var executedBytes []byte
var csiCalls []string
parser.OnPrint = func(r rune) {
printedChars = append(printedChars, r)
}
parser.OnExecute = func(b byte) {
executedBytes = append(executedBytes, b)
}
parser.OnCsi = func(params []int, intermediate []byte, final byte) {
csiCalls = append(csiCalls, string(final))
}
// Test simple text
parser.Parse([]byte("Hello"))
if string(printedChars) != "Hello" {
t.Errorf("Expected 'Hello', got '%s'", string(printedChars))
}
// Test control characters
printedChars = nil
parser.Parse([]byte("\r\n"))
if len(executedBytes) != 2 || executedBytes[0] != '\r' || executedBytes[1] != '\n' {
t.Errorf("Control characters not properly executed")
}
// Test CSI sequence
parser.Parse([]byte("\x1b[2J"))
if len(csiCalls) != 1 || csiCalls[0] != "J" {
t.Errorf("CSI sequence not properly parsed")
}
}
func TestBufferSerialization(t *testing.T) {
buffer := NewTerminalBuffer(2, 2)
buffer.Write([]byte("AB\r\nCD"))
snapshot := buffer.GetSnapshot()
data := snapshot.SerializeToBinary()
// Binary format should contain:
// - 5 uint32s for dimensions (20 bytes)
// - 4 cells with char data and attributes
if len(data) < 20 {
t.Errorf("Serialized data too short: %d bytes", len(data))
}
}

View file

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/vibetunnel/linux/pkg/session"
"github.com/vibetunnel/linux/pkg/terminal"
)
@ -28,6 +29,8 @@ type Manager struct {
mu sync.RWMutex
subscribers map[string][]chan *terminal.BufferSnapshot
subMu sync.RWMutex
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewManager creates a new terminal socket manager
@ -36,6 +39,7 @@ func NewManager(sessionManager *session.Manager) *Manager {
sessionManager: sessionManager,
buffers: make(map[string]*SessionBuffer),
subscribers: make(map[string][]chan *terminal.BufferSnapshot),
shutdownCh: make(chan struct{}),
}
}
@ -69,7 +73,11 @@ func (m *Manager) GetOrCreateBuffer(sessionID string) (*SessionBuffer, error) {
m.buffers[sessionID] = sb
// Start monitoring the session's output
go m.monitorSession(sessionID, sb)
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.monitorSession(sessionID, sb)
}()
return sb, nil
}
@ -140,33 +148,144 @@ func (m *Manager) SubscribeToBufferChanges(sessionID string, callback func(strin
// monitorSession monitors a session's output and updates the terminal buffer
func (m *Manager) monitorSession(sessionID string, sb *SessionBuffer) {
// This is a simplified version - in a real implementation, we would:
// 1. Set up a file watcher on the session's stream-out file
// 2. Parse new asciinema events as they arrive
// 3. Feed the output data to the terminal buffer
// 4. Notify subscribers of buffer changes
streamPath := sb.Session.StreamOutPath()
lastPos := int64(0)
// For now, we'll implement a basic polling approach
// Try to use file watching
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Failed to create file watcher, using polling: %v", err)
m.monitorSessionPolling(sessionID, sb)
return
}
defer watcher.Close()
// Wait for stream file to exist
for i := 0; i < 50; i++ { // Wait up to 5 seconds
if _, err := os.Stat(streamPath); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
// Add file to watcher
if err := watcher.Add(streamPath); err != nil {
log.Printf("Failed to watch file %s, using polling: %v", streamPath, err)
m.monitorSessionPolling(sessionID, sb)
return
}
// Read initial content
if update, newPos, err := readStreamContent(streamPath, lastPos); err == nil && update != nil {
if len(update.OutputData) > 0 || update.Resize != nil {
sb.mu.Lock()
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
m.notifySubscribers(sessionID, snapshot)
lastPos = newPos
}
}
// Monitor for changes
sessionCheckTicker := time.NewTicker(5 * time.Second)
defer sessionCheckTicker.Stop()
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
// Read new content
update, newPos, err := readStreamContent(streamPath, lastPos)
if err != nil {
log.Printf("Error reading stream content: %v", err)
continue
}
if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) {
// Update buffer
sb.mu.Lock()
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
// Notify subscribers
m.notifySubscribers(sessionID, snapshot)
}
lastPos = newPos
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("File watcher error: %v", err)
case <-sessionCheckTicker.C:
// Check if session is still alive
if !sb.Session.IsAlive() {
// Clean up when session ends
m.mu.Lock()
delete(m.buffers, sessionID)
m.mu.Unlock()
return
}
case <-m.shutdownCh:
// Manager is shutting down
return
}
}
}
// monitorSessionPolling is a fallback for when file watching isn't available
func (m *Manager) monitorSessionPolling(sessionID string, sb *SessionBuffer) {
streamPath := sb.Session.StreamOutPath()
lastPos := int64(0)
for {
select {
case <-m.shutdownCh:
// Manager is shutting down
return
default:
}
// Check if session is still alive
if !sb.Session.IsAlive() {
break
}
// Read new content from stream file
data, newPos, err := readStreamContent(streamPath, lastPos)
if err != nil {
update, newPos, err := readStreamContent(streamPath, lastPos)
if err != nil && !os.IsNotExist(err) {
log.Printf("Error reading stream content: %v", err)
continue
}
if len(data) > 0 {
if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) {
// Update buffer
sb.mu.Lock()
sb.Buffer.Write(data)
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
@ -177,8 +296,7 @@ func (m *Manager) monitorSession(sessionID string, sb *SessionBuffer) {
lastPos = newPos
// Small delay to prevent busy waiting
// In production, use file watching instead
<-time.After(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
}
// Clean up when session ends
@ -202,8 +320,20 @@ func (m *Manager) notifySubscribers(sessionID string, snapshot *terminal.BufferS
}
}
// StreamUpdate represents an update from the stream file
type StreamUpdate struct {
OutputData []byte
Resize *ResizeEvent
}
// ResizeEvent represents a terminal resize
type ResizeEvent struct {
Width int
Height int
}
// readStreamContent reads new content from an asciinema stream file
func readStreamContent(path string, lastPos int64) ([]byte, int64, error) {
func readStreamContent(path string, lastPos int64) (*StreamUpdate, int64, error) {
file, err := os.Open(path)
if err != nil {
return nil, lastPos, err
@ -235,7 +365,9 @@ func readStreamContent(path string, lastPos int64) ([]byte, int64, error) {
}
// Parse asciinema events and extract output data
outputData := []byte{}
update := &StreamUpdate{
OutputData: []byte{},
}
decoder := json.NewDecoder(bytes.NewReader(newContent[:n]))
// Skip header if at beginning of file
@ -264,12 +396,52 @@ func readStreamContent(path string, lastPos int64) ([]byte, int64, error) {
if eventType == "o" { // Output event
data, ok := event[2].(string)
if ok {
outputData = append(outputData, []byte(data)...)
update.OutputData = append(update.OutputData, []byte(data)...)
}
} else if eventType == "r" { // Resize event
// Resize events have format: [timestamp, "r", "WIDTHxHEIGHT"]
data, ok := event[2].(string)
if ok {
// Parse "WIDTHxHEIGHT" format
var width, height int
if _, err := fmt.Sscanf(data, "%dx%d", &width, &height); err == nil {
update.Resize = &ResizeEvent{
Width: width,
Height: height,
}
}
}
}
// TODO: Handle resize events ("r" type)
}
}
return outputData, lastPos + int64(n), nil
return update, lastPos + int64(n), nil
}
// Shutdown gracefully shuts down the manager
func (m *Manager) Shutdown() {
log.Println("Shutting down terminal buffer manager...")
// Signal shutdown
close(m.shutdownCh)
// Wait for all monitors to finish
m.wg.Wait()
// Close all subscriber channels
m.subMu.Lock()
for _, subs := range m.subscribers {
for _, ch := range subs {
close(ch)
}
}
m.subscribers = make(map[string][]chan *terminal.BufferSnapshot)
m.subMu.Unlock()
// Clear buffers
m.mu.Lock()
m.buffers = make(map[string]*SessionBuffer)
m.mu.Unlock()
log.Println("Terminal buffer manager shutdown complete")
}