Added real streaming endpoint

This commit is contained in:
Armin Ronacher 2025-06-16 14:29:26 +02:00
parent 0a5f947b98
commit 1beb938646

View file

@ -10,7 +10,7 @@ use std::thread;
use std::time::SystemTime;
use uuid::Uuid;
use crate::http_server::{HttpServer, Method, Response, StatusCode};
use crate::http_server::{HttpRequest, HttpServer, Method, Response, StatusCode};
use crate::sessions;
// Types matching the TypeScript interface
@ -102,7 +102,9 @@ pub fn start_server(bind_address: &str, control_path: PathBuf) -> Result<()> {
(&Method::GET, path)
if path.starts_with("/api/sessions/") && path.ends_with("/stream") =>
{
handle_session_stream(&control_path, &path)
// Handle streaming differently - bypass normal response handling
handle_session_stream_direct(&control_path, &path, &mut req);
return; // Skip the normal response handling
}
(&Method::GET, path)
if path.starts_with("/api/sessions/") && path.ends_with("/snapshot") =>
@ -645,3 +647,254 @@ fn handle_session_stream(control_path: &PathBuf, path: &str) -> Response<String>
json_response(StatusCode::BAD_REQUEST, &error)
}
}
fn handle_session_stream_direct(control_path: &PathBuf, path: &str, req: &mut HttpRequest) {
let session_id = match extract_session_id(path) {
Some(id) => id,
None => {
let error = ApiResponse {
success: None,
message: None,
error: Some("Invalid session ID".to_string()),
session_id: None,
};
let response = json_response(StatusCode::BAD_REQUEST, &error);
let _ = req.respond(response_to_bytes(response));
return;
}
};
// First check if the session exists
let sessions = match sessions::list_sessions(control_path) {
Ok(sessions) => sessions,
Err(e) => {
let error = ApiResponse {
success: None,
message: None,
error: Some(format!("Failed to list sessions: {}", e)),
session_id: None,
};
let response = json_response(StatusCode::INTERNAL_SERVER_ERROR, &error);
let _ = req.respond(response_to_bytes(response));
return;
}
};
let session_entry = match sessions.get(&session_id) {
Some(entry) => entry,
None => {
let error = ApiResponse {
success: None,
message: None,
error: Some("Session not found".to_string()),
session_id: None,
};
let response = json_response(StatusCode::NOT_FOUND, &error);
let _ = req.respond(response_to_bytes(response));
return;
}
};
let stream_out_path = &session_entry.stream_out;
// Check if the stream-out file exists
if !std::path::Path::new(stream_out_path).exists() {
let error = ApiResponse {
success: None,
message: None,
error: Some("Session stream file not found".to_string()),
session_id: None,
};
let response = json_response(StatusCode::NOT_FOUND, &error);
let _ = req.respond(response_to_bytes(response));
return;
}
println!("Starting streaming SSE for session {}", session_id);
// Send SSE headers
let headers = "HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Access-Control-Allow-Origin: *
";
if let Err(e) = req.respond(headers.as_bytes()) {
println!("Failed to send SSE headers: {}", e);
return;
}
let start_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
// First, send existing content from the file
if let Ok(content) = fs::read_to_string(stream_out_path) {
let mut header_sent = false;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
// Check if this is a header line
if parsed.get("version").is_some()
&& parsed.get("width").is_some()
&& parsed.get("height").is_some()
{
let data = format!("data: {}
", line);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send header data: {}", e);
return;
}
header_sent = true;
}
// Check if this is an event line [timestamp, type, data]
else if parsed.as_array().map(|arr| arr.len() >= 3).unwrap_or(false) {
// Convert to instant event for immediate playback
if let Some(arr) = parsed.as_array() {
let instant_event = serde_json::json!([0, arr[1], arr[2]]);
let data = format!("data: {}
", instant_event);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send event data: {}", e);
return;
}
}
}
}
}
// Send default header if none found
if !header_sent {
let default_header = serde_json::json!({
"version": 2,
"width": 80,
"height": 24,
"timestamp": start_time as u64,
"env": { "TERM": "xterm-256color" }
});
let data = format!("data: {}
", default_header);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send default header: {}", e);
return;
}
}
} else {
// Send default header if file can't be read
let default_header = serde_json::json!({
"version": 2,
"width": 80,
"height": 24,
"timestamp": start_time as u64,
"env": { "TERM": "xterm-256color" }
});
let data = format!("data: {}
", default_header);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send fallback header: {}", e);
return;
}
}
// Now use tail -f to stream new content with immediate flushing
let stream_path_clone = stream_out_path.clone();
match Command::new("tail")
.args(&["-f", &stream_path_clone])
.stdout(Stdio::piped())
.spawn()
{
Ok(mut child) => {
if let Some(stdout) = child.stdout.take() {
let reader = BufReader::new(stdout);
// Stream lines immediately as they come in
for line in reader.lines() {
match line {
Ok(line) => {
if line.trim().is_empty() {
continue;
}
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&line) {
// Skip headers in tail output
if parsed.get("version").is_some() && parsed.get("width").is_some() {
continue;
}
// Process event lines
if let Some(arr) = parsed.as_array() {
if arr.len() >= 3 {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
let real_time_event = serde_json::json!([
current_time - start_time,
arr[1],
arr[2]
]);
let data = format!("data: {}
", real_time_event);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send streaming data: {}", e);
break;
}
}
}
} else {
// Handle non-JSON as raw output
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
let cast_event = serde_json::json!([current_time - start_time, "o", line]);
let data = format!("data: {}
", cast_event);
if let Err(e) = req.respond(data.as_bytes()) {
println!("Failed to send raw streaming data: {}", e);
break;
}
}
}
Err(e) => {
println!("Error reading from tail: {}", e);
break;
}
}
}
// Clean up
let _ = child.kill();
}
}
Err(e) => {
println!("Failed to start tail command: {}", e);
let error_data = format!("data: {{\"type\":\"error\",\"message\":\"Failed to start streaming: {}\"}}
", e);
let _ = req.respond(error_data.as_bytes());
}
}
// Send end marker
let end_data = "data: {\"type\":\"end\"}
";
let _ = req.respond(end_data.as_bytes());
println!("Ended streaming SSE for session {}", session_id);
}