diff --git a/mac/VibeTunnel/Core/Services/BunServer.swift b/mac/VibeTunnel/Core/Services/BunServer.swift index b5c6f993..4f21dfa3 100644 --- a/mac/VibeTunnel/Core/Services/BunServer.swift +++ b/mac/VibeTunnel/Core/Services/BunServer.swift @@ -1,6 +1,15 @@ import Foundation import OSLog +/// Server state enumeration +enum ServerState { + case idle + case starting + case running + case stopping + case crashed +} + /// Bun vibetunnel server implementation. /// /// Manages the Bun-based vibetunnel server as a subprocess. This implementation @@ -18,11 +27,19 @@ final class BunServer { private var stderrPipe: Pipe? private var outputTask: Task? private var errorTask: Task? + + /// Server state machine - thread-safe through MainActor + private var state: ServerState = .idle + + /// Resource cleanup tracking + private var isCleaningUp = false private let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer") private let serverOutput = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "ServerOutput") - var isRunning = false + var isRunning: Bool { + state == .running + } var port: String = "" @@ -37,10 +54,24 @@ final class BunServer { // MARK: - Public Methods func start() async throws { - guard !isRunning else { - logger.warning("Bun server already running") + // Update state atomically using MainActor + let currentState = state + if currentState == .running || currentState == .starting { + logger.warning("Bun server already running or starting") return } + if currentState == .stopping { + logger.warning("Cannot start server while stopping") + throw BunServerError.invalidState + } + state = .starting + + defer { + // Ensure we reset state on error + if state == .starting { + state = .idle + } + } guard !port.isEmpty else { let error = BunServerError.invalidPort @@ -88,7 +119,7 @@ final class BunServer { // Set working directory to Resources/web directory where public folder is located let webPath = URL(fileURLWithPath: resourcesPath).appendingPathComponent("web").path process.currentDirectoryURL = URL(fileURLWithPath: webPath) - logger.info("Working directory: \(webPath)") + logger.info("Process working directory: \(webPath)") // Static files are always at Resources/web/public let staticPath = URL(fileURLWithPath: resourcesPath).appendingPathComponent("web/public").path @@ -119,7 +150,7 @@ final class BunServer { process.arguments = ["-l", "-c", vibetunnelCommand] logger.info("Executing command: /bin/zsh -l -c \"\(vibetunnelCommand)\"") - logger.info("Working directory: \(resourcesPath)") + logger.info("Binary location: \(resourcesPath)") // Set up environment - login shell will load the rest let environment = ProcessInfo.processInfo.environment @@ -142,9 +173,6 @@ final class BunServer { // Start the process (this just launches it and returns immediately) try await process.runAsync() - // Mark server as running - isRunning = true - logger.info("Bun server process started") // Give the process a moment to start before checking for early failures @@ -152,9 +180,14 @@ final class BunServer { // Check if process exited immediately (indicating failure) if !process.isRunning { - isRunning = false let exitCode = process.terminationStatus - logger.error("Process exited immediately with code: \(exitCode)") + + // Special handling for exit code 9 (port in use) + if exitCode == 9 { + logger.error("Process exited immediately: Port \(self.port) is already in use (exit code: 9)") + } else { + logger.error("Process exited immediately with code: \(exitCode)") + } // Try to read any error output var errorDetails = "Exit code: \(exitCode)" @@ -169,6 +202,9 @@ final class BunServer { throw BunServerError.processFailedToStart } + // Mark server as running only after successful start + state = .running + logger.info("Bun server process started successfully") // Monitor process termination @@ -176,8 +212,6 @@ final class BunServer { await monitorProcessTermination() } } catch { - isRunning = false - // Log more detailed error information let errorMessage: String if let bunError = error as? BunServerError { @@ -197,8 +231,32 @@ final class BunServer { } func stop() async { - guard let process, isRunning else { - logger.warning("Bun server not running") + // Update state atomically using MainActor + switch state { + case .running, .crashed: + break // Continue with stop + default: + logger.warning("Bun server not running (state: \(String(describing: self.state)))") + return + } + + // Prevent concurrent cleanup + if isCleaningUp { + logger.warning("Already cleaning up server") + return + } + + state = .stopping + isCleaningUp = true + + defer { + state = .idle + isCleaningUp = false + } + + guard let process else { + logger.warning("No process to stop") + await performCleanup() return } @@ -207,6 +265,17 @@ final class BunServer { // Cancel output monitoring tasks outputTask?.cancel() errorTask?.cancel() + + // Close pipes to trigger EOF in monitors + if let pipe = self.stdoutPipe { + try? pipe.fileHandleForReading.close() + } + if let pipe = self.stderrPipe { + try? pipe.fileHandleForReading.close() + } + + // Give tasks a moment to complete + try? await Task.sleep(for: .milliseconds(100)) // Terminate the process await process.terminateAsync() @@ -221,12 +290,7 @@ final class BunServer { } // Clean up - self.process = nil - self.stdoutPipe = nil - self.stderrPipe = nil - self.outputTask = nil - self.errorTask = nil - isRunning = false + await performCleanup() logger.info("Bun server stopped") } @@ -250,70 +314,135 @@ final class BunServer { func cleanup() async { await stop() } + + /// Get current server state + func getState() -> ServerState { + state + } // MARK: - Private Methods + + /// Perform cleanup of all resources + private func performCleanup() async { + self.process = nil + self.stdoutPipe = nil + self.stderrPipe = nil + self.outputTask = nil + self.errorTask = nil + } private func startOutputMonitoring() { // Capture pipes and port before starting detached tasks - let stdoutPipe = self.stdoutPipe - let stderrPipe = self.stderrPipe + guard let stdoutPipe = self.stdoutPipe, + let stderrPipe = self.stderrPipe else { + logger.warning("No pipes available for monitoring") + return + } + let currentPort = self.port + + // Create a sendable reference for logging + let logHandler = LogHandler() - // Monitor stdout on background thread - outputTask = Task.detached { [weak self] in - guard let self, let pipe = stdoutPipe else { return } + // Monitor stdout on background thread with DispatchSource + outputTask = Task.detached { [logHandler] in + let pipe = stdoutPipe let handle = pipe.fileHandleForReading - self.logger.debug("Starting stdout monitoring for Bun server on port \(currentPort)") - - while !Task.isCancelled { - autoreleasepool { - let data = handle.availableData - if !data.isEmpty, let output = String(data: data, encoding: .utf8) { - let lines = output.trimmingCharacters(in: .whitespacesAndNewlines) - .components(separatedBy: .newlines) - for line in lines where !line.isEmpty { - // Skip shell initialization messages - if line.contains("zsh:") || line.hasPrefix("Last login:") { - continue - } - - // Log to OSLog with appropriate level - Task { @MainActor in - self.logServerOutput(line, isError: false) - } + let source = DispatchSource.makeReadSource(fileDescriptor: handle.fileDescriptor) + + let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer") + logger.debug("Starting stdout monitoring for Bun server on port \(currentPort)") + + // Create a cancellation handler + let cancelSource = { + source.cancel() + try? handle.close() + } + + source.setEventHandler { [logHandler] in + let data = handle.availableData + if data.isEmpty { + // EOF reached + cancelSource() + return + } + + if let output = String(data: data, encoding: .utf8) { + let lines = output.trimmingCharacters(in: .whitespacesAndNewlines) + .components(separatedBy: .newlines) + for line in lines where !line.isEmpty { + // Skip shell initialization messages + if line.contains("zsh:") || line.hasPrefix("Last login:") { + continue } + + // Log to OSLog with appropriate level + logHandler.log(line, isError: false) } } } - - self.logger.debug("Stopped stdout monitoring for Bun server") + + source.setCancelHandler { + logger.debug("Stopped stdout monitoring for Bun server") + } + + source.activate() + + // Keep the task alive until cancelled + while !Task.isCancelled { + try? await Task.sleep(for: .milliseconds(100)) + } + + cancelSource() } - // Monitor stderr on background thread - errorTask = Task.detached { [weak self] in - guard let self, let pipe = stderrPipe else { return } + // Monitor stderr on background thread with DispatchSource + errorTask = Task.detached { [logHandler] in + let pipe = stderrPipe let handle = pipe.fileHandleForReading - self.logger.debug("Starting stderr monitoring for Bun server on port \(currentPort)") - - while !Task.isCancelled { - autoreleasepool { - let data = handle.availableData - if !data.isEmpty, let output = String(data: data, encoding: .utf8) { - let lines = output.trimmingCharacters(in: .whitespacesAndNewlines) - .components(separatedBy: .newlines) - for line in lines where !line.isEmpty { - // Log stderr as errors/warnings - Task { @MainActor in - self.logServerOutput(line, isError: true) - } - } + let source = DispatchSource.makeReadSource(fileDescriptor: handle.fileDescriptor) + + let logger = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "BunServer") + logger.debug("Starting stderr monitoring for Bun server on port \(currentPort)") + + // Create a cancellation handler + let cancelSource = { + source.cancel() + try? handle.close() + } + + source.setEventHandler { [logHandler] in + let data = handle.availableData + if data.isEmpty { + // EOF reached + cancelSource() + return + } + + if let output = String(data: data, encoding: .utf8) { + let lines = output.trimmingCharacters(in: .whitespacesAndNewlines) + .components(separatedBy: .newlines) + for line in lines where !line.isEmpty { + // Log stderr as errors/warnings + logHandler.log(line, isError: true) } } } - - self.logger.debug("Stopped stderr monitoring for Bun server") + + source.setCancelHandler { + logger.debug("Stopped stderr monitoring for Bun server") + } + + source.activate() + + // Keep the task alive until cancelled + while !Task.isCancelled { + try? await Task.sleep(for: .milliseconds(100)) + } + + cancelSource() } } @@ -362,11 +491,17 @@ final class BunServer { await process.waitUntilExitAsync() let exitCode = process.terminationStatus + + // Check current state + let currentState = state + let wasRunning = currentState == .running + if wasRunning { + state = .crashed + } - if self.isRunning { + if wasRunning { // Unexpected termination self.logger.error("Bun server terminated unexpectedly with exit code: \(exitCode)") - self.isRunning = false // Clean up process reference self.process = nil @@ -391,6 +526,7 @@ enum BunServerError: LocalizedError { case binaryNotFound case processFailedToStart case invalidPort + case invalidState var errorDescription: String? { switch self { @@ -400,6 +536,8 @@ enum BunServerError: LocalizedError { "The server process failed to start" case .invalidPort: "Server port is not configured" + case .invalidState: + "Server is in an invalid state for this operation" } } } @@ -465,3 +603,24 @@ extension Process { } } } + +// MARK: - LogHandler + +/// A sendable log handler for use in detached tasks +private final class LogHandler: Sendable { + private let serverOutput = Logger(subsystem: "sh.vibetunnel.vibetunnel", category: "ServerOutput") + + func log(_ line: String, isError: Bool) { + let lowercased = line.lowercased() + + if isError || lowercased.contains("error") || lowercased.contains("failed") || lowercased.contains("fatal") { + serverOutput.error("\(line, privacy: .public)") + } else if lowercased.contains("warn") || lowercased.contains("warning") { + serverOutput.warning("\(line, privacy: .public)") + } else if lowercased.contains("debug") || lowercased.contains("verbose") { + serverOutput.debug("\(line, privacy: .public)") + } else { + serverOutput.info("\(line, privacy: .public)") + } + } +} diff --git a/mac/VibeTunnel/Core/Services/ServerManager.swift b/mac/VibeTunnel/Core/Services/ServerManager.swift index ebb78d7b..87469568 100644 --- a/mac/VibeTunnel/Core/Services/ServerManager.swift +++ b/mac/VibeTunnel/Core/Services/ServerManager.swift @@ -90,16 +90,39 @@ class ServerManager { func start() async { // Check if we already have a running server if let existingServer = bunServer { - logger.info("Server already running on port \(existingServer.port)") - - // Ensure our state is synced - isRunning = true - lastError = nil - return + let state = existingServer.getState() + + switch state { + case .running: + logger.info("Server already running on port \(existingServer.port)") + // Ensure our state is synced + isRunning = true + lastError = nil + return + case .starting: + logger.info("Server is already starting") + return + case .stopping: + logger.warning("Cannot start server while it's stopping") + lastError = BunServerError.invalidState + return + case .crashed, .idle: + // Clean up and proceed with start + bunServer = nil + isRunning = false + } } + // First check if port is truly available by trying to bind to it + let portNumber = Int(self.port) ?? 4_020 + + let canBind = await PortConflictResolver.shared.canBindToPort(portNumber) + if !canBind { + logger.warning("Cannot bind to port \(portNumber), checking for conflicts...") + } + // Check for port conflicts before starting - if let conflict = await PortConflictResolver.shared.detectConflict(on: Int(self.port) ?? 4_020) { + if let conflict = await PortConflictResolver.shared.detectConflict(on: portNumber) { logger.warning("Port \(self.port) is in use by \(conflict.process.name) (PID: \(conflict.process.pid))") // Handle based on conflict type @@ -109,8 +132,7 @@ class ServerManager { do { try await PortConflictResolver.shared.resolveConflict(conflict) - // Wait a moment for port to be fully released - try await Task.sleep(for: .milliseconds(500)) + // resolveConflict now includes exponential backoff } catch { logger.error("Failed to resolve port conflict: \(error)") lastError = PortConflictError.failedToKillProcess(pid: pid) @@ -147,11 +169,19 @@ class ServerManager { try await server.start() bunServer = server - isRunning = true - lastError = nil - - // Reset crash counter on successful start - consecutiveCrashes = 0 + // Check server state to ensure it's actually running + if server.getState() == .running { + isRunning = true + lastError = nil + // Reset crash counter on successful start + consecutiveCrashes = 0 + } else { + logger.error("Server started but not in running state") + isRunning = false + bunServer = nil + lastError = BunServerError.processFailedToStart + return + } logger.info("Started server on port \(self.port)") @@ -161,14 +191,9 @@ class ServerManager { logger.error("Failed to start server: \(error.localizedDescription)") lastError = error - // Check if server is actually running despite the error - if let server = bunServer, server.isRunning { - logger.warning("Server reported as running despite startup error, syncing state") - isRunning = true - } else { - isRunning = false - bunServer = nil - } + // Always clean up on error + isRunning = false + bunServer = nil } } @@ -202,8 +227,29 @@ class ServerManager { await stop() - // Add a brief delay to ensure the port is released by the OS - try? await Task.sleep(for: .milliseconds(500)) + // Wait with exponential backoff for port to be available + let portNumber = Int(self.port) ?? 4_020 + var retries = 0 + let maxRetries = 5 + + while retries < maxRetries { + let delay = 1.0 * pow(2.0, Double(retries)) // 1, 2, 4, 8, 16 seconds + logger.info("Waiting \(delay) seconds for port to be released (attempt \(retries + 1)/\(maxRetries))...") + try? await Task.sleep(for: .seconds(delay)) + + if await PortConflictResolver.shared.canBindToPort(portNumber) { + logger.info("Port \(portNumber) is now available") + break + } + + retries += 1 + } + + if retries == maxRetries { + logger.error("Port \(portNumber) still unavailable after \(maxRetries) attempts") + lastError = PortConflictError.portStillInUse(port: portNumber) + return + } await start() } @@ -268,7 +314,12 @@ class ServerManager { /// Handle server crash with automatic restart logic private func handleServerCrash(exitCode: Int32) async { - logger.error("Server crashed with exit code: \(exitCode)") + // Special handling for exit code 9 (port in use) + if exitCode == 9 { + logger.error("Server failed to start: Port \(self.port) is already in use") + } else { + logger.error("Server crashed with exit code: \(exitCode)") + } // Update state immediately isRunning = false @@ -314,14 +365,69 @@ class ServerManager { return } - // Calculate backoff delay - let baseDelay: TimeInterval = 2.0 - let delay = baseDelay * pow(2.0, Double(consecutiveCrashes - 1)) + // Special handling for exit code 9 (port already in use) + if exitCode == 9 { + logger.info("Port \(self.port) is in use, checking for conflicts...") - logger.info("Will restart server after \(delay) seconds (attempt \(self.consecutiveCrashes) of \(maxRetries))") + // Check for port conflicts + if let conflict = await PortConflictResolver.shared.detectConflict(on: Int(self.port) ?? 4_020) { + logger.warning("Found port conflict: \(conflict.process.name) (PID: \(conflict.process.pid))") - // Wait with exponential backoff - try? await Task.sleep(for: .seconds(delay)) + // Try to resolve the conflict + if case .killOurInstance(let pid, let processName) = conflict.suggestedAction { + logger.info("Attempting to kill conflicting process: \(processName) (PID: \(pid))") + + do { + try await PortConflictResolver.shared.resolveConflict(conflict) + // resolveConflict now includes exponential backoff + } catch { + logger.error("Failed to resolve port conflict: \(error)") + lastError = PortConflictError.failedToKillProcess(pid: pid) + return + } + } else { + logger.error("Cannot auto-resolve port conflict") + return + } + } else { + // Port might still be in TIME_WAIT state, wait with backoff + logger.info("Port may be in TIME_WAIT state, checking availability...") + + let portNumber = Int(self.port) ?? 4_020 + var retries = 0 + let maxRetries = 5 + + while retries < maxRetries { + let delay = 2.0 * pow(2.0, Double(retries)) // 2, 4, 8, 16, 32 seconds + logger.info("Waiting \(delay) seconds for port to clear (attempt \(retries + 1)/\(maxRetries))...") + try? await Task.sleep(for: .seconds(delay)) + + if await PortConflictResolver.shared.canBindToPort(portNumber) { + logger.info("Port \(portNumber) is now available") + break + } + + retries += 1 + } + + if retries == maxRetries { + logger.error("Port \(portNumber) still in TIME_WAIT after \(maxRetries) attempts") + lastError = PortConflictError.portStillInUse(port: portNumber) + return + } + } + } else { + // Normal crash handling with exponential backoff + let baseDelay: TimeInterval = 2.0 + let delay = baseDelay * pow(2.0, Double(consecutiveCrashes - 1)) + + logger + .info("Will restart server after \(delay) seconds (attempt \(self.consecutiveCrashes) of \(maxRetries))" + ) + + // Wait with exponential backoff + try? await Task.sleep(for: .seconds(delay)) + } // Only restart if we haven't been manually stopped in the meantime guard bunServer == nil else { @@ -342,16 +448,19 @@ class ServerManager { guard let server = bunServer else { continue } - // Check if the server process is still running + // Check server state and process health + let state = server.getState() let health = await server.checkHealth() - if !health && isRunning { + if (!health || state == .crashed) && isRunning { logger.warning("Server health check failed but state shows running, syncing state") isRunning = false bunServer = nil - // Trigger restart - await handleServerCrash(exitCode: -1) + // Only trigger restart if not already handling a crash + if !isHandlingCrash { + await handleServerCrash(exitCode: -1) + } } } } diff --git a/mac/VibeTunnel/Core/Utilities/PortConflictResolver.swift b/mac/VibeTunnel/Core/Utilities/PortConflictResolver.swift index b5fa9ceb..c87bd289 100644 --- a/mac/VibeTunnel/Core/Utilities/PortConflictResolver.swift +++ b/mac/VibeTunnel/Core/Utilities/PortConflictResolver.swift @@ -1,6 +1,6 @@ import Foundation import OSLog -import Darwin +import Darwin.C /// Information about a process that's using a port struct ProcessDetails {