Make server control a state machine

This commit is contained in:
Peter Steinberger 2025-06-22 11:02:36 +02:00
parent bb462c8826
commit ba315226c2
3 changed files with 370 additions and 102 deletions

View file

@ -1,6 +1,15 @@
import Foundation
import OSLog
/// Server state enumeration
enum ServerState {
case idle
case starting
case running
case stopping
case crashed
}
/// Bun vibetunnel server implementation.
///
/// Manages the Bun-based vibetunnel server as a subprocess. This implementation
@ -18,11 +27,19 @@ final class BunServer {
private var stderrPipe: Pipe?
private var outputTask: Task<Void, Never>?
private var errorTask: Task<Void, Never>?
/// Server state machine - thread-safe through MainActor
private var state: ServerState = .idle
/// Resource cleanup tracking
private var isCleaningUp = false
private let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer")
private let serverOutput = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "ServerOutput")
var isRunning = false
var isRunning: Bool {
state == .running
}
var port: String = ""
@ -37,10 +54,24 @@ final class BunServer {
// MARK: - Public Methods
func start() async throws {
guard !isRunning else {
logger.warning("Bun server already running")
// Update state atomically using MainActor
let currentState = state
if currentState == .running || currentState == .starting {
logger.warning("Bun server already running or starting")
return
}
if currentState == .stopping {
logger.warning("Cannot start server while stopping")
throw BunServerError.invalidState
}
state = .starting
defer {
// Ensure we reset state on error
if state == .starting {
state = .idle
}
}
guard !port.isEmpty else {
let error = BunServerError.invalidPort
@ -88,7 +119,7 @@ final class BunServer {
// Set working directory to Resources/web directory where public folder is located
let webPath = URL(fileURLWithPath: resourcesPath).appendingPathComponent("web").path
process.currentDirectoryURL = URL(fileURLWithPath: webPath)
logger.info("Working directory: \(webPath)")
logger.info("Process working directory: \(webPath)")
// Static files are always at Resources/web/public
let staticPath = URL(fileURLWithPath: resourcesPath).appendingPathComponent("web/public").path
@ -119,7 +150,7 @@ final class BunServer {
process.arguments = ["-l", "-c", vibetunnelCommand]
logger.info("Executing command: /bin/zsh -l -c \"\(vibetunnelCommand)\"")
logger.info("Working directory: \(resourcesPath)")
logger.info("Binary location: \(resourcesPath)")
// Set up environment - login shell will load the rest
let environment = ProcessInfo.processInfo.environment
@ -142,9 +173,6 @@ final class BunServer {
// Start the process (this just launches it and returns immediately)
try await process.runAsync()
// Mark server as running
isRunning = true
logger.info("Bun server process started")
// Give the process a moment to start before checking for early failures
@ -152,9 +180,14 @@ final class BunServer {
// Check if process exited immediately (indicating failure)
if !process.isRunning {
isRunning = false
let exitCode = process.terminationStatus
logger.error("Process exited immediately with code: \(exitCode)")
// Special handling for exit code 9 (port in use)
if exitCode == 9 {
logger.error("Process exited immediately: Port \(self.port) is already in use (exit code: 9)")
} else {
logger.error("Process exited immediately with code: \(exitCode)")
}
// Try to read any error output
var errorDetails = "Exit code: \(exitCode)"
@ -169,6 +202,9 @@ final class BunServer {
throw BunServerError.processFailedToStart
}
// Mark server as running only after successful start
state = .running
logger.info("Bun server process started successfully")
// Monitor process termination
@ -176,8 +212,6 @@ final class BunServer {
await monitorProcessTermination()
}
} catch {
isRunning = false
// Log more detailed error information
let errorMessage: String
if let bunError = error as? BunServerError {
@ -197,8 +231,32 @@ final class BunServer {
}
func stop() async {
guard let process, isRunning else {
logger.warning("Bun server not running")
// Update state atomically using MainActor
switch state {
case .running, .crashed:
break // Continue with stop
default:
logger.warning("Bun server not running (state: \(String(describing: self.state)))")
return
}
// Prevent concurrent cleanup
if isCleaningUp {
logger.warning("Already cleaning up server")
return
}
state = .stopping
isCleaningUp = true
defer {
state = .idle
isCleaningUp = false
}
guard let process else {
logger.warning("No process to stop")
await performCleanup()
return
}
@ -207,6 +265,17 @@ final class BunServer {
// Cancel output monitoring tasks
outputTask?.cancel()
errorTask?.cancel()
// Close pipes to trigger EOF in monitors
if let pipe = self.stdoutPipe {
try? pipe.fileHandleForReading.close()
}
if let pipe = self.stderrPipe {
try? pipe.fileHandleForReading.close()
}
// Give tasks a moment to complete
try? await Task.sleep(for: .milliseconds(100))
// Terminate the process
await process.terminateAsync()
@ -221,12 +290,7 @@ final class BunServer {
}
// Clean up
self.process = nil
self.stdoutPipe = nil
self.stderrPipe = nil
self.outputTask = nil
self.errorTask = nil
isRunning = false
await performCleanup()
logger.info("Bun server stopped")
}
@ -250,70 +314,135 @@ final class BunServer {
func cleanup() async {
await stop()
}
/// Get current server state
func getState() -> ServerState {
state
}
// MARK: - Private Methods
/// Perform cleanup of all resources
private func performCleanup() async {
self.process = nil
self.stdoutPipe = nil
self.stderrPipe = nil
self.outputTask = nil
self.errorTask = nil
}
private func startOutputMonitoring() {
// Capture pipes and port before starting detached tasks
let stdoutPipe = self.stdoutPipe
let stderrPipe = self.stderrPipe
guard let stdoutPipe = self.stdoutPipe,
let stderrPipe = self.stderrPipe else {
logger.warning("No pipes available for monitoring")
return
}
let currentPort = self.port
// Create a sendable reference for logging
let logHandler = LogHandler()
// Monitor stdout on background thread
outputTask = Task.detached { [weak self] in
guard let self, let pipe = stdoutPipe else { return }
// Monitor stdout on background thread with DispatchSource
outputTask = Task.detached { [logHandler] in
let pipe = stdoutPipe
let handle = pipe.fileHandleForReading
self.logger.debug("Starting stdout monitoring for Bun server on port \(currentPort)")
while !Task.isCancelled {
autoreleasepool {
let data = handle.availableData
if !data.isEmpty, let output = String(data: data, encoding: .utf8) {
let lines = output.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: .newlines)
for line in lines where !line.isEmpty {
// Skip shell initialization messages
if line.contains("zsh:") || line.hasPrefix("Last login:") {
continue
}
// Log to OSLog with appropriate level
Task { @MainActor in
self.logServerOutput(line, isError: false)
}
let source = DispatchSource.makeReadSource(fileDescriptor: handle.fileDescriptor)
let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer")
logger.debug("Starting stdout monitoring for Bun server on port \(currentPort)")
// Create a cancellation handler
let cancelSource = {
source.cancel()
try? handle.close()
}
source.setEventHandler { [logHandler] in
let data = handle.availableData
if data.isEmpty {
// EOF reached
cancelSource()
return
}
if let output = String(data: data, encoding: .utf8) {
let lines = output.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: .newlines)
for line in lines where !line.isEmpty {
// Skip shell initialization messages
if line.contains("zsh:") || line.hasPrefix("Last login:") {
continue
}
// Log to OSLog with appropriate level
logHandler.log(line, isError: false)
}
}
}
self.logger.debug("Stopped stdout monitoring for Bun server")
source.setCancelHandler {
logger.debug("Stopped stdout monitoring for Bun server")
}
source.activate()
// Keep the task alive until cancelled
while !Task.isCancelled {
try? await Task.sleep(for: .milliseconds(100))
}
cancelSource()
}
// Monitor stderr on background thread
errorTask = Task.detached { [weak self] in
guard let self, let pipe = stderrPipe else { return }
// Monitor stderr on background thread with DispatchSource
errorTask = Task.detached { [logHandler] in
let pipe = stderrPipe
let handle = pipe.fileHandleForReading
self.logger.debug("Starting stderr monitoring for Bun server on port \(currentPort)")
while !Task.isCancelled {
autoreleasepool {
let data = handle.availableData
if !data.isEmpty, let output = String(data: data, encoding: .utf8) {
let lines = output.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: .newlines)
for line in lines where !line.isEmpty {
// Log stderr as errors/warnings
Task { @MainActor in
self.logServerOutput(line, isError: true)
}
}
let source = DispatchSource.makeReadSource(fileDescriptor: handle.fileDescriptor)
let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer")
logger.debug("Starting stderr monitoring for Bun server on port \(currentPort)")
// Create a cancellation handler
let cancelSource = {
source.cancel()
try? handle.close()
}
source.setEventHandler { [logHandler] in
let data = handle.availableData
if data.isEmpty {
// EOF reached
cancelSource()
return
}
if let output = String(data: data, encoding: .utf8) {
let lines = output.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: .newlines)
for line in lines where !line.isEmpty {
// Log stderr as errors/warnings
logHandler.log(line, isError: true)
}
}
}
self.logger.debug("Stopped stderr monitoring for Bun server")
source.setCancelHandler {
logger.debug("Stopped stderr monitoring for Bun server")
}
source.activate()
// Keep the task alive until cancelled
while !Task.isCancelled {
try? await Task.sleep(for: .milliseconds(100))
}
cancelSource()
}
}
@ -362,11 +491,17 @@ final class BunServer {
await process.waitUntilExitAsync()
let exitCode = process.terminationStatus
// Check current state
let currentState = state
let wasRunning = currentState == .running
if wasRunning {
state = .crashed
}
if self.isRunning {
if wasRunning {
// Unexpected termination
self.logger.error("Bun server terminated unexpectedly with exit code: \(exitCode)")
self.isRunning = false
// Clean up process reference
self.process = nil
@ -391,6 +526,7 @@ enum BunServerError: LocalizedError {
case binaryNotFound
case processFailedToStart
case invalidPort
case invalidState
var errorDescription: String? {
switch self {
@ -400,6 +536,8 @@ enum BunServerError: LocalizedError {
"The server process failed to start"
case .invalidPort:
"Server port is not configured"
case .invalidState:
"Server is in an invalid state for this operation"
}
}
}
@ -465,3 +603,24 @@ extension Process {
}
}
}
// MARK: - LogHandler
/// A sendable log handler for use in detached tasks
private final class LogHandler: Sendable {
private let serverOutput = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "ServerOutput")
func log(_ line: String, isError: Bool) {
let lowercased = line.lowercased()
if isError || lowercased.contains("error") || lowercased.contains("failed") || lowercased.contains("fatal") {
serverOutput.error("\(line, privacy: .public)")
} else if lowercased.contains("warn") || lowercased.contains("warning") {
serverOutput.warning("\(line, privacy: .public)")
} else if lowercased.contains("debug") || lowercased.contains("verbose") {
serverOutput.debug("\(line, privacy: .public)")
} else {
serverOutput.info("\(line, privacy: .public)")
}
}
}

View file

@ -90,16 +90,39 @@ class ServerManager {
func start() async {
// Check if we already have a running server
if let existingServer = bunServer {
logger.info("Server already running on port \(existingServer.port)")
// Ensure our state is synced
isRunning = true
lastError = nil
return
let state = existingServer.getState()
switch state {
case .running:
logger.info("Server already running on port \(existingServer.port)")
// Ensure our state is synced
isRunning = true
lastError = nil
return
case .starting:
logger.info("Server is already starting")
return
case .stopping:
logger.warning("Cannot start server while it's stopping")
lastError = BunServerError.invalidState
return
case .crashed, .idle:
// Clean up and proceed with start
bunServer = nil
isRunning = false
}
}
// First check if port is truly available by trying to bind to it
let portNumber = Int(self.port) ?? 4_020
let canBind = await PortConflictResolver.shared.canBindToPort(portNumber)
if !canBind {
logger.warning("Cannot bind to port \(portNumber), checking for conflicts...")
}
// Check for port conflicts before starting
if let conflict = await PortConflictResolver.shared.detectConflict(on: Int(self.port) ?? 4_020) {
if let conflict = await PortConflictResolver.shared.detectConflict(on: portNumber) {
logger.warning("Port \(self.port) is in use by \(conflict.process.name) (PID: \(conflict.process.pid))")
// Handle based on conflict type
@ -109,8 +132,7 @@ class ServerManager {
do {
try await PortConflictResolver.shared.resolveConflict(conflict)
// Wait a moment for port to be fully released
try await Task.sleep(for: .milliseconds(500))
// resolveConflict now includes exponential backoff
} catch {
logger.error("Failed to resolve port conflict: \(error)")
lastError = PortConflictError.failedToKillProcess(pid: pid)
@ -147,11 +169,19 @@ class ServerManager {
try await server.start()
bunServer = server
isRunning = true
lastError = nil
// Reset crash counter on successful start
consecutiveCrashes = 0
// Check server state to ensure it's actually running
if server.getState() == .running {
isRunning = true
lastError = nil
// Reset crash counter on successful start
consecutiveCrashes = 0
} else {
logger.error("Server started but not in running state")
isRunning = false
bunServer = nil
lastError = BunServerError.processFailedToStart
return
}
logger.info("Started server on port \(self.port)")
@ -161,14 +191,9 @@ class ServerManager {
logger.error("Failed to start server: \(error.localizedDescription)")
lastError = error
// Check if server is actually running despite the error
if let server = bunServer, server.isRunning {
logger.warning("Server reported as running despite startup error, syncing state")
isRunning = true
} else {
isRunning = false
bunServer = nil
}
// Always clean up on error
isRunning = false
bunServer = nil
}
}
@ -202,8 +227,29 @@ class ServerManager {
await stop()
// Add a brief delay to ensure the port is released by the OS
try? await Task.sleep(for: .milliseconds(500))
// Wait with exponential backoff for port to be available
let portNumber = Int(self.port) ?? 4_020
var retries = 0
let maxRetries = 5
while retries < maxRetries {
let delay = 1.0 * pow(2.0, Double(retries)) // 1, 2, 4, 8, 16 seconds
logger.info("Waiting \(delay) seconds for port to be released (attempt \(retries + 1)/\(maxRetries))...")
try? await Task.sleep(for: .seconds(delay))
if await PortConflictResolver.shared.canBindToPort(portNumber) {
logger.info("Port \(portNumber) is now available")
break
}
retries += 1
}
if retries == maxRetries {
logger.error("Port \(portNumber) still unavailable after \(maxRetries) attempts")
lastError = PortConflictError.portStillInUse(port: portNumber)
return
}
await start()
}
@ -268,7 +314,12 @@ class ServerManager {
/// Handle server crash with automatic restart logic
private func handleServerCrash(exitCode: Int32) async {
logger.error("Server crashed with exit code: \(exitCode)")
// Special handling for exit code 9 (port in use)
if exitCode == 9 {
logger.error("Server failed to start: Port \(self.port) is already in use")
} else {
logger.error("Server crashed with exit code: \(exitCode)")
}
// Update state immediately
isRunning = false
@ -314,14 +365,69 @@ class ServerManager {
return
}
// Calculate backoff delay
let baseDelay: TimeInterval = 2.0
let delay = baseDelay * pow(2.0, Double(consecutiveCrashes - 1))
// Special handling for exit code 9 (port already in use)
if exitCode == 9 {
logger.info("Port \(self.port) is in use, checking for conflicts...")
logger.info("Will restart server after \(delay) seconds (attempt \(self.consecutiveCrashes) of \(maxRetries))")
// Check for port conflicts
if let conflict = await PortConflictResolver.shared.detectConflict(on: Int(self.port) ?? 4_020) {
logger.warning("Found port conflict: \(conflict.process.name) (PID: \(conflict.process.pid))")
// Wait with exponential backoff
try? await Task.sleep(for: .seconds(delay))
// Try to resolve the conflict
if case .killOurInstance(let pid, let processName) = conflict.suggestedAction {
logger.info("Attempting to kill conflicting process: \(processName) (PID: \(pid))")
do {
try await PortConflictResolver.shared.resolveConflict(conflict)
// resolveConflict now includes exponential backoff
} catch {
logger.error("Failed to resolve port conflict: \(error)")
lastError = PortConflictError.failedToKillProcess(pid: pid)
return
}
} else {
logger.error("Cannot auto-resolve port conflict")
return
}
} else {
// Port might still be in TIME_WAIT state, wait with backoff
logger.info("Port may be in TIME_WAIT state, checking availability...")
let portNumber = Int(self.port) ?? 4_020
var retries = 0
let maxRetries = 5
while retries < maxRetries {
let delay = 2.0 * pow(2.0, Double(retries)) // 2, 4, 8, 16, 32 seconds
logger.info("Waiting \(delay) seconds for port to clear (attempt \(retries + 1)/\(maxRetries))...")
try? await Task.sleep(for: .seconds(delay))
if await PortConflictResolver.shared.canBindToPort(portNumber) {
logger.info("Port \(portNumber) is now available")
break
}
retries += 1
}
if retries == maxRetries {
logger.error("Port \(portNumber) still in TIME_WAIT after \(maxRetries) attempts")
lastError = PortConflictError.portStillInUse(port: portNumber)
return
}
}
} else {
// Normal crash handling with exponential backoff
let baseDelay: TimeInterval = 2.0
let delay = baseDelay * pow(2.0, Double(consecutiveCrashes - 1))
logger
.info("Will restart server after \(delay) seconds (attempt \(self.consecutiveCrashes) of \(maxRetries))"
)
// Wait with exponential backoff
try? await Task.sleep(for: .seconds(delay))
}
// Only restart if we haven't been manually stopped in the meantime
guard bunServer == nil else {
@ -342,16 +448,19 @@ class ServerManager {
guard let server = bunServer else { continue }
// Check if the server process is still running
// Check server state and process health
let state = server.getState()
let health = await server.checkHealth()
if !health && isRunning {
if (!health || state == .crashed) && isRunning {
logger.warning("Server health check failed but state shows running, syncing state")
isRunning = false
bunServer = nil
// Trigger restart
await handleServerCrash(exitCode: -1)
// Only trigger restart if not already handling a crash
if !isHandlingCrash {
await handleServerCrash(exitCode: -1)
}
}
}
}

View file

@ -1,6 +1,6 @@
import Foundation
import OSLog
import Darwin
import Darwin.C
/// Information about a process that's using a port
struct ProcessDetails {