vibetunnel/linux/pkg/termsocket/manager.go
2025-06-20 18:54:06 +02:00

447 lines
10 KiB
Go

package termsocket
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/vibetunnel/linux/pkg/session"
"github.com/vibetunnel/linux/pkg/terminal"
)
// SessionBuffer holds both the session and its terminal buffer
type SessionBuffer struct {
Session *session.Session
Buffer *terminal.TerminalBuffer
mu sync.RWMutex
}
// Manager manages terminal buffers for sessions
type Manager struct {
sessionManager *session.Manager
buffers map[string]*SessionBuffer
mu sync.RWMutex
subscribers map[string][]chan *terminal.BufferSnapshot
subMu sync.RWMutex
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewManager creates a new terminal socket manager
func NewManager(sessionManager *session.Manager) *Manager {
return &Manager{
sessionManager: sessionManager,
buffers: make(map[string]*SessionBuffer),
subscribers: make(map[string][]chan *terminal.BufferSnapshot),
shutdownCh: make(chan struct{}),
}
}
// GetOrCreateBuffer gets or creates a terminal buffer for a session
func (m *Manager) GetOrCreateBuffer(sessionID string) (*SessionBuffer, error) {
m.mu.Lock()
defer m.mu.Unlock()
// Check if buffer already exists
if sb, exists := m.buffers[sessionID]; exists {
return sb, nil
}
// Get session from session manager
sess, err := m.sessionManager.GetSession(sessionID)
if err != nil {
return nil, fmt.Errorf("session not found: %w", err)
}
// Get session info to determine terminal size
info := sess.GetInfo()
// Create terminal buffer
buffer := terminal.NewTerminalBuffer(info.Width, info.Height)
sb := &SessionBuffer{
Session: sess,
Buffer: buffer,
}
m.buffers[sessionID] = sb
// Start monitoring the session's output
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.monitorSession(sessionID, sb)
}()
return sb, nil
}
// GetBufferSnapshot gets the current buffer snapshot for a session
func (m *Manager) GetBufferSnapshot(sessionID string) (*terminal.BufferSnapshot, error) {
sb, err := m.GetOrCreateBuffer(sessionID)
if err != nil {
return nil, err
}
sb.mu.RLock()
defer sb.mu.RUnlock()
return sb.Buffer.GetSnapshot(), nil
}
// SubscribeToBufferChanges subscribes to buffer changes for a session
func (m *Manager) SubscribeToBufferChanges(sessionID string, callback func(string, *terminal.BufferSnapshot)) (func(), error) {
// Ensure buffer exists
_, err := m.GetOrCreateBuffer(sessionID)
if err != nil {
return nil, err
}
// Create subscription channel
ch := make(chan *terminal.BufferSnapshot, 10)
m.subMu.Lock()
m.subscribers[sessionID] = append(m.subscribers[sessionID], ch)
m.subMu.Unlock()
// Start goroutine to handle callbacks
done := make(chan struct{})
go func() {
for {
select {
case snapshot := <-ch:
callback(sessionID, snapshot)
case <-done:
return
}
}
}()
// Return unsubscribe function
return func() {
close(done)
m.subMu.Lock()
defer m.subMu.Unlock()
// Remove channel from subscribers
subs := m.subscribers[sessionID]
for i, sub := range subs {
if sub == ch {
m.subscribers[sessionID] = append(subs[:i], subs[i+1:]...)
close(ch)
break
}
}
// Clean up if no more subscribers
if len(m.subscribers[sessionID]) == 0 {
delete(m.subscribers, sessionID)
}
}, nil
}
// monitorSession monitors a session's output and updates the terminal buffer
func (m *Manager) monitorSession(sessionID string, sb *SessionBuffer) {
streamPath := sb.Session.StreamOutPath()
lastPos := int64(0)
// Try to use file watching
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Failed to create file watcher, using polling: %v", err)
m.monitorSessionPolling(sessionID, sb)
return
}
defer watcher.Close()
// Wait for stream file to exist
for i := 0; i < 50; i++ { // Wait up to 5 seconds
if _, err := os.Stat(streamPath); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
// Add file to watcher
if err := watcher.Add(streamPath); err != nil {
log.Printf("Failed to watch file %s, using polling: %v", streamPath, err)
m.monitorSessionPolling(sessionID, sb)
return
}
// Read initial content
if update, newPos, err := readStreamContent(streamPath, lastPos); err == nil && update != nil {
if len(update.OutputData) > 0 || update.Resize != nil {
sb.mu.Lock()
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
m.notifySubscribers(sessionID, snapshot)
lastPos = newPos
}
}
// Monitor for changes
sessionCheckTicker := time.NewTicker(5 * time.Second)
defer sessionCheckTicker.Stop()
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
// Read new content
update, newPos, err := readStreamContent(streamPath, lastPos)
if err != nil {
log.Printf("Error reading stream content: %v", err)
continue
}
if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) {
// Update buffer
sb.mu.Lock()
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
// Notify subscribers
m.notifySubscribers(sessionID, snapshot)
}
lastPos = newPos
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("File watcher error: %v", err)
case <-sessionCheckTicker.C:
// Check if session is still alive
if !sb.Session.IsAlive() {
// Clean up when session ends
m.mu.Lock()
delete(m.buffers, sessionID)
m.mu.Unlock()
return
}
case <-m.shutdownCh:
// Manager is shutting down
return
}
}
}
// monitorSessionPolling is a fallback for when file watching isn't available
func (m *Manager) monitorSessionPolling(sessionID string, sb *SessionBuffer) {
streamPath := sb.Session.StreamOutPath()
lastPos := int64(0)
for {
select {
case <-m.shutdownCh:
// Manager is shutting down
return
default:
}
// Check if session is still alive
if !sb.Session.IsAlive() {
break
}
// Read new content from stream file
update, newPos, err := readStreamContent(streamPath, lastPos)
if err != nil && !os.IsNotExist(err) {
log.Printf("Error reading stream content: %v", err)
}
if update != nil && (len(update.OutputData) > 0 || update.Resize != nil) {
// Update buffer
sb.mu.Lock()
if len(update.OutputData) > 0 {
sb.Buffer.Write(update.OutputData)
}
if update.Resize != nil {
sb.Buffer.Resize(update.Resize.Width, update.Resize.Height)
}
snapshot := sb.Buffer.GetSnapshot()
sb.mu.Unlock()
// Notify subscribers
m.notifySubscribers(sessionID, snapshot)
}
lastPos = newPos
// Small delay to prevent busy waiting
time.Sleep(50 * time.Millisecond)
}
// Clean up when session ends
m.mu.Lock()
delete(m.buffers, sessionID)
m.mu.Unlock()
}
// notifySubscribers sends buffer updates to all subscribers
func (m *Manager) notifySubscribers(sessionID string, snapshot *terminal.BufferSnapshot) {
m.subMu.RLock()
subs := m.subscribers[sessionID]
m.subMu.RUnlock()
for _, ch := range subs {
select {
case ch <- snapshot:
default:
// Channel full, skip
}
}
}
// StreamUpdate represents an update from the stream file
type StreamUpdate struct {
OutputData []byte
Resize *ResizeEvent
}
// ResizeEvent represents a terminal resize
type ResizeEvent struct {
Width int
Height int
}
// readStreamContent reads new content from an asciinema stream file
func readStreamContent(path string, lastPos int64) (*StreamUpdate, int64, error) {
file, err := os.Open(path)
if err != nil {
return nil, lastPos, err
}
defer file.Close()
// Get current file size
stat, err := file.Stat()
if err != nil {
return nil, lastPos, err
}
currentSize := stat.Size()
if currentSize <= lastPos {
// No new content
return nil, lastPos, nil
}
// Seek to last position
if _, err := file.Seek(lastPos, 0); err != nil {
return nil, lastPos, err
}
// Read new content
newContent := make([]byte, currentSize-lastPos)
n, err := file.Read(newContent)
if err != nil && err != io.EOF {
return nil, lastPos, err
}
// Parse asciinema events and extract output data
update := &StreamUpdate{
OutputData: []byte{},
}
decoder := json.NewDecoder(bytes.NewReader(newContent[:n]))
// Skip header if at beginning of file
if lastPos == 0 {
var header map[string]interface{}
if err := decoder.Decode(&header); err == nil {
// Successfully decoded header, continue
}
}
// Parse events
for decoder.More() {
var event []interface{}
if err := decoder.Decode(&event); err != nil {
// Incomplete event, return what we have so far
break
}
// Asciinema format: [timestamp, event_type, data]
if len(event) >= 3 {
eventType, ok := event[1].(string)
if !ok {
continue
}
if eventType == "o" { // Output event
data, ok := event[2].(string)
if ok {
update.OutputData = append(update.OutputData, []byte(data)...)
}
} else if eventType == "r" { // Resize event
// Resize events have format: [timestamp, "r", "WIDTHxHEIGHT"]
data, ok := event[2].(string)
if ok {
// Parse "WIDTHxHEIGHT" format
var width, height int
if _, err := fmt.Sscanf(data, "%dx%d", &width, &height); err == nil {
update.Resize = &ResizeEvent{
Width: width,
Height: height,
}
}
}
}
}
}
return update, lastPos + int64(n), nil
}
// Shutdown gracefully shuts down the manager
func (m *Manager) Shutdown() {
log.Println("Shutting down terminal buffer manager...")
// Signal shutdown
close(m.shutdownCh)
// Wait for all monitors to finish
m.wg.Wait()
// Close all subscriber channels
m.subMu.Lock()
for _, subs := range m.subscribers {
for _, ch := range subs {
close(ch)
}
}
m.subscribers = make(map[string][]chan *terminal.BufferSnapshot)
m.subMu.Unlock()
// Clear buffers
m.mu.Lock()
m.buffers = make(map[string]*SessionBuffer)
m.mu.Unlock()
log.Println("Terminal buffer manager shutdown complete")
}