"""
Object Detection with model_api - Gradio Application

Copyright (C) 2025
"""

import asyncio
import os
import tempfile
import uuid
import warnings
from collections import Counter
from pathlib import Path
from typing import Tuple

import cv2
import gradio as gr
import numpy as np
from PIL import Image

from model_api.models import Model
from model_api.visualizer import Visualizer

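# The suppressed "Invalid file descriptor" warning is a harmless shutdown
# artifact of the async inference stack; hiding it keeps console output clean.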
warnings.filterwarnings("ignore", message=".*Invalid file descriptor.*")

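# Fall back to the default event loop policy so Gradio's async runtime starts
# from a clean state; a failure here is non-fatal and deliberately ignored.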
if hasattr(asyncio, "set_event_loop_policy"):
    try:
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    except Exception:
        pass


# Module-level cache: creating an OpenVINO model is expensive, so the most
# recently loaded model is reused as long as its (name, device, threshold)
# key is unchanged. This matters most for webcam streaming, where the handler
# runs for every frame.
current_model = None
current_model_key = None
visualizer = Visualizer()

# Shared flag checked by the video processing loop; stop_video_inference() clears it.
streaming = False


def get_available_models():
    """
    Scan the models folder for .xml files and return a list of model names.

    Returns:
        list: Sorted model names (without the .xml extension)
    """
    models_dir = Path("models")
    xml_files = list(models_dir.glob("*.xml"))
    model_names = [f.stem for f in xml_files]
    return sorted(model_names)
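
# Example: with models/YOLO-11-N.xml and models/resnet50.xml on disk, this
# returns ["YOLO-11-N", "resnet50"] (uppercase sorts before lowercase).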


def load_model(model_name: str, device: str = "CPU", confidence_threshold: float = 0.3):
    """
    Load an OpenVINO model using model_api, reusing the cached instance when possible.

    Args:
        model_name: Name of the model (without .xml extension)
        device: Inference device (CPU, GPU, etc.)
        confidence_threshold: Confidence threshold for predictions

    Returns:
        Model instance from model_api
    """
    global current_model, current_model_key

    # Serve the cached model when nothing about the request has changed.
    model_key = (model_name, device, confidence_threshold)
    if current_model is not None and current_model_key == model_key:
        return current_model

    model_path = Path("models") / f"{model_name}.xml"

    if not model_path.exists():
        raise FileNotFoundError(f"Model not found: {model_path}")

    print(f"Loading model: {model_name} with confidence threshold: {confidence_threshold}")

    # All models take a confidence threshold; YOLO models additionally need an
    # IoU threshold for non-maximum suppression.
    configuration = {"confidence_threshold": confidence_threshold}
    if "YOLO" in model_name.upper():
        configuration["iou_threshold"] = 0.5

    model = Model.create_model(str(model_path), device=device, configuration=configuration)
    model.get_performance_metrics().reset()

    current_model = model
    current_model_key = model_key

    print(f"Model {model_name} loaded successfully")
    return model
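
# Minimal usage sketch (assumes a models/YOLO-11-N.xml file exists; the dummy
# array stands in for a real HxWx3 uint8 image):
#
#   model = load_model("YOLO-11-N", confidence_threshold=0.5)
#   detections = model(np.zeros((480, 640, 3), dtype=np.uint8))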


def run_inference(
    image: np.ndarray,
    model_name: str,
    confidence_threshold: float
) -> Tuple[Image.Image, str]:
    """
    Perform inference and return the visualized result with metrics.

    Args:
        image: Input image as numpy array
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (visualized_image, metrics_text)
    """
    if image is None:
        return None, "⚠️ Please upload an image first."

    if model_name is None or model_name == "No models available":
        return None, "⚠️ No model selected or available."

    try:
        model = load_model(model_name, confidence_threshold=confidence_threshold)

        result = model(image)

        # Timings are running means since the model was loaded, so the frame
        # count accumulates across repeated runs with the same model.
        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps = metrics.get_fps()

        metrics_text = f"""🔄 Preprocessing: {preprocess_time.mean()*1000:.2f} ms
⚙️ Inference: {inference_time.mean()*1000:.2f} ms
📊 Postprocessing: {postprocess_time.mean()*1000:.2f} ms
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms
🎯 FPS: {fps:.2f}
📈 Total Frames: {inference_time.count}
"""

        print(f"Visualizing results with confidence threshold: {confidence_threshold}")
        visualized_image = visualizer.render(image, result)

        return visualized_image, metrics_text

    except Exception as e:
        error_msg = f"Error during inference: {str(e)}"
        return None, error_msg


def run_video_inference(
    video_path: str,
    model_name: str,
    confidence_threshold: float
):
    """
    Process a video and return the complete result with inference overlays.

    Args:
        video_path: Path to input video file
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (output_video_path, metrics_text, start_btn_state, stop_btn_state)
    """
    global streaming
    streaming = True

    if video_path is None:
        return None, "⚠️ Please upload a video first.", gr.update(interactive=True), gr.update(interactive=False)

    if model_name is None or model_name == "No models available":
        return None, "⚠️ No model selected or available.", gr.update(interactive=True), gr.update(interactive=False)

    try:
        model = load_model(model_name, confidence_threshold=confidence_threshold)

        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            return None, "⚠️ Error: Could not open video file.", gr.update(interactive=True), gr.update(interactive=False)

        video_codec = cv2.VideoWriter_fourcc(*"mp4v")
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        desired_fps = fps if fps > 0 else 30
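
        # Process the first frame before creating the writer: the rendered
        # frame, not the raw input, defines the output dimensions, since the
        # visualizer may draw on a resized copy.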
        ret, frame = cap.read()
        if not ret or frame is None:
            return None, "⚠️ Error: Could not read video frames.", gr.update(interactive=True), gr.update(interactive=False)

        result = model(frame)
        result_image = visualizer.render(frame, result)
        height, width = result_image.shape[:2]

        # Write to the platform's temp directory (rather than a hard-coded
        # /tmp) with a unique name so concurrent sessions cannot collide.
        output_video_name = str(Path(tempfile.gettempdir()) / f"output_{uuid.uuid4()}.mp4")
        output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height))

        output_video.write(result_image)

        n_frames = 1

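        # Cooperative processing loop: `streaming` is cleared by the Stop
        # button handler, so the loop exits after the in-flight frame finishes.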
        while streaming:
            ret, frame = cap.read()

            if not ret or frame is None:
                break

            result = model(frame)
            result_image = visualizer.render(frame, result)
            output_video.write(result_image)
            n_frames += 1

        output_video.release()
        cap.release()

        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps_metric = metrics.get_fps()

        final_metrics = f"""✅ Video Processing Complete!

🔄 Preprocessing: {preprocess_time.mean()*1000:.2f} ms
⚙️ Inference: {inference_time.mean()*1000:.2f} ms
📊 Postprocessing: {postprocess_time.mean()*1000:.2f} ms
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms
🎯 Average FPS: {fps_metric:.2f}
📈 Total Frames: {n_frames}
"""

        if os.path.exists(output_video_name) and os.path.getsize(output_video_name) > 0:
            return output_video_name, final_metrics, gr.update(interactive=True), gr.update(interactive=False)
        else:
            return None, final_metrics + "\n⚠️ Final video file not available.", gr.update(interactive=True), gr.update(interactive=False)

    except Exception as e:
        error_msg = f"Error during video inference: {str(e)}"
        return None, error_msg, gr.update(interactive=True), gr.update(interactive=False)


def stop_video_inference():
    """Stop video processing."""
    global streaming
    streaming = False
    return "⏹️ Video processing stopped.", gr.update(interactive=True), gr.update(interactive=False)


def run_webcam_inference(
    frame: np.ndarray,
    model_name: str,
    confidence_threshold: float
) -> Tuple[Image.Image, str]:
    """
    Process the webcam stream - run inference on a captured camera frame.

    Args:
        frame: Input frame from webcam as numpy array
        model_name: Name of the model to use
        confidence_threshold: Confidence threshold for filtering predictions

    Returns:
        Tuple of (visualized_image, metrics_text)
    """
    if frame is None:
        return None, "⚠️ No frame received from webcam."

    if model_name is None or model_name == "No models available":
        return None, "⚠️ No model selected or available."

    try:
        # Thanks to the module-level cache this is a cheap lookup after the
        # first frame, not a full model reload per frame.
        model = load_model(model_name, confidence_threshold=confidence_threshold)

        result = model(frame)

        visualized_image = visualizer.render(frame, result)

        metrics = model.get_performance_metrics()
        inference_time = metrics.get_inference_time()
        preprocess_time = metrics.get_preprocess_time()
        postprocess_time = metrics.get_postprocess_time()
        fps = metrics.get_fps()

        metrics_text = f"""🔄 Preprocessing: {preprocess_time.mean()*1000:.2f} ms
⚙️ Inference: {inference_time.mean()*1000:.2f} ms
📊 Postprocessing: {postprocess_time.mean()*1000:.2f} ms
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms
🎯 FPS: {fps:.2f}
📈 Total Frames: {inference_time.count}
"""

        return visualized_image, metrics_text

    except Exception as e:
        error_msg = f"Error during webcam inference: {str(e)}"
        return None, error_msg


def enable_video_buttons(video):
    """Enable the Start button when a video is uploaded."""
    if video is not None:
        return gr.update(interactive=True), gr.update(interactive=False)
    else:
        return gr.update(interactive=False), gr.update(interactive=False)
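
# Helper for rendering results as text; not currently wired into the UI.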


def format_results(result, confidence_threshold: float) -> str:
    """
    Format model results (classification or detection) as text.

    Args:
        result: Result object from model_api
        confidence_threshold: Confidence threshold for filtering

    Returns:
        Formatted results text
    """
    if hasattr(result, 'top_labels') and result.top_labels:
        results_text = "🔍 Classification Results:\n"
        results_text += "━" * 50 + "\n"

        filtered_labels = [
            label for label in result.top_labels
            if label.confidence >= confidence_threshold
        ]

        if filtered_labels:
            for i, label in enumerate(filtered_labels, 1):
                results_text += f"{i}. {label.name}: {label.confidence:.3f}\n"
        else:
            results_text += f"No predictions above confidence threshold {confidence_threshold:.2f}\n"

    elif hasattr(result, 'segmentedObjects') and result.segmentedObjects:
        results_text = "🔍 Detected Objects:\n"
        results_text += "━" * 50 + "\n"

        filtered_objects = [
            obj for obj in result.segmentedObjects
            if obj.score >= confidence_threshold
        ]

        if filtered_objects:
            label_counts = Counter(obj.str_label for obj in filtered_objects)

            for i, obj in enumerate(filtered_objects, 1):
                x1, y1 = int(obj.xmin), int(obj.ymin)
                x2, y2 = int(obj.xmax), int(obj.ymax)
                results_text += f"{i}. {obj.str_label}: {obj.score:.3f} @ [{x1}, {y1}, {x2}, {y2}]\n"

            results_text += "\n📊 Summary:\n"
            for label, count in label_counts.most_common():
                results_text += f" • {label}: {count}\n"
        else:
            results_text += f"No detections above confidence threshold {confidence_threshold:.2f}\n"

    else:
        results_text = "No results available\n"

    return results_text


def create_gradio_interface():
    """
    Create and configure the Gradio interface.

    Returns:
        gr.Blocks: Configured Gradio interface
    """
    available_models = get_available_models()

    if not available_models:
        print("Warning: No models found in models/ folder")
        available_models = ["No models available"]

    with gr.Blocks(title="OpenVINO with model_api") as demo:
        gr.Markdown("# 🎯 OpenVINO with model_api")
        gr.Markdown("Experience high-performance object detection powered by **OpenVINO™** and **model_api**. See real-time inference with detailed performance metrics.")

        with gr.Tabs():
            with gr.TabItem("📸 Image Inference"):
                with gr.Row():
                    with gr.Column(scale=1):
                        input_image = gr.Image(
                            label="Input Image",
                            type="numpy"
                        )

                        model_dropdown = gr.Dropdown(
                            choices=available_models,
                            value=available_models[0] if available_models else None,
                            label="Select Model",
                            info="Choose a model from the models/ folder"
                        )

                        confidence_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.3,
                            step=0.05,
                            label="Confidence Threshold",
                            info="Minimum confidence for displaying predictions"
                        )

                        classify_btn = gr.Button("🚀 Run Inference", variant="primary")

                    with gr.Column(scale=1):
                        output_image = gr.Image(
                            label="Detection Result",
                            type="pil",
                            show_label=False
                        )

                        metrics_output = gr.Textbox(
                            label="Performance Metrics",
                            lines=8,
                            max_lines=15
                        )

                classify_btn.click(
                    fn=run_inference,
                    inputs=[input_image, model_dropdown, confidence_slider],
                    outputs=[output_image, metrics_output]
                )

                gr.Markdown("---")
                gr.Markdown("## 📸 Example Images")
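                # cache_examples=True runs each example through run_inference
                # once at startup and then serves the cached outputs instantly.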
                gr.Examples(
                    examples=[
                        ["examples/vehicles.png", "YOLO-11-N" if "YOLO-11-N" in available_models else available_models[0], 0.5],
                        ["examples/dog.jpg", "YOLO-11-S" if "YOLO-11-S" in available_models else available_models[0], 0.6],
                        ["examples/people-walking.png", "YOLO-11-M" if "YOLO-11-M" in available_models else available_models[0], 0.3],
                        ["examples/zidane.jpg", "resnet50" if "resnet50" in available_models else available_models[0], 0.5],
                    ],
                    inputs=[input_image, model_dropdown, confidence_slider],
                    outputs=[output_image, metrics_output],
                    fn=run_inference,
                    cache_examples=True
                )

            with gr.TabItem("🎥 Video Inference"):
                with gr.Row():
                    with gr.Column(scale=1):
                        input_video = gr.Video(
                            label="Input Video"
                        )

                        video_model_dropdown = gr.Dropdown(
                            choices=available_models,
                            value=available_models[0] if available_models else None,
                            label="Select Model",
                            info="Choose a model from the models/ folder"
                        )

                        video_confidence_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.3,
                            step=0.05,
                            label="Confidence Threshold",
                            info="Minimum confidence for displaying predictions"
                        )

                        with gr.Row():
                            video_start_btn = gr.Button("▶️ Start Processing", variant="primary", interactive=False)
                            video_stop_btn = gr.Button("⏹️ Stop", variant="stop", interactive=False)

                    with gr.Column(scale=1):
                        output_video = gr.Video(
                            label="Processed Video",
                            autoplay=True,
                            show_label=False
                        )

                        video_metrics_output = gr.Textbox(
                            label="Performance Metrics",
                            lines=8,
                            max_lines=15
                        )

                input_video.change(
                    fn=enable_video_buttons,
                    inputs=[input_video],
                    outputs=[video_start_btn, video_stop_btn]
                )

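                # A generator handler lets the UI update twice: first an
                # immediate status message with the buttons toggled, then the
                # final result.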
                def start_processing_wrapper(video, model, conf):
                    yield None, "🔄 Starting video processing...", gr.update(interactive=False), gr.update(interactive=True)
                    result = run_video_inference(video, model, conf)
                    yield result

                video_start_btn.click(
                    fn=start_processing_wrapper,
                    inputs=[input_video, video_model_dropdown, video_confidence_slider],
                    outputs=[output_video, video_metrics_output, video_start_btn, video_stop_btn]
                )

                video_stop_btn.click(
                    fn=stop_video_inference,
                    inputs=None,
                    outputs=[video_metrics_output, video_start_btn, video_stop_btn]
                )

                gr.Markdown("---")
                gr.Markdown("## 🎬 Example Videos")
                # run_video_inference returns four values (video path, metrics,
                # and two button-state updates) but this block wires only two
                # outputs, so trim the extra values for the example runner.
                def run_video_example(video, model, conf):
                    output_path, metrics, *_ = run_video_inference(video, model, conf)
                    return output_path, metrics

                gr.Examples(
                    examples=[
                        ["examples/doggo.mp4", "YOLO-11-S" if "YOLO-11-S" in available_models else available_models[0], 0.4],
                        ["examples/basketball.mp4", "YOLO-11-N" if "YOLO-11-N" in available_models else available_models[0], 0.3],
                    ],
                    inputs=[input_video, video_model_dropdown, video_confidence_slider],
                    outputs=[output_video, video_metrics_output],
                    fn=run_video_example,
                    cache_examples=True
                )

            with gr.TabItem("📹 Live Inference"):
                gr.Markdown("### Real-time inference using your webcam")
                gr.Markdown("⚠️ **Note:** Allow browser access to your webcam when prompted.")

                with gr.Row():
                    with gr.Column(scale=1):
                        webcam_input = gr.Image(
                            sources=["webcam"],
                            label="Webcam",
                            type="numpy",
                            streaming=True,
                            show_label=False
                        )

                        webcam_model_dropdown = gr.Dropdown(
                            choices=available_models,
                            value=available_models[0] if available_models else None,
                            label="Select Model",
                            info="Choose a model from the models/ folder"
                        )

                        webcam_confidence_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.3,
                            step=0.05,
                            label="Confidence Threshold",
                            info="Minimum confidence for displaying predictions"
                        )

                    with gr.Column(scale=1):
                        webcam_output = gr.Image(
                            label="Detection Result",
                            type="pil",
                            show_label=False
                        )

                        webcam_metrics_output = gr.Textbox(
                            label="Performance Metrics",
                            lines=8,
                            max_lines=15
                        )

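                # stream_every=0.1 has the browser send a frame roughly every
                # 100 ms; time_limit caps each streaming session at 60 seconds.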
                webcam_input.stream(
                    fn=run_webcam_inference,
                    inputs=[webcam_input, webcam_model_dropdown, webcam_confidence_slider],
                    outputs=[webcam_output, webcam_metrics_output],
                    time_limit=60,
                    stream_every=0.1,
                    concurrency_limit=16
                )

    return demo
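

# Bind to all interfaces so the app is reachable from containers or other
# machines; ssr_mode=False skips Gradio's Node-based server-side rendering,
# which this demo does not need.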
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        ssr_mode=False,
    )