Tweaks from the rust implementation

This commit is contained in:
Peter Steinberger 2025-06-20 14:19:56 +02:00
parent f4a21d8459
commit d0e92ea932
4 changed files with 78 additions and 39 deletions

View file

@ -4,10 +4,10 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
@ -288,40 +288,20 @@ func (h *BufferWebSocketHandler) processAndSendContent(sessionID, streamPath str
return
}
// Read new content
newContentSize := currentSize - *seenBytes
newContent := make([]byte, newContentSize)
bytesRead, err := file.Read(newContent)
if err != nil {
return
}
// Create a reader for the remaining content
reader := io.LimitReader(file, currentSize-*seenBytes)
decoder := json.NewDecoder(reader)
// Update seen bytes to current position
*seenBytes = currentSize
// Process content line by line
content := string(newContent[:bytesRead])
lines := strings.Split(content, "\n")
// Handle incomplete last line
endIndex := len(lines)
if !strings.HasSuffix(content, "\n") && len(lines) > 0 {
incompleteLineBytes := int64(len(lines[len(lines)-1]))
*seenBytes -= incompleteLineBytes
endIndex = len(lines) - 1
}
// Process complete lines
for i := 0; i < endIndex; i++ {
line := lines[i]
if line == "" {
continue
}
// Try to parse as header first
// Process JSON objects as a stream
for {
// First, try to decode the header if not sent
if !*headerSent {
var header protocol.AsciinemaHeader
if err := json.Unmarshal([]byte(line), &header); err == nil && header.Version > 0 {
pos := decoder.InputOffset()
if err := decoder.Decode(&header); err == nil && header.Version > 0 {
*headerSent = true
// Send header as binary message
headerData, _ := json.Marshal(map[string]interface{}{
@ -334,12 +314,31 @@ func (h *BufferWebSocketHandler) processAndSendContent(sessionID, streamPath str
return
}
continue
} else {
// Reset decoder position if header decode failed
file.Seek(*seenBytes-currentSize+pos, 1)
decoder = json.NewDecoder(io.LimitReader(file, currentSize-*seenBytes-pos))
}
}
// Try to parse as event array [timestamp, type, data]
// Try to decode as event array [timestamp, type, data]
var eventArray []interface{}
if err := json.Unmarshal([]byte(line), &eventArray); err == nil && len(eventArray) == 3 {
if err := decoder.Decode(&eventArray); err != nil {
if err == io.EOF {
// Update seenBytes to actual position read
actualRead, _ := file.Seek(0, 1)
*seenBytes = actualRead
return
}
// If JSON decode fails, we might have incomplete data
// Reset to last known good position
actualRead, _ := file.Seek(0, 1)
*seenBytes = actualRead
return
}
// Process the event
if len(eventArray) == 3 {
timestamp, ok1 := eventArray[0].(float64)
eventType, ok2 := eventArray[1].(string)
data, ok3 := eventArray[2].(string)

View file

@ -146,8 +146,8 @@ func (w *StreamWriter) scheduleFlush() {
w.flushTimer.Stop()
}
// Set up new timer for 5ms flush delay
w.flushTimer = time.AfterFunc(5*time.Millisecond, func() {
// Set up new timer for 1ms flush delay for better real-time performance
w.flushTimer = time.AfterFunc(1*time.Millisecond, func() {
w.mutex.Lock()
defer w.mutex.Unlock()
@ -187,8 +187,8 @@ func (w *StreamWriter) scheduleBatchSync() {
w.syncTimer.Stop()
}
// Schedule sync after 5ms to batch multiple writes
w.syncTimer = time.AfterFunc(5*time.Millisecond, func() {
// Schedule sync after 1ms for better real-time performance
w.syncTimer = time.AfterFunc(1*time.Millisecond, func() {
if w.needsSync {
if file, ok := w.writer.(*os.File); ok {
if err := file.Sync(); err != nil {

View file

@ -124,6 +124,11 @@ func NewPTY(session *Session) (*PTY, error) {
return nil, fmt.Errorf("failed to set PTY size: %w", err)
}
// Configure terminal modes for proper interactive shell behavior
// The creack/pty library handles basic setup, but we ensure the terminal
// is in the correct mode for interactive use (not raw mode)
debugLog("[DEBUG] NewPTY: Terminal configured for interactive mode")
streamOut, err := os.Create(session.StreamOutPath())
if err != nil {
log.Printf("[ERROR] NewPTY: Failed to create stream-out: %v", err)
@ -218,6 +223,41 @@ func (p *PTY) Run() error {
debugLog("[DEBUG] PTY.Run: Stdin pipe opened successfully")
// Set up SIGWINCH handling for terminal resize
winchCh := make(chan os.Signal, 1)
signal.Notify(winchCh, syscall.SIGWINCH)
defer signal.Stop(winchCh)
// Handle SIGWINCH in a separate goroutine
go func() {
for range winchCh {
// Get current terminal size if we're attached to a terminal
if term.IsTerminal(int(os.Stdin.Fd())) {
width, height, err := term.GetSize(int(os.Stdin.Fd()))
if err == nil {
debugLog("[DEBUG] PTY.Run: Received SIGWINCH, resizing to %dx%d", width, height)
if err := pty.Setsize(p.pty, &pty.Winsize{
Rows: uint16(height),
Cols: uint16(width),
}); err != nil {
log.Printf("[ERROR] PTY.Run: Failed to resize PTY: %v", err)
} else {
// Update session info
p.session.mu.Lock()
p.session.info.Width = width
p.session.info.Height = height
p.session.mu.Unlock()
// Write resize event to stream
if err := p.streamWriter.WriteResize(uint32(width), uint32(height)); err != nil {
log.Printf("[ERROR] PTY.Run: Failed to write resize event: %v", err)
}
}
}
}
}
}()
// Use select-based polling if available
if useSelectPolling {
return p.pollWithSelect()

View file

@ -98,8 +98,8 @@ func (p *PTY) pollWithSelect() error {
fds = append(fds, controlFd)
}
// Wait for activity with 1s timeout to reduce CPU usage
ready, err := selectRead(fds, 1*time.Second)
// Wait for activity with 100ms timeout for better responsiveness
ready, err := selectRead(fds, 100*time.Millisecond)
if err != nil {
log.Printf("[ERROR] select error: %v", err)
return err