Collect Metrics from Output Files (YOLO & Others)

Some frameworks like YOLOv8 write metrics to output files instead of printing them. Use a file watcher to monitor these files and stream their contents to Valohai metadata automatically.

This pattern works for any framework that writes metrics to CSV, JSON, or text files during training.


The Problem

YOLOv8, like many similar frameworks, writes training metrics to CSV files in its run directory:

runs/train/exp/
├── results.csv          # Training metrics per epoch
├── weights/
│   ├── best.pt          # Best model
│   └── last.pt          # Latest model
└── ...

Challenge: These metrics aren't printed as JSON, so Valohai doesn't capture them automatically.

Solution: Use a file watcher script that monitors output files and prints their contents as JSON.
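
For reference, results.csv is a plain comma-separated file with one row per epoch. A trimmed, illustrative sample (exact column names vary between YOLO versions):

epoch, train/box_loss, train/cls_loss, metrics/mAP50(B), metrics/mAP50-95(B)
1, 1.4231, 1.8810, 0.412, 0.267
2, 1.2650, 1.5423, 0.488, 0.315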


Quick Example

valohai.yaml

- step:
    name: train-yolov8
    image: ultralytics/yolov8:latest
    command:
      - git clone https://github.com/ultralytics/yolov8.git
      - tar -xf /valohai/inputs/dataset/coco128.tar.xz
      - pip install watchdog
      - nohup python ./scripts/valohai_watch.py &  # Start watcher in background
      - python yolov8/train.py --data coco128.yaml {parameters}
    inputs:
      - name: dataset
        default: https://github.com/ultralytics/yolov8/releases/download/v1.0/coco128.tar.xz
    parameters:
      - name: epochs
        type: integer
        default: 10
    environment: aws-eu-west-1-g4dn-xlarge
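
Note: {parameters} expands every parameter defined on the step into command-line flags, so here it becomes --epochs=10; there's no need to spell out --epochs in the command yourself. Also make sure the framework writes its run directory under /valohai/outputs so the watcher can see the files; for YOLO this is typically controlled by the --project argument.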

scripts/valohai_watch.py

import os
import time
import json
import csv
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

VH_OUTPUTS_DIR = os.getenv("VH_OUTPUTS_DIR", "/valohai/outputs")

class ValohaiMetricsWatcher(PatternMatchingEventHandler):
    """Watch for CSV files and log their contents to Valohai"""
    
    def on_modified(self, event):
        if event.src_path.endswith(".csv"):
            self.log_csv_metrics(event.src_path)
    
    def log_csv_metrics(self, csv_path):
        """Read CSV and log the latest row as metrics"""
        try:
            with open(csv_path, "r") as file:
                reader = csv.DictReader(file)
                rows = list(reader)
                
                if rows:
                    # Get the latest row
                    latest = rows[-1]
                    
                    # Convert to metrics
                    metadata = {}
                    for key, value in latest.items():
                        key = key.strip()
                        value = value.strip()
                        
                        # Try integer first, then float (handles scientific
                        # notation like 3e-05), otherwise keep as string
                        try:
                            metadata[key] = int(value)
                        except ValueError:
                            try:
                                metadata[key] = float(value)
                            except ValueError:
                                metadata[key] = value
                    
                    # Log to Valohai
                    print(json.dumps(metadata))
        
        except Exception as e:
            print(f"Error reading CSV {csv_path}: {e}")

if __name__ == "__main__":
    event_handler = ValohaiMetricsWatcher(patterns=["*.csv"])
    observer = Observer()
    observer.schedule(event_handler, path=VH_OUTPUTS_DIR, recursive=True)
    observer.start()
    
    print(f"Watching {VH_OUTPUTS_DIR} for metric files...")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()
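
The watcher loops forever and never exits on its own, which is why the YAML above launches it with nohup ... & and then moves straight on to training. When the execution's commands finish, the container shuts down and takes the background watcher with it, so there's nothing to clean up manually.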

How It Works

  1. Start watcher in background: nohup python ./scripts/valohai_watch.py & runs the watcher script as a background process

  2. Monitor output directory: The watcher uses watchdog to detect file changes in /valohai/outputs/

  3. Parse and log: When a CSV is modified, the watcher reads the latest row and prints it as JSON

  4. Valohai captures: Valohai sees the printed JSON and records it as metadata
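
Concretely, each line the watcher prints is one flat JSON object, and Valohai turns every key into a metadata series. A printed line might look like this (values illustrative):

{"epoch": 3, "train/box_loss": 1.265, "metrics/mAP50(B)": 0.488}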


Complete Working Example

Here's a full implementation with model aliasing:

scripts/valohai_watch.py (Complete)

import os
import time
import json
import csv
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

VH_OUTPUTS_DIR = os.getenv("VH_OUTPUTS_DIR", "/valohai/outputs")

class ValohaiHandler(PatternMatchingEventHandler):
    """Watch for CSV files and model checkpoints"""
    
    def on_modified(self, event):
        if event.src_path.endswith(".csv"):
            self.log_csv_metrics(event.src_path)
    
    def on_created(self, event):
        if event.src_path.endswith((".pt", ".onnx")):
            self.create_model_alias(event.src_path)
    
    def log_csv_metrics(self, csv_path):
        """Read CSV and log the latest row as metrics"""
        try:
            with open(csv_path, "r") as file:
                data = list(csv.reader(file, delimiter=","))
                
                if len(data) < 2:  # Need header + at least one row
                    return
                
                keys = [k.strip() for k in data[0]]
                latest_values = [v.strip() for v in data[-1]]
                
                metadata = {}
                for key, value in zip(keys, latest_values):
                    # Try integer first, then float, otherwise keep as string
                    try:
                        metadata[key] = int(value)
                    except ValueError:
                        try:
                            metadata[key] = float(value)
                        except ValueError:
                            metadata[key] = value
                
                # Log to Valohai
                print(json.dumps(metadata))
        
        except Exception as e:
            print(f"Error reading CSV {csv_path}: {e}")
    
    def create_model_alias(self, model_path):
        """Create Valohai aliases for model files"""
        try:
            # Wait for file to be fully written
            time.sleep(1)
            
            # Determine the alias from the filename, not the full path
            filename = os.path.basename(model_path)
            if "best" in filename:
                alias = "current-best-model"
                if filename.endswith(".onnx"):
                    alias = "production-model"
            else:
                return  # Only alias best models
            
            # Create metadata for the model
            metadata = {
                "valohai.alias": alias
            }
            
            model_name = filename
            model_dir = os.path.dirname(model_path)
            
            # Save metadata file alongside model
            metadata_path = os.path.join(model_dir, f"{model_name}.metadata.json")
            with open(metadata_path, "w") as outfile:
                json.dump(metadata, outfile)
            
            print(f"Created alias '{alias}' for {model_name}")
        
        except Exception as e:
            print(f"Error creating alias for {model_path}: {e}")

if __name__ == "__main__":
    event_handler = ValohaiHandler(patterns=["*.csv", "*.pt", "*.onnx"])
    observer = Observer()
    observer.schedule(event_handler, path=VH_OUTPUTS_DIR, recursive=True)
    observer.start()
    
    print(f"Watching {VH_OUTPUTS_DIR} for CSV and model files...")
    print("Patterns: *.csv, *.pt, *.onnx")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping watcher...")
        observer.stop()
    
    observer.join()
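
With this handler in place, saving weights/best.pt drops a sidecar file next to the model. Valohai picks up *.metadata.json files saved alongside outputs and applies their properties to the uploaded file, so the alias is attached automatically. The generated weights/best.pt.metadata.json contains:

{"valohai.alias": "current-best-model"}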

Adapting for Other File Formats

JSON Files

def on_modified(self, event):
    if event.src_path.endswith(".json"):
        self.log_json_metrics(event.src_path)

def log_json_metrics(self, json_path):
    """Read JSON file and log its contents"""
    try:
        with open(json_path, "r") as file:
            data = json.load(file)
            
            # Log the entire JSON
            print(json.dumps(data))
    
    except Exception as e:
        print(f"Error reading JSON {json_path}: {e}")
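
For example, given a hypothetical metrics.json written by the framework:

{"epoch": 5, "val_loss": 0.231, "accuracy": 0.914}

the handler prints the object verbatim. Valohai charts work best with flat key-value pairs, so flatten nested objects before logging if you want each value as its own metric.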

Text Files with Key-Value Pairs

def on_modified(self, event):
    if event.src_path.endswith(".txt") and "metrics" in event.src_path:
        self.log_text_metrics(event.src_path)

def log_text_metrics(self, text_path):
    """Parse text file with 'key: value' format"""
    try:
        metadata = {}
        with open(text_path, "r") as file:
            for line in file:
                if ":" in line:
                    key, value = line.split(":", 1)
                    key = key.strip()
                    value = value.strip()
                    
                    # Try integer first, then float, otherwise keep as string
                    try:
                        value = int(value)
                    except ValueError:
                        try:
                            value = float(value)
                        except ValueError:
                            pass  # Keep as string
                    
                    metadata[key] = value
        
        if metadata:
            print(json.dumps(metadata))
    
    except Exception as e:
        print(f"Error reading text {text_path}: {e}")

TensorBoard Event Files

For TensorBoard logs, use the tensorboard library:

from tensorboard.backend.event_processing import event_accumulator

def log_tensorboard_metrics(self, event_path):
    """Parse TensorBoard event file"""
    try:
        ea = event_accumulator.EventAccumulator(event_path)
        ea.Reload()
        
        # Get all scalar tags
        tags = ea.Tags().get('scalars', [])
        
        for tag in tags:
            events = ea.Scalars(tag)
            if events:
                latest = events[-1]
                metadata = {
                    "step": latest.step,
                    tag: latest.value
                }
                print(json.dumps(metadata))
    
    except Exception as e:
        print(f"Error reading TensorBoard {event_path}: {e}")
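
TensorBoard event files are named like events.out.tfevents.<timestamp>.<hostname> rather than carrying a fixed extension, so match them with a wildcard pattern instead (hypothetical wiring, reusing the handler class above):

event_handler = ValohaiHandler(patterns=["*.csv", "*tfevents*"])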

Best Practices

Start Watcher Before Training

Always start the watcher before your training script:

command:
  - pip install watchdog valohai-utils
  - nohup python ./scripts/valohai_watch.py &  # Start watcher first
  - python train.py  # Then start training

Use nohup for Background Execution

nohup keeps the watcher alive even if its controlling shell exits, and the trailing & runs it in the background so the next command can start immediately:

nohup python ./scripts/valohai_watch.py &

Handle Partial Writes

Files might be written incrementally. Add a small delay:

def on_modified(self, event):
    # Wait a moment for file write to complete
    time.sleep(0.5)
    
    if event.src_path.endswith(".csv"):
        self.log_csv_metrics(event.src_path)
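
Keep the delay short: watchdog runs handlers on the observer thread, so sleeping inside a handler holds up delivery of every other event on that watch.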

Filter by Filename Pattern

Only watch specific files to avoid unnecessary processing:

# Watch only files matching specific patterns
event_handler = ValohaiHandler(
    patterns=["*/results.csv", "*/best.pt", "*/best.onnx"],
    ignore_patterns=["*/tmp/*", "*/cache/*"]
)
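
Patterns are fnmatch-style wildcards matched against the full event path, which is why the examples above use a */ prefix to anchor the filename to a directory boundary.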

Error Handling

Always wrap file operations in try-except:

def log_csv_metrics(self, csv_path):
    try:
        with open(csv_path, "r") as file:
            # Processing...
            pass
    except FileNotFoundError:
        print(f"File not found: {csv_path}")
    except PermissionError:
        print(f"Permission denied: {csv_path}")
    except Exception as e:
        print(f"Unexpected error reading {csv_path}: {e}")

Common Issues

Watcher Not Starting

Symptom: No metrics logged, watcher script never runs

Causes & Fixes:

  • Missing dependency → Install watchdog: pip install watchdog

  • Script not in correct location → Check path in command

  • Background process killed → Use nohup and &

Debug:

command:
  - python ./scripts/valohai_watch.py &  # Test without nohup
  - sleep 5  # Give watcher time to start
  - python train.py

Metrics Logged Multiple Times

Symptom: Same epoch metrics appear repeatedly

Cause: CSV file modified multiple times per epoch

Solution: Track last processed row:

class ValohaiMetricsWatcher(PatternMatchingEventHandler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.last_row_count = {}
    
    def log_csv_metrics(self, csv_path):
        with open(csv_path, "r") as file:
            reader = csv.DictReader(file)
            rows = list(reader)
            
            # Only log if new rows added
            current_count = len(rows)
            last_count = self.last_row_count.get(csv_path, 0)
            
            if current_count > last_count:
                latest = rows[-1]
                # Log metrics...
                self.last_row_count[csv_path] = current_count
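
This also copes with frameworks that rewrite the entire CSV on every save: the tracked row count only grows when a new epoch row is actually appended, so repeated rewrites of the same rows log nothing.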

File Not Found Errors

Symptom: Watcher crashes when trying to read files

Cause: File deleted or moved before watcher can read it

Solution: Check file exists before reading:

def log_csv_metrics(self, csv_path):
    if not os.path.exists(csv_path):
        return
    
    try:
        with open(csv_path, "r") as file:
            # Process...
            pass
    except Exception as e:
        print(f"Error: {e}")

When to Use This Pattern

Use file watchers when:

  • Framework writes metrics to files (YOLOv8, MMDetection, etc.)

  • You can't modify the framework's code

  • Metrics are in CSV, JSON, or structured text

Don't use file watchers when:

  • You can modify your training code (print JSON directly instead; see the sketch after this list)

  • Framework has callback/hook system (use callbacks)

  • Metrics are printed to stdout (already captured by Valohai)
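
For comparison, the direct approach needs no watcher at all. A minimal sketch of printing metrics straight from your own training loop (train_one_epoch is a hypothetical stand-in for your framework's training step):

import json

def train_one_epoch():
    # Hypothetical stand-in for one epoch of training
    return 0.123

for epoch in range(10):
    loss = train_one_epoch()
    # Valohai captures each printed JSON line as metadata
    print(json.dumps({"epoch": epoch, "loss": loss}))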


Example Project

Check out our complete working example on GitHub:

valohai/yolo-example

The repository includes:

  • Complete watcher script

  • YOLOv5 and YOLOv8 training configurations

  • valohai.yaml with proper setup

  • Step-by-step instructions

