Add scripts to work in parallel and aggregate results separately

This commit is contained in:
Sami Samhuri 2025-06-24 21:55:21 -04:00
parent c9fbfc1b67
commit 554488d1c4
No known key found for this signature in database
4 changed files with 458 additions and 2 deletions

106
aggregate_results.rb Executable file
View file

@ -0,0 +1,106 @@
#!/usr/bin/env ruby
# Aggregates every per-run CSV under results/<model>/<size>/<prompt>.csv into a
# single results/master.csv, then derives summary statistics and writes them
# to results/summary.json.
require 'csv'
require 'json'
require 'fileutils'
require 'time' # Time#iso8601 is provided by the 'time' stdlib extension, not core Time

puts "🔄 Aggregating results into master.csv..."

# Create master CSV
master_csv = CSV.open('results/master.csv', 'w')
master_csv << %w[model image_size prompt_name image_filename tags raw_output timestamp success]

# Stats tracking
total_rows = 0
missing_files = []

# Walk through all result directories (results/<model>/<size>/<prompt>.csv)
Dir.glob('results/*/*/*').select { |f| f.end_with?('.csv') }.sort.each do |csv_file|
  # Skip the master.csv if it exists (it lives at results/, but guard anyway)
  next if csv_file.include?('master.csv')

  # Extract metadata from path; directory names use '-' where model tags use ':'
  parts = csv_file.split('/')
  model = parts[-3].gsub('-', ':')
  size = parts[-2].to_i
  prompt_name = File.basename(parts[-1], '.csv')

  print "\r Processing: #{model}/#{size}/#{prompt_name}..."
  begin
    # Copy every row into the master file, prefixed with its run metadata
    row_count = 0
    CSV.foreach(csv_file, headers: true) do |row|
      master_csv << [
        model,
        size,
        prompt_name,
        row['image_filename'],
        row['tags'],
        row['raw_output'],
        row['timestamp'],
        row['success']
      ]
      row_count += 1
      total_rows += 1
    end
    print "\r#{model}/#{size}/#{prompt_name}: #{row_count} rows"
    puts
  rescue => e
    missing_files << csv_file
    puts "\r ❌ Error reading #{csv_file}: #{e.message}"
  end
end
master_csv.close

puts "\n" + "=" * 60
puts "AGGREGATION COMPLETE"
puts "=" * 60
puts "Total rows: #{total_rows}"
puts "Output: results/master.csv"
if missing_files.any?
  puts "\n⚠️ Failed to read #{missing_files.length} files:"
  missing_files.each { |f| puts "#{f}" }
end

# Generate summary statistics
puts "\n📊 Generating summary statistics..."
summary = {
  total_analyses: total_rows,
  by_model: Hash.new(0),
  by_size: Hash.new(0),
  by_prompt: Hash.new(0),
  success_rate: 0,
  timestamp: Time.now.iso8601
}
success_count = 0
CSV.foreach('results/master.csv', headers: true) do |row|
  summary[:by_model][row['model']] += 1
  summary[:by_size][row['image_size']] += 1
  summary[:by_prompt][row['prompt_name']] += 1
  success_count += 1 if row['success'] == 'true'
end
# Guard the empty case: NaN.round(2) raises FloatDomainError when total_rows is 0
summary[:success_rate] =
  total_rows.zero? ? 0 : (success_count.to_f / total_rows * 100).round(2)
File.write('results/summary.json', JSON.pretty_generate(summary))

puts "\nSummary by model:"
summary[:by_model].each do |model, count|
  puts "#{model}: #{count} analyses"
end
puts "\nSummary by size:"
summary[:by_size].sort_by { |size, _| size.to_i }.each do |size, count|
  puts "#{size}px: #{count} analyses"
end
puts "\nSuccess rate: #{summary[:success_rate]}%"
puts "\n📄 Full summary saved to: results/summary.json"

View file

@ -16,8 +16,8 @@ class TagExtractor
'qwen2.5vl:3b' => 2,
'moondream:1.8b' => 8, # doesn't help a lot but doesn't hurt either
'llava:7b' => 2,
'llava:13b' => 2,
'llama3.2-vision:11b' => 1, # super slow, 3+ minutes for 8 photos
# 'llava:13b' => 2,
# 'llama3.2-vision:11b' => 1, # super slow, 3+ minutes for 8 photos
'llava-phi3:3.8b' => 4
}
VALID_EXTENSIONS = %w[.jpg .jpeg .png .gif .bmp .tiff .tif].freeze

201
extract_tags_worker.rb Executable file
View file

@ -0,0 +1,201 @@
#!/usr/bin/env ruby
# Worker that tags the images in photo-<size>/ using one specific
# (model, size, prompt) combination against a local Ollama server, appending
# one CSV row per image to results/<model>/<size>/<prompt>.csv.
require 'json'
require 'base64'
require 'net/http'
require 'uri'
require 'fileutils'
require 'csv'
require 'optparse'
require 'time'
require 'set' # Set is only autoloaded from Ruby 3.2 on; require it explicitly

# Simplified worker that processes a specific model/size/prompt combination
class TagExtractorWorker
  OLLAMA_URL = 'http://localhost:11434/api/generate'

  # model:   Ollama model tag (e.g. "llava:7b")
  # size:    pixel size; images are read from photo-<size>/
  # prompt:  prompt name; its text is loaded from prompts/<prompt>.txt
  # timeout: HTTP read timeout in seconds for each Ollama request
  def initialize(model:, size:, prompt:, timeout: 120)
    @model = model
    @size = size
    @prompt_name = prompt
    @prompt_file = "prompts/#{prompt}.txt"
    @timeout = timeout
    unless File.exist?(@prompt_file)
      raise "Prompt file not found: #{@prompt_file}"
    end
    @prompt_content = File.read(@prompt_file).strip
  end

  # Processes every image for this combination. Resumes from an existing CSV
  # (skipping filenames already recorded) and writes run metadata to run.json.
  def run
    output_dir = "results/#{@model.gsub(':', '-')}/#{@size}"
    FileUtils.mkdir_p(output_dir)
    csv_path = File.join(output_dir, "#{@prompt_name}.csv")

    # Check if already processed
    if File.exist?(csv_path)
      existing_count = CSV.read(csv_path).length - 1 # Minus header
      total_images = Dir["photo-#{@size}/*.{jpg,jpeg,png}"].length
      if existing_count >= total_images
        puts "✓ Already complete: #{@model}/#{@size}/#{@prompt_name} (#{existing_count} images)"
        return
      else
        puts "⚠️ Resuming: #{@model}/#{@size}/#{@prompt_name} (#{existing_count}/#{total_images} done)"
      end
    end

    puts "🚀 Processing: #{@model} / size=#{@size} / prompt=#{@prompt_name}"

    # Collect images
    images = Dir["photo-#{@size}/*"].select { |f| f.match?(/\.(jpg|jpeg|png)$/i) }.sort
    if images.empty?
      puts "❌ No images found in photo-#{@size}/"
      return
    end

    # Load existing results to avoid reprocessing
    processed = Set.new
    if File.exist?(csv_path)
      CSV.foreach(csv_path, headers: true) do |row|
        processed << row['image_filename']
      end
    end

    # Open CSV for appending; write the header only when creating the file
    is_new = !File.exist?(csv_path)
    csv = CSV.open(csv_path, 'a')
    csv << %w[image_filename tags raw_output timestamp success] if is_new

    begin
      # Process images
      images.each_with_index do |image_path, idx|
        filename = File.basename(image_path)
        next if processed.include?(filename)

        print "\r Progress: #{idx + 1}/#{images.length} - #{filename}"

        # Process image
        result = process_image(image_path)

        # Save result; newlines are flattened so each record stays on one line
        csv << [
          filename,
          result[:tags],
          result[:raw_output].gsub("\n", " "),
          Time.now.iso8601,
          result[:success]
        ]
        csv.flush
      end
    ensure
      # Always release the file handle, even if processing raises mid-run
      csv.close
    end

    puts "\n✅ Complete: #{images.length} images processed"

    # Save metadata
    metadata_path = File.join(output_dir, 'run.json')
    File.write(metadata_path, JSON.pretty_generate({
      model: @model,
      image_size: @size,
      prompt_name: @prompt_name,
      timestamp: Time.now.iso8601,
      images_processed: images.length
    }))
  end

  private

  # Sends one image to Ollama and returns { success:, tags:, raw_output: }.
  # Never raises: HTTP failures, timeouts and other errors become
  # success: false with the reason in raw_output.
  def process_image(image_path)
    # Read and encode image
    image_data = File.read(image_path, mode: 'rb')
    image_base64 = Base64.strict_encode64(image_data)

    # Query Ollama
    uri = URI.parse(OLLAMA_URL)
    http = Net::HTTP.new(uri.host, uri.port)
    http.read_timeout = @timeout
    request = Net::HTTP::Post.new(uri.path)
    request['Content-Type'] = 'application/json'
    request.body = {
      model: @model,
      prompt: @prompt_content,
      images: [image_base64],
      stream: false,
      options: {
        temperature: 0.1, # low temperature for stable, repeatable tags
        num_predict: 500
      }
    }.to_json

    response = http.request(request)
    if response.code == '200'
      data = JSON.parse(response.body)
      raw_output = data['response']
      tags = extract_tags(raw_output)
      { success: true, tags: tags, raw_output: raw_output }
    else
      { success: false, tags: '', raw_output: "HTTP #{response.code}: #{response.message}" }
    end
  rescue Net::ReadTimeout
    { success: false, tags: '', raw_output: "Timeout after #{@timeout}s" }
  rescue => e
    { success: false, tags: '', raw_output: "Error: #{e.message}" }
  end

  # Pulls a comma-separated tag list out of the model's free-form output:
  # first line containing a comma (else the whole text), minus common prefixes
  # ("tags:", "keywords:", "output:") and JSON-ish punctuation.
  def extract_tags(raw_output)
    cleaned = raw_output.strip
    lines = cleaned.split("\n")
    tag_line = lines.find { |line| line.include?(',') } || cleaned
    tag_line
      .gsub(/^(tags:|keywords:|output:)/i, '')
      .gsub(/["\[\]{}]/, '')
      .strip
  end
end
# CLI
# CLI entry point: parse the flags and hand them straight to TagExtractorWorker.
if __FILE__ == $0
  options = {}
  parser = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} -m MODEL -s SIZE -p PROMPT [options]"
    opts.on("-m", "--model MODEL", "Model to use (required)") { |m| options[:model] = m }
    opts.on("-s", "--size SIZE", Integer, "Image size (required)") { |s| options[:size] = s }
    opts.on("-p", "--prompt PROMPT", "Prompt name without .txt (required)") { |p| options[:prompt] = p }
    opts.on("-t", "--timeout SECONDS", Integer, "Request timeout (default: 120)") { |t| options[:timeout] = t }
    opts.on("-h", "--help", "Show this help") do
      puts opts
      exit
    end
  end
  parser.parse!

  # All three of model/size/prompt are mandatory; bail out if any is absent.
  if %i[model size prompt].any? { |key| options[key].nil? }
    puts "Error: Missing required arguments"
    puts "Run with -h for help"
    exit 1
  end

  TagExtractorWorker.new(**options).run
end

149
run_parallel_extraction.rb Executable file
View file

@ -0,0 +1,149 @@
#!/usr/bin/env ruby
require 'csv'
require 'optparse'
require 'fileutils'
# Get all combinations of model, size, and prompt
# Builds the full cartesian product of models x image sizes x prompt names.
# Sizes are discovered from photo-<N>/ directories and prompts from
# prompts/*.txt; each job is a { model:, size:, prompt: } hash.
def get_all_jobs
  models = [
    'qwen2.5vl:3b',
    'moondream:1.8b',
    'llava:7b',
    'llava:13b',
    # 'llama3.2-vision:11b',
    'llava-phi3:3.8b'
  ]

  sizes = Dir.glob('photo-*')
             .select { |d| File.directory?(d) }
             .map { |d| d.match(/photo-(\d+)/)[1].to_i }
             .sort

  prompts = Dir.glob('prompts/*.txt')
               .map { |f| File.basename(f, '.txt') }
               .sort

  # product preserves the model -> size -> prompt nesting order
  models.product(sizes, prompts).map do |model, size, prompt|
    { model: model, size: size, prompt: prompt }
  end
end
# Check if a job is already complete
# A job is complete when its output CSV exists and already holds at least one
# data row per image in the photo-<size>/ directory.
def job_complete?(job)
  dir_model = job[:model].gsub(':', '-')
  csv_path = "results/#{dir_model}/#{job[:size]}/#{job[:prompt]}.csv"
  return false unless File.exist?(csv_path)

  rows_done = CSV.read(csv_path).length - 1 # exclude the header row
  image_total = Dir["photo-#{job[:size]}/*.{jpg,jpeg,png}"].length
  rows_done >= image_total
end
# Main execution
options = {
parallel: 2,
models: nil,
skip_complete: true
}
OptionParser.new do |opts|
opts.banner = "Usage: #{$0} [options]"
opts.on("-j", "--parallel NUM", Integer, "Number of parallel workers (default: 2)") do |n|
options[:parallel] = n
end
opts.on("-m", "--models MODELS", "Comma-separated list of models to process") do |m|
options[:models] = m.split(',').map(&:strip)
end
opts.on("--no-skip", "Don't skip completed jobs") do
options[:skip_complete] = false
end
opts.on("-h", "--help", "Show this help") do
puts opts
exit
end
end.parse!
# Get all jobs
all_jobs = get_all_jobs
# Filter by models if specified
if options[:models]
all_jobs.select! { |job| options[:models].include?(job[:model]) }
end
# Filter completed jobs
if options[:skip_complete]
remaining_jobs = all_jobs.reject { |job| job_complete?(job) }
completed = all_jobs.length - remaining_jobs.length
if completed > 0
puts "✓ Skipping #{completed} completed jobs"
end
all_jobs = remaining_jobs
end
if all_jobs.empty?
puts "✅ All jobs complete!"
exit
end
puts "📊 Jobs to process: #{all_jobs.length}"
puts "🚀 Running with #{options[:parallel]} parallel workers"
puts
# Group jobs by model to minimize model switching
jobs_by_model = all_jobs.group_by { |job| job[:model] }
# Process each model's jobs
jobs_by_model.each do |model, model_jobs|
puts "\n" + "=" * 60
puts "Processing #{model} (#{model_jobs.length} jobs)"
puts "=" * 60
# Ensure model is loaded
unless `ollama list`.include?(model.split(':').first)
puts "📦 Pulling #{model}..."
system("ollama pull #{model}")
end
# Process jobs in batches
model_jobs.each_slice(options[:parallel]) do |batch|
threads = batch.map do |job|
Thread.new do
cmd = [
"./extract_tags_worker.rb",
"-m '#{job[:model]}'",
"-s #{job[:size]}",
"-p '#{job[:prompt]}'"
].join(" ")
system(cmd)
end
end
# Wait for batch to complete
threads.each(&:join)
end
# Unload model to free memory
puts "🧹 Unloading #{model}..."
system("ollama stop #{model}", out: File::NULL, err: File::NULL)
end
puts "\n✅ All jobs complete!"
# Offer to aggregate results
puts "\nRun ./aggregate_results.rb to create the master CSV"