File size: 8,254 Bytes
eb09c29
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!/usr/bin/env python3
"""
Test script to verify video processing functionality.
Creates a synthetic test video and tests the prediction pipeline.
"""

import sys
import tempfile
import logging
from pathlib import Path
import numpy as np
from PIL import Image, ImageDraw
import cv2

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_synthetic_video(output_path: Path, duration_seconds: float = 2.0, fps: int = 24):
    """Create a synthetic test video with simple animation."""

    width, height = 640, 480
    total_frames = int(duration_seconds * fps)

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))

    logging.info(f"Creating synthetic video: {total_frames} frames at {fps} FPS")

    for frame_num in range(total_frames):
        # Create a frame with animated content
        frame = np.zeros((height, width, 3), dtype=np.uint8)

        # Add background gradient
        for y in range(height):
            intensity = int(255 * (y / height))
            frame[y, :] = [intensity // 3, intensity // 2, intensity]

        # Add moving circle (simulating an action)
        center_x = int(width * (0.2 + 0.6 * frame_num / total_frames))
        center_y = height // 2
        radius = 30 + int(20 * np.sin(frame_num * 0.3))

        # Convert to PIL for drawing
        pil_frame = Image.fromarray(frame)
        draw = ImageDraw.Draw(pil_frame)

        # Draw moving circle
        left = center_x - radius
        top = center_y - radius
        right = center_x + radius
        bottom = center_y + radius
        draw.ellipse([left, top, right, bottom], fill=(255, 255, 0))

        # Add some text to simulate action
        draw.text((50, 50), f"Frame {frame_num}", fill=(255, 255, 255))
        draw.text((50, 80), "Synthetic Action", fill=(255, 255, 255))

        # Convert back to numpy and BGR for OpenCV
        frame = np.array(pil_frame)
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        out.write(frame_bgr)

    out.release()
    logging.info(f"βœ“ Created synthetic video: {output_path}")
    return output_path

def test_video_reading():
    """Test video reading functionality without full model inference."""

    logging.info("=== Testing Video Reading ===")

    try:
        from predict import _read_video_frames, normalize_frames

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            video_path = tmp_path / "test_video.mp4"

            # Create test video
            create_synthetic_video(video_path, duration_seconds=1.0, fps=12)  # Short video

            # Test reading frames
            logging.info("Testing frame reading...")
            frames = _read_video_frames(video_path, num_frames=8)

            if not frames:
                logging.error("βœ— No frames extracted")
                return False

            logging.info(f"βœ“ Extracted {len(frames)} frames")

            # Test frame normalization
            logging.info("Testing frame normalization...")
            normalized = normalize_frames(frames, required_frames=8)

            if len(normalized) != 8:
                logging.error(f"βœ— Expected 8 frames, got {len(normalized)}")
                return False

            logging.info("βœ“ Frame normalization successful")

            # Check frame properties
            for i, frame in enumerate(normalized):
                if frame.size != (224, 224):
                    logging.error(f"βœ— Frame {i} has wrong size: {frame.size}")
                    return False
                if frame.mode != 'RGB':
                    logging.error(f"βœ— Frame {i} has wrong mode: {frame.mode}")
                    return False

            logging.info("βœ“ All frames have correct properties")
            return True

    except Exception as e:
        logging.error(f"βœ— Video reading test failed: {e}")
        return False

def test_tensor_creation():
    """Test tensor creation from frames."""

    logging.info("=== Testing Tensor Creation ===")

    try:
        from predict import create_tensor_from_frames
        import torch

        # Create dummy frames
        frames = []
        for i in range(8):
            frame = Image.new('RGB', (224, 224), (i*30 % 255, 100, 150))
            frames.append(frame)

        logging.info("Testing tensor creation...")
        tensor = create_tensor_from_frames(frames, processor=None)  # Use manual creation

        # Check tensor properties
        expected_shape = (1, 3, 8, 224, 224)  # (batch, channels, frames, height, width)
        if tensor.shape != expected_shape:
            logging.error(f"βœ— Expected shape {expected_shape}, got {tensor.shape}")
            return False

        logging.info(f"βœ“ Tensor created with correct shape: {tensor.shape}")

        # Check tensor values are in reasonable range
        if tensor.min() < 0 or tensor.max() > 1:
            logging.warning(f"⚠ Tensor values outside [0,1]: [{tensor.min():.3f}, {tensor.max():.3f}]")

        logging.info("βœ“ Tensor creation successful")
        return True

    except Exception as e:
        logging.error(f"βœ— Tensor creation test failed: {e}")
        return False

def test_full_pipeline():
    """Test the complete prediction pipeline with a synthetic video."""

    logging.info("=== Testing Full Pipeline ===")

    try:
        from predict import predict_actions

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            video_path = tmp_path / "test_video.mp4"

            # Create test video
            create_synthetic_video(video_path, duration_seconds=2.0, fps=15)

            logging.info("Running full prediction pipeline...")

            # Run prediction with smaller top_k for faster testing
            results = predict_actions(str(video_path), top_k=3)

            if not results:
                logging.error("βœ— No predictions returned")
                return False

            logging.info(f"βœ“ Got {len(results)} predictions")

            # Display results
            for i, (label, confidence) in enumerate(results, 1):
                logging.info(f"  {i}. {label}: {confidence:.3f}")

            # Basic validation
            if len(results) != 3:
                logging.error(f"βœ— Expected 3 results, got {len(results)}")
                return False

            for label, confidence in results:
                if not isinstance(label, str) or not isinstance(confidence, float):
                    logging.error(f"βœ— Invalid result format: {label}, {confidence}")
                    return False
                if confidence < 0 or confidence > 1:
                    logging.error(f"βœ— Invalid confidence: {confidence}")
                    return False

            logging.info("βœ“ Full pipeline test successful")
            return True

    except Exception as e:
        logging.error(f"βœ— Full pipeline test failed: {e}")
        logging.exception("Full error traceback:")
        return False

def main():
    """Run all tests."""

    print("πŸ§ͺ Video Processing Test Suite")
    print("=" * 50)

    tests = [
        ("Video Reading", test_video_reading),
        ("Tensor Creation", test_tensor_creation),
        ("Full Pipeline", test_full_pipeline),
    ]

    passed = 0
    total = len(tests)

    for test_name, test_func in tests:
        print(f"\nπŸ” Running: {test_name}")
        print("-" * 30)

        try:
            if test_func():
                print(f"βœ… {test_name} PASSED")
                passed += 1
            else:
                print(f"❌ {test_name} FAILED")
        except Exception as e:
            print(f"πŸ’₯ {test_name} CRASHED: {e}")
            logging.exception(f"Test {test_name} crashed:")

    print(f"\nπŸ“Š Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("πŸŽ‰ All tests passed! Video processing is working correctly.")
        return 0
    else:
        print("⚠️  Some tests failed. Check the logs above for details.")
        return 1

if __name__ == "__main__":
    exit(main())