#!/usr/bin/env python3
"""
VORTEX@SANDBOX Agent Control API
Ultra-fast REST + WebSocket interface for AI agents
"""

import asyncio
import base64
import json
import os
import subprocess
from io import BytesIO
from typing import Optional

import uvicorn
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Optional imports with fallbacks.
# Each name is set to None when its package cannot be used; the endpoints
# test these names for truthiness and fall back to xdotool / scrot.
try:
    import pyautogui

    pyautogui.FAILSAFE = False
    pyautogui.PAUSE = 0.01  # Minimal delay for speed
except (Exception, SystemExit):
    # NOTE(review): SystemExit is caught on purpose. The original bare
    # `except:` also swallowed it, and importing pyautogui on a headless
    # Linux box may exit instead of raising -- confirm before narrowing.
    pyautogui = None

try:
    import mss
except Exception:
    mss = None

try:
    from PIL import Image
except Exception:
    Image = None

app = FastAPI(
    title="vortex@sandbox API",
    description="Agent control interface for containerized Linux desktop",
    version="1.0.0",
)

# Wide-open CORS: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ============================================
# Request Models
# ============================================

class MouseMove(BaseModel):
    """Body of POST /mouse/move: target pointer coordinates."""

    x: int
    y: int


class MouseClick(BaseModel):
    """Body of POST /mouse/click: where to click, which button, how often."""

    x: int
    y: int
    button: str = "left"
    clicks: int = 1


class KeyPress(BaseModel):
    """Body of POST /key: one key plus optional modifier keys."""

    key: str
    modifiers: list[str] = []


class TypeText(BaseModel):
    """Body of POST /type: text to type and the per-character delay."""

    text: str
    interval: float = 0.01  # seconds between keystrokes


class Screenshot(BaseModel):
    """Body of POST /screenshot: optional capture region and image format."""

    region: Optional[list[int]] = None  # [x, y, w, h]
    format: str = "png"


class RunCommand(BaseModel):
    """Body of POST /exec: shell command line and timeout in seconds."""

    command: str
    timeout: int = 30


class Navigate(BaseModel):
    """Body of POST /navigate: URL to type into the browser address bar."""

    url: str


# ============================================
# Core Endpoints
# ============================================

@app.get("/")
async def root():
    """Return service name, version, status and an index of endpoints."""
    return {
        "name": "vortex@sandbox",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "mouse": "/mouse/move, /mouse/click",
            "keyboard": "/key, /type",
            "screen": "/screenshot",
            "system": "/exec, /health",
            "browser": "/navigate, /cdp",
        },
    }


@app.get("/health")
async def health():
    """Report liveness plus the DISPLAY value (":99" when it is unset)."""
    return {"status": "healthy", "display": os.environ.get("DISPLAY", ":99")}


# ============================================
# Mouse Control
# ============================================

def _xdotool(*args: str) -> None:
    """Run ``xdotool`` with *args* and turn any failure into an HTTP 500.

    Without this check a missing binary or a non-zero exit status would be
    reported to the client as a successful action.

    Raises:
        HTTPException: 500 when xdotool cannot be started or exits non-zero.
    """
    try:
        subprocess.run(
            ["xdotool", *args], check=True, capture_output=True, text=True
        )
    except subprocess.CalledProcessError as exc:
        reason = (exc.stderr or "").strip() or f"exit status {exc.returncode}"
        raise HTTPException(
            status_code=500, detail=f"xdotool failed: {reason}"
        ) from exc
    except OSError as exc:
        raise HTTPException(
            status_code=500, detail=f"xdotool unavailable: {exc}"
        ) from exc


# NOTE(review): every handler below does blocking work (pyautogui calls or a
# subprocess) inside an ``async def``, so the event loop is stalled for the
# duration of each action. That also serialises input events; confirm
# whether that is intended before moving the work to a thread.

@app.post("/mouse/move")
async def mouse_move(req: MouseMove):
    """Move the pointer to (req.x, req.y).

    Uses pyautogui when available, otherwise ``xdotool mousemove``.
    """
    if pyautogui:
        pyautogui.moveTo(req.x, req.y, duration=0)
    else:
        _xdotool("mousemove", str(req.x), str(req.y))
    return {"moved": [req.x, req.y]}


@app.post("/mouse/click")
async def mouse_click(req: MouseClick):
    """Click ``req.clicks`` times at (req.x, req.y) with ``req.button``.

    In the xdotool fallback an unrecognised button name is treated as the
    left button.
    """
    if pyautogui:
        pyautogui.click(req.x, req.y, clicks=req.clicks, button=req.button)
    else:
        btn = {"left": "1", "middle": "2", "right": "3"}.get(req.button, "1")
        _xdotool("mousemove", str(req.x), str(req.y))
        for _ in range(req.clicks):
            _xdotool("click", btn)
    return {"clicked": [req.x, req.y], "button": req.button}


@app.post("/mouse/scroll")
async def mouse_scroll(direction: str = "down", amount: int = 3):
    """Scroll the wheel by *amount* steps.

    Any *direction* other than ``"down"`` scrolls up. In the xdotool
    fallback, buttons 5 and 4 are clicked for down and up respectively.
    """
    if pyautogui:
        pyautogui.scroll(-amount if direction == "down" else amount)
    else:
        btn = "5" if direction == "down" else "4"
        for _ in range(amount):
            _xdotool("click", btn)
    return {"scrolled": direction, "amount": amount}


# ============================================
# Keyboard Control
# ============================================

@app.post("/key")
async def key_press(req: KeyPress):
    """Press ``req.key``, held together with ``req.modifiers`` if any."""
    if pyautogui:
        if req.modifiers:
            pyautogui.hotkey(*req.modifiers, req.key)
        else:
            pyautogui.press(req.key)
    else:
        # Joining a one-element list yields the bare key, so no special
        # case is needed for an empty modifier list.
        _xdotool("key", "+".join([*req.modifiers, req.key]))
    return {"pressed": req.key, "modifiers": req.modifiers}


@app.post("/type")
async def type_text(req: TypeText):
    """Type ``req.text`` with ``req.interval`` seconds between characters."""
    if pyautogui:
        pyautogui.write(req.text, interval=req.interval)
    else:
        delay_ms = str(int(req.interval * 1000))
        # "--" ends option parsing so text starting with "-" is typed
        # literally instead of being read as an xdotool option.
        _xdotool("type", "--delay", delay_ms, "--", req.text)
    return {"typed": req.text}


@app.post("/hotkey")
async def hotkey(keys: list[str]):
    """Press all *keys* together as one chord.

    An empty list is a no-op on both backends (xdotool would otherwise be
    handed an empty key name).
    """
    if not keys:
        return {"hotkey": keys}
    if pyautogui:
        pyautogui.hotkey(*keys)
    else:
        _xdotool("key", "+".join(keys))
    return {"hotkey": keys}


# ============================================
# Screenshot
# ============================================
def _grab_with_scrot() -> bytes:
    """Capture the whole screen with ``scrot`` and return the PNG bytes.

    The image is written into a private temporary directory, so concurrent
    requests cannot overwrite each other's file and a failed capture can
    never return a stale image left behind by an earlier call.

    Raises:
        subprocess.CalledProcessError: scrot exited with a non-zero status.
        OSError: scrot could not be started or the image could not be read.
    """
    import tempfile  # local import: not among the module's top-level imports

    with tempfile.TemporaryDirectory(prefix="vortex-shot-") as tmp_dir:
        path = os.path.join(tmp_dir, "screen.png")
        subprocess.run(["scrot", "-o", path], check=True, capture_output=True)
        with open(path, "rb") as f:
            return f.read()


@app.post("/screenshot")
async def screenshot(req: Screenshot = Screenshot()):
    """Capture the screen and return it base64-encoded.

    With both mss and Pillow available, the capture honours ``req.region``
    (``[x, y, w, h]``; extra elements are ignored) and ``req.format``.
    Otherwise scrot is used, which always yields a full-screen PNG.

    Raises:
        HTTPException: 400 for a region with fewer than four values,
            500 for any capture or encoding failure.
    """
    if req.region and len(req.region) < 4:
        raise HTTPException(
            status_code=400,
            detail="region must contain four integers: [x, y, w, h]",
        )
    try:
        # mss alone is not enough: its frame is encoded with Pillow.
        if mss and Image:
            with mss.mss() as sct:
                if req.region:
                    left, top, width, height = req.region[:4]
                    monitor = {
                        "left": left,
                        "top": top,
                        "width": width,
                        "height": height,
                    }
                else:
                    monitor = sct.monitors[1]
                img = sct.grab(monitor)
                pil_img = Image.frombytes(
                    "RGB", img.size, img.bgra, "raw", "BGRX"
                )
            # Pillow registers the encoder as "JPEG"; accept "jpg" too.
            pil_format = req.format.upper()
            if pil_format == "JPG":
                pil_format = "JPEG"
            buffer = BytesIO()
            pil_img.save(buffer, format=pil_format)
            data = base64.b64encode(buffer.getvalue()).decode()
            return {
                "success": True,
                "format": req.format,
                "data": data,
                "size": [img.width, img.height],
            }

        # Fallback to scrot
        data = base64.b64encode(_grab_with_scrot()).decode()
        return {"success": True, "format": "png", "data": data}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/screenshot.png")
async def screenshot_direct():
    """Direct PNG download of a full-screen scrot capture.

    Raises:
        HTTPException: 500 when the capture fails.
    """
    from fastapi.responses import Response

    try:
        data = _grab_with_scrot()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return Response(content=data, media_type="image/png")


# ============================================
# System Commands
# ============================================

@app.post("/exec")
async def exec_command(req: RunCommand):
    """Run ``req.command`` through the shell and return its output.

    Returns a dict with ``success`` (exit status was 0), ``stdout``,
    ``stderr`` and ``code``.

    Raises:
        HTTPException: 408 when the command exceeds ``req.timeout``
            seconds, 500 for any other failure to run it.
    """
    # SECURITY: this deliberately executes a caller-supplied command line
    # with shell=True, i.e. arbitrary code execution for anyone who can
    # reach the API. Only expose this service inside the sandbox.
    try:
        result = subprocess.run(
            req.command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=req.timeout,
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "code": result.returncode,
        }
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=408, detail="Command timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================
# Browser Control (via CDP)
# ============================================
@app.post("/navigate")
async def navigate(req: Navigate):
    """Navigate browser via xdotool (focus + Ctrl+L + URL + Enter)

    Raises:
        HTTPException: 500 when a keystroke step fails or times out.
    """
    try:
        # Focus Chromium window. Best effort: the exit status is ignored so
        # that a window already focused under another title still works.
        subprocess.run(
            ["xdotool", "search", "--name", "Chromium", "windowactivate"],
            timeout=2,
        )
        await asyncio.sleep(0.1)

        # Ctrl+L to focus address bar
        subprocess.run(["xdotool", "key", "ctrl+l"], timeout=1, check=True)
        await asyncio.sleep(0.1)

        # Type URL; "--" keeps a URL starting with "-" from being parsed
        # as an xdotool option.
        subprocess.run(
            ["xdotool", "type", "--delay", "10", "--", req.url],
            timeout=10,
            check=True,
        )
        await asyncio.sleep(0.1)

        # Press Enter
        subprocess.run(["xdotool", "key", "Return"], timeout=1, check=True)

        return {"navigated": req.url}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/cdp")
async def get_cdp_info():
    """Get Chrome DevTools Protocol endpoint info

    Returns the JSON served at ``http://localhost:9222/json/version``, or an
    error dict when httpx is missing or the endpoint cannot be reached.
    """
    try:
        import httpx

        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "http://localhost:9222/json/version", timeout=5
            )
            return resp.json()
    except Exception:
        # Deliberate best effort: any failure is reported as "not available".
        return {"error": "CDP not available", "port": 9222}


# ============================================
# WebSocket for Real-time Control
# ============================================

def _ws_screenshot_b64() -> str:
    """Capture the screen with scrot and return the PNG as base64 text.

    A private temporary directory is used, so a failed capture cannot
    return a stale image and concurrent sessions cannot clash.
    """
    import tempfile  # local import: not among the module's top-level imports

    with tempfile.TemporaryDirectory(prefix="vortex-ws-") as tmp_dir:
        path = os.path.join(tmp_dir, "ws_screen.png")
        subprocess.run(["scrot", "-o", path], check=True)
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode()


def _run_ws_action(cmd) -> dict:
    """Execute one decoded WebSocket command and return its reply dict.

    Supported ``action`` values: ``move`` (x, y), ``click`` (x, y; always
    the left button), ``type`` (text), ``key`` (key) and ``screenshot``.
    Anything else yields ``{"error": "unknown action"}``.

    Raises:
        KeyError: a required field is missing from *cmd*.
        subprocess.CalledProcessError: xdotool or scrot exited non-zero.
        OSError: the external tool could not be started.
    """
    if not isinstance(cmd, dict):
        return {"error": "command must be a JSON object"}

    action = cmd.get("action")
    if action == "move":
        x, y = cmd["x"], cmd["y"]
        subprocess.run(["xdotool", "mousemove", str(x), str(y)], check=True)
        return {"moved": [x, y]}
    if action == "click":
        x, y = cmd["x"], cmd["y"]
        subprocess.run(["xdotool", "mousemove", str(x), str(y)], check=True)
        subprocess.run(["xdotool", "click", "1"], check=True)
        return {"clicked": [x, y]}
    if action == "type":
        text = cmd["text"]
        # "--" so text starting with "-" is typed, not parsed as an option.
        subprocess.run(["xdotool", "type", "--", text], check=True)
        return {"typed": text}
    if action == "key":
        key = cmd["key"]
        subprocess.run(["xdotool", "key", key], check=True)
        return {"pressed": key}
    if action == "screenshot":
        return {"screenshot": _ws_screenshot_b64()}
    return {"error": "unknown action"}


@app.websocket("/ws")
async def websocket_control(ws: WebSocket):
    """WebSocket for streaming commands

    Each text frame is a JSON object with an ``action`` field and gets
    exactly one JSON reply. A malformed or failing command produces an
    ``{"error": ...}`` reply and the session keeps running.
    """
    from fastapi import WebSocketDisconnect  # not imported at module level

    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            try:
                result = _run_ws_action(json.loads(data))
            except KeyError as exc:
                result = {"error": f"missing field: {exc}"}
            except (
                ValueError,
                TypeError,
                OSError,
                subprocess.SubprocessError,
            ) as exc:
                result = {"error": str(exc)}
            await ws.send_text(json.dumps(result))
    except WebSocketDisconnect:
        # The peer is already gone; closing again would raise.
        return
    except Exception:
        try:
            await ws.close()
        except RuntimeError:
            pass  # connection was already closed


# ============================================
# Run Server
# ============================================

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080, log_level="warning")