model_api / app.py
mgumowsk's picture
fs
2e9611f
"""
Object Detection with model_api - Gradio Application
Copyright (C) 2025
"""
import gradio as gr
import numpy as np
from pathlib import Path
from PIL import Image
import time
import os
from typing import Tuple, List
import asyncio
import warnings
import cv2
import uuid
from model_api.models import Model
from model_api.visualizer import Visualizer
# Hide the "Invalid file descriptor" warnings so they do not clutter the log.
warnings.filterwarnings("ignore", message=".*Invalid file descriptor.*")

# Best effort: install the default asyncio event-loop policy when the running
# asyncio exposes the setter. Any failure is deliberately ignored so that the
# application still starts.
if getattr(asyncio, "set_event_loop_policy", None) is not None:
    try:
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    except Exception:
        pass

# Most recently loaded model and its name; both are overwritten by load_model().
current_model = current_model_name = None

# Single visualizer instance shared by every inference function.
visualizer = Visualizer()

# Flag polled by the frame loop in run_video_inference(); stop_video_inference()
# sets it to False to end the loop early.
streaming = False
def get_available_models():
    """
    List the models that can be offered in the UI.

    Looks for ``*.xml`` files directly inside the ``models`` directory,
    resolved relative to the current working directory.

    Returns:
        list: Sorted file stems (file names without the ``.xml`` suffix);
            empty when no file matches.
    """
    return sorted(xml_path.stem for xml_path in Path("models").glob("*.xml"))
def load_model(model_name: str, device: str = "CPU", confidence_threshold: float = 0.3):
    """
    Build a fresh model_api model from ``models/<model_name>.xml``.

    The model is created anew on every call so that the requested confidence
    threshold is always the one in effect. The new instance is also stored in
    the module-level ``current_model`` / ``current_model_name`` globals.

    Args:
        model_name: Name of the model (without .xml extension)
        device: Inference device (CPU, GPU, etc.)
        confidence_threshold: Confidence threshold for predictions

    Returns:
        Model instance from model_api, with its performance metrics reset

    Raises:
        FileNotFoundError: If ``models/<model_name>.xml`` does not exist
    """
    global current_model, current_model_name

    xml_path = Path("models") / f"{model_name}.xml"
    if not xml_path.exists():
        raise FileNotFoundError(f"Model not found: {xml_path}")

    print(f"Loading model: {model_name} with confidence threshold: {confidence_threshold}")

    # Every model receives the confidence threshold; models whose name contains
    # "YOLO" (case-insensitive) additionally get an IoU threshold for NMS.
    configuration = {"confidence_threshold": confidence_threshold}
    if "YOLO" in model_name.upper():
        configuration["iou_threshold"] = 0.5

    loaded_model = Model.create_model(str(xml_path), device=device, configuration=configuration)
    # Start timing statistics from zero for the new instance.
    loaded_model.get_performance_metrics().reset()

    current_model, current_model_name = loaded_model, model_name
    print(f"Model {model_name} loaded successfully")
    return loaded_model
def run_inference(
    image: np.ndarray,
    model_name: str,
    confidence_threshold: float
) -> Tuple[Image.Image, str]:
    """
    Run a single-image inference and report timing statistics.

    Any exception raised while loading the model, running it or rendering the
    result is converted into an error message instead of propagating.

    Args:
        image: Input image as numpy array
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (visualized_image, metrics_text); the first element is None
        when an input is missing or an error occurred, in which case the text
        carries the warning/error message.
    """
    # Guard clauses: nothing to do without an image or a usable model name.
    if image is None:
        return None, "⚠️ Please upload an image first."
    if model_name is None or model_name == "No models available":
        return None, "⚠️ No model selected or available."

    try:
        model = load_model(model_name, confidence_threshold=confidence_threshold)
        result = model(image)

        # Collect the timing statistics gathered by the model during the call.
        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps = metrics.get_fps()

        pre_mean = preprocess_time.mean()
        inf_mean = inference_time.mean()
        post_mean = postprocess_time.mean()

        # Means are multiplied by 1000 for display in milliseconds.
        metrics_text = (
            f"🔄 Preprocessing: {pre_mean*1000:.2f} ms\n"
            f"⚙️ Inference: {inf_mean*1000:.2f} ms\n"
            f"📊 Postprocessing: {post_mean*1000:.2f} ms\n"
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
            f"⏱️ Total Time: {(pre_mean + inf_mean + post_mean)*1000:.2f} ms\n"
            f"🎯 FPS: {fps:.2f}\n"
            f"📈 Total Frames: {inference_time.count}\n"
        )

        # Draw the predictions on top of the input with the shared visualizer.
        print(f"Visualizing results with confidence threshold: {confidence_threshold}")
        return visualizer.render(image, result), metrics_text
    except Exception as e:
        return None, f"Error during inference: {str(e)}"
def run_video_inference(
    video_path: str,
    model_name: str,
    confidence_threshold: float
):
    """
    Process a video frame by frame and write an annotated copy to disk.

    Frames are read until the video ends or the module-level ``streaming``
    flag is cleared (see ``stop_video_inference``). The capture and the writer
    are always released, including on early returns and on errors.

    Args:
        video_path: Path to input video file
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (output_video_path, metrics_text, start_btn_state, stop_btn_state).
        ``output_video_path`` is None when validation fails, an error occurs or
        the output file is missing/empty; the button states always re-enable
        "start" and disable "stop".
    """
    # Function-scope import: tempfile is only needed for the output location.
    import tempfile

    global streaming
    streaming = True

    def _reply(video, text):
        # Every exit path hands control back to the user: start on, stop off.
        return video, text, gr.update(interactive=True), gr.update(interactive=False)

    if video_path is None:
        return _reply(None, "⚠️ Please upload a video first.")
    if model_name is None or model_name == "No models available":
        return _reply(None, "⚠️ No model selected or available.")

    cap = None
    output_video = None
    try:
        # Load model
        model = load_model(model_name, confidence_threshold=confidence_threshold)

        # Open video
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return _reply(None, "⚠️ Error: Could not open video file.")

        # Keep the source frame rate as a float so fractional rates such as
        # 29.97 are not truncated; fall back to 30 when it is unknown (<= 0 or NaN).
        video_codec = cv2.VideoWriter_fourcc(*"mp4v")
        source_fps = cap.get(cv2.CAP_PROP_FPS)
        desired_fps = source_fps if source_fps > 0 else 30

        # Read first frame to get dimensions
        ret, frame = cap.read()
        if not ret or frame is None:
            return _reply(None, "⚠️ Error: Could not read video frames.")

        # The writer is sized from the rendered frame rather than the raw one.
        result = model(frame)
        result_image = visualizer.render(frame, result)
        height, width = result_image.shape[:2]

        # Unique file name inside the platform's temporary directory.
        output_video_name = os.path.join(
            tempfile.gettempdir(), f"output_{uuid.uuid4()}.mp4"
        )
        output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height))

        # Write first frame
        output_video.write(result_image)
        n_frames = 1

        # Process remaining frames until the video ends or a stop is requested.
        while streaming:
            ret, frame = cap.read()
            if not ret or frame is None:
                break
            result = model(frame)
            result_image = visualizer.render(frame, result)
            output_video.write(result_image)
            n_frames += 1

        # Release before inspecting the file so the writer has finished with it.
        output_video.release()
        output_video = None
        cap.release()
        cap = None

        # Get final metrics
        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps_metric = metrics.get_fps()

        pre_mean = preprocess_time.mean()
        inf_mean = inference_time.mean()
        post_mean = postprocess_time.mean()

        final_metrics = (
            "✅ Video Processing Complete!\n"
            f"🔄 Preprocessing: {pre_mean*1000:.2f} ms\n"
            f"⚙️ Inference: {inf_mean*1000:.2f} ms\n"
            f"📊 Postprocessing: {post_mean*1000:.2f} ms\n"
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
            f"⏱️ Total Time: {(pre_mean + inf_mean + post_mean)*1000:.2f} ms\n"
            f"🎯 Average FPS: {fps_metric:.2f}\n"
            f"📈 Total Frames: {n_frames}\n"
        )

        # Verify final file exists before returning
        if os.path.exists(output_video_name) and os.path.getsize(output_video_name) > 0:
            return _reply(output_video_name, final_metrics)
        return _reply(None, final_metrics + "\n⚠️ Final video file not available.")
    except Exception as e:
        return _reply(None, f"Error during video inference: {str(e)}")
    finally:
        # Covers early returns and exceptions; on the success path both were
        # already released and reset to None above.
        if output_video is not None:
            output_video.release()
        if cap is not None:
            cap.release()
def stop_video_inference():
    """
    Request that the running video loop stops.

    Clears the module-level ``streaming`` flag that ``run_video_inference``
    checks before reading each further frame.

    Returns:
        Tuple of (status_text, start_btn_state, stop_btn_state) with the start
        button enabled and the stop button disabled.
    """
    global streaming
    streaming = False

    status_text = "⏹️ Video processing stopped."
    start_btn_state = gr.update(interactive=True)
    stop_btn_state = gr.update(interactive=False)
    return status_text, start_btn_state, stop_btn_state
def run_webcam_inference(
    frame: np.ndarray,
    model_name: str,
    confidence_threshold: float
) -> Tuple[Image.Image, str]:
    """
    Run inference on one frame captured from the webcam stream.

    Any exception raised while loading the model, running it or rendering the
    result is converted into an error message instead of propagating.

    Args:
        frame: Input frame from webcam as numpy array
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (visualized_image, metrics_text); the first element is None
        when an input is missing or an error occurred, in which case the text
        carries the warning/error message.
    """
    if frame is None:
        return None, "⚠️ No frame received from webcam."
    if model_name is None or model_name == "No models available":
        return None, "⚠️ No model selected or available."

    try:
        # NOTE: load_model() builds a new model on every call, so this happens
        # once per streamed frame.
        model = load_model(model_name, confidence_threshold=confidence_threshold)

        # Predict first, then draw the predictions onto the frame.
        result = model(frame)
        visualized_image = visualizer.render(frame, result)

        # Collect the timing statistics gathered by the model during the call.
        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps = metrics.get_fps()

        pre_mean = preprocess_time.mean()
        inf_mean = inference_time.mean()
        post_mean = postprocess_time.mean()

        # Means are multiplied by 1000 for display in milliseconds.
        metrics_text = (
            f"🔄 Preprocessing: {pre_mean*1000:.2f} ms\n"
            f"⚙️ Inference: {inf_mean*1000:.2f} ms\n"
            f"📊 Postprocessing: {post_mean*1000:.2f} ms\n"
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
            f"⏱️ Total Time: {(pre_mean + inf_mean + post_mean)*1000:.2f} ms\n"
            f"🎯 FPS: {fps:.2f}\n"
            f"📈 Total Frames: {inference_time.count}\n"
        )
        return visualized_image, metrics_text
    except Exception as e:
        return None, f"Error during webcam inference: {str(e)}"
def enable_video_buttons(video):
    """
    Compute the button states after the video input changed.

    Args:
        video: Current value of the video input (None when no video is set)

    Returns:
        Tuple of (start_btn_state, stop_btn_state): the start button is
        interactive only when a video is present; the stop button is always
        non-interactive.
    """
    has_video = video is not None
    return gr.update(interactive=has_video), gr.update(interactive=False)
def format_results(result, confidence_threshold: float) -> str:
    """
    Render a model result as a human-readable, newline-terminated report.

    A result with a non-empty ``top_labels`` attribute is reported as a
    classification; otherwise a result with a non-empty ``segmentedObjects``
    attribute is reported as a list of detected objects followed by a
    per-label count. Entries below the threshold are left out.

    Args:
        result: Result object from model_api
        confidence_threshold: Confidence threshold for filtering

    Returns:
        Formatted results text
    """
    from collections import Counter

    divider = "━" * 50

    # Classification: one numbered line per label that passes the threshold.
    top_labels = getattr(result, "top_labels", None)
    if top_labels:
        lines = ["🔍 Classification Results:", divider]
        kept_labels = [lbl for lbl in top_labels if lbl.confidence >= confidence_threshold]
        if kept_labels:
            lines.extend(
                f"{rank}. {lbl.name}: {lbl.confidence:.3f}"
                for rank, lbl in enumerate(kept_labels, 1)
            )
        else:
            lines.append(f"No predictions above confidence threshold {confidence_threshold:.2f}")
        return "\n".join(lines) + "\n"

    # Objects: numbered lines with integer box corners, then a label summary.
    segmented = getattr(result, "segmentedObjects", None)
    if segmented:
        lines = ["🔍 Detected Objects:", divider]
        kept_objects = [obj for obj in segmented if obj.score >= confidence_threshold]
        if kept_objects:
            for rank, obj in enumerate(kept_objects, 1):
                corners = [int(obj.xmin), int(obj.ymin), int(obj.xmax), int(obj.ymax)]
                lines.append(
                    f"{rank}. {obj.str_label}: {obj.score:.3f} @ "
                    f"[{corners[0]}, {corners[1]}, {corners[2]}, {corners[3]}]"
                )
            # Blank line separates the listing from the summary.
            lines.append("")
            lines.append("📊 Summary:")
            tally = Counter(obj.str_label for obj in kept_objects)
            lines.extend(f" • {label}: {count}" for label, count in tally.most_common())
        else:
            lines.append(f"No detections above confidence threshold {confidence_threshold:.2f}")
        return "\n".join(lines) + "\n"

    return "No results available\n"
def _pick_example_model(preferred: str, available_models: List[str]) -> str:
    """Return *preferred* when it is available, else the first available model."""
    return preferred if preferred in available_models else available_models[0]


def _build_model_dropdown(available_models: List[str]):
    """Create the model selector used on every tab, preselecting the first entry."""
    return gr.Dropdown(
        choices=available_models,
        value=available_models[0] if available_models else None,
        label="Select Model",
        info="Choose a model from the models/ folder"
    )


def _build_confidence_slider():
    """Create the 0.0-1.0 confidence slider used on every tab (default 0.3)."""
    return gr.Slider(
        minimum=0.0,
        maximum=1.0,
        value=0.3,
        step=0.05,
        label="Confidence Threshold",
        info="Minimum confidence for displaying predictions"
    )


def _build_metrics_box():
    """Create the text box that shows performance metrics on every tab."""
    return gr.Textbox(
        label="Performance Metrics",
        lines=8,
        max_lines=15
    )


def _run_video_example(video_path, model_name, confidence_threshold):
    """Run video inference for the examples gallery, returning only (video, metrics)."""
    # run_video_inference also returns two button states, but the examples
    # gallery is wired to just the video and the metrics box.
    output_path, metrics_text = run_video_inference(
        video_path, model_name, confidence_threshold
    )[:2]
    return output_path, metrics_text


def create_gradio_interface():
    """
    Create and configure the Gradio interface.

    The interface has three tabs (image, video and live webcam inference),
    each with its own model selector, confidence slider and metrics box.
    When no model is found, the single placeholder entry
    "No models available" is offered instead.

    Returns:
        gr.Blocks: Configured Gradio interface
    """
    available_models = get_available_models()
    if not available_models:
        print("Warning: No models found in models/ folder")
        available_models = ["No models available"]

    with gr.Blocks(title="OpenVINO with model_api") as demo:
        gr.Markdown("# 🎯 OpenVINO with model_api")
        gr.Markdown("Experience high-performance object detection powered by **OpenVINO™** and **model_api**. See real-time inference with detailed performance metrics.")

        with gr.Tabs():
            with gr.TabItem("📸 Image Inference"):
                with gr.Row():
                    with gr.Column(scale=1):
                        input_image = gr.Image(
                            label="Input Image",
                            type="numpy"
                        )
                        model_dropdown = _build_model_dropdown(available_models)
                        confidence_slider = _build_confidence_slider()
                        classify_btn = gr.Button("🚀 Run Inference", variant="primary")
                    with gr.Column(scale=1):
                        output_image = gr.Image(
                            label="Detection Result",
                            type="pil",
                            show_label=False
                        )
                        metrics_output = _build_metrics_box()

                # Connect the button to the inference function
                classify_btn.click(
                    fn=run_inference,
                    inputs=[input_image, model_dropdown, confidence_slider],
                    outputs=[output_image, metrics_output]
                )

                # Examples section
                gr.Markdown("---")
                gr.Markdown("## 📸 Example Images")
                gr.Examples(
                    examples=[
                        ["examples/vehicles.png", _pick_example_model("YOLO-11-N", available_models), 0.5],
                        ["examples/dog.jpg", _pick_example_model("YOLO-11-S", available_models), 0.6],
                        ["examples/people-walking.png", _pick_example_model("YOLO-11-M", available_models), 0.3],
                        ["examples/zidane.jpg", _pick_example_model("resnet50", available_models), 0.5],
                    ],
                    inputs=[input_image, model_dropdown, confidence_slider],
                    outputs=[output_image, metrics_output],
                    fn=run_inference,
                    cache_examples=True
                )

            with gr.TabItem("🎥 Video Inference"):
                with gr.Row():
                    with gr.Column(scale=1):
                        input_video = gr.Video(
                            label="Input Video"
                        )
                        video_model_dropdown = _build_model_dropdown(available_models)
                        video_confidence_slider = _build_confidence_slider()
                        with gr.Row():
                            video_start_btn = gr.Button("▶️ Start Processing", variant="primary", interactive=False)
                            video_stop_btn = gr.Button("⏹️ Stop", variant="stop", interactive=False)
                    with gr.Column(scale=1):
                        output_video = gr.Video(
                            label="Processed Video",
                            autoplay=True,
                            show_label=False
                        )
                        video_metrics_output = _build_metrics_box()

                # Enable start button when video is uploaded
                input_video.change(
                    fn=enable_video_buttons,
                    inputs=[input_video],
                    outputs=[video_start_btn, video_stop_btn]
                )

                # Generator handler: the first yield flips the buttons
                # (start disabled, stop enabled) before the blocking
                # processing call; the second yield delivers its result.
                def start_processing_wrapper(video, model, conf):
                    yield None, "🔄 Starting video processing...", gr.update(interactive=False), gr.update(interactive=True)
                    result = run_video_inference(video, model, conf)
                    yield result

                video_start_btn.click(
                    fn=start_processing_wrapper,
                    inputs=[input_video, video_model_dropdown, video_confidence_slider],
                    outputs=[output_video, video_metrics_output, video_start_btn, video_stop_btn]
                )
                video_stop_btn.click(
                    fn=stop_video_inference,
                    inputs=None,
                    outputs=[video_metrics_output, video_start_btn, video_stop_btn]
                )

                # Video Examples section
                gr.Markdown("---")
                gr.Markdown("## 🎬 Example Videos")
                gr.Examples(
                    examples=[
                        ["examples/doggo.mp4", _pick_example_model("YOLO-11-S", available_models), 0.4],
                        ["examples/basketball.mp4", _pick_example_model("YOLO-11-N", available_models), 0.3],
                    ],
                    inputs=[input_video, video_model_dropdown, video_confidence_slider],
                    outputs=[output_video, video_metrics_output],
                    # Wrapper keeps the number of returned values equal to the
                    # number of outputs listed above.
                    fn=_run_video_example,
                    cache_examples=True
                )

            with gr.TabItem("📹 Live Inference"):
                gr.Markdown("### Real-time inference using your webcam")
                gr.Markdown("⚠️ **Note:** Allow browser access to your webcam when prompted.")
                with gr.Row():
                    with gr.Column(scale=1):
                        webcam_input = gr.Image(
                            sources=["webcam"],
                            label="Webcam",
                            type="numpy",
                            streaming=True,
                            show_label=False
                        )
                        webcam_model_dropdown = _build_model_dropdown(available_models)
                        webcam_confidence_slider = _build_confidence_slider()
                    with gr.Column(scale=1):
                        webcam_output = gr.Image(
                            label="Detection Result",
                            type="pil",
                            show_label=False
                        )
                        webcam_metrics_output = _build_metrics_box()

                # Set up streaming from webcam
                webcam_input.stream(
                    fn=run_webcam_inference,
                    inputs=[webcam_input, webcam_model_dropdown, webcam_confidence_slider],
                    outputs=[webcam_output, webcam_metrics_output],
                    time_limit=60,
                    stream_every=0.1,
                    concurrency_limit=16
                )

    return demo
if __name__ == "__main__":
    # Server-Side Rendering is experimental and is switched off here to avoid
    # asyncio file-descriptor cleanup errors observed on some hosting
    # environments (e.g., HF Spaces with Python 3.10). The classic Gradio
    # frontend keeps the event loop lifecycle straightforward and prevents
    # "Invalid file descriptor: -1" warnings at shutdown.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        ssr_mode=False,
    )
    demo = create_gradio_interface()
    demo.launch(**launch_options)