import gradio as gr import os import json import uvicorn import time import base64 from fastapi import FastAPI, Response from fastapi.staticfiles import StaticFiles from game import game_engine from pydantic import BaseModel # --- Setup FastAPI for Static Files --- app = FastAPI() # Ensure directories exist os.makedirs("ui/static", exist_ok=True) app.mount("/static", StaticFiles(directory="ui/static"), name="static") # --- Global Logging --- LOG_BUFFER = [] def add_log(message): timestamp = time.strftime("%H:%M:%S") entry = f"[{timestamp}] {message}\n" + "-"*40 + "\n" LOG_BUFFER.append(entry) # Keep last 50 logs if len(LOG_BUFFER) > 50: LOG_BUFFER.pop(0) # --- API Bridge --- class BridgeRequest(BaseModel): action: str data: dict = {} class TTSRequest(BaseModel): text: str voice_id: str @app.post("/api/bridge") async def api_bridge(request: BridgeRequest): """Direct API endpoint for game logic communication.""" input_data = json.dumps({"action": request.action, "data": request.data}) # Log Request add_log(f"IN: {input_data}") print(f"API Bridge Received: {input_data}") response = session.handle_input(input_data) # Log Response if response: add_log(f"OUT: {json.dumps(response)}") return response or {} # --- Game Logic Wrapper --- class GameSession: def __init__(self): self.session_id = None self.game = None self.voice_enabled = False self.game_mode = "interactive" def start(self, difficulty="medium", mode="interactive", voice=True): self.session_id, self.game = game_engine.start_game(difficulty) self.voice_enabled = voice self.game_mode = mode return self._get_init_data() def _get_init_data(self): if not self.game: return None # Prepare static data for tools cameras = list(self.game.scenario["evidence"]["footage_data"].keys()) dna_map = {} for k, v in self.game.scenario["evidence"]["dna_evidence"].items(): dna_map[k] = v.get("label", k) # Fallback to ID if no label return { "action": "init_game", "data": { "scenario": self.game.scenario, "round": self.game.round, "points": self.game.points, "available_cameras": cameras, "dna_map": dna_map, "unlocked_evidence": self.game.unlocked_evidence, "mode": self.game_mode } } def handle_input(self, input_json): if not input_json: return None try: data = json.loads(input_json) except: return None action = data.get("action") payload = data.get("data", {}) if action == "ready": # Wait for explicit start from Gradio UI, or return existing state if self.game: return self._get_init_data() return None # Wait for user to pick case if not self.game: return None if action == "ai_step": step_data = self.game.run_ai_step() # Format result for frontend # We need to map the AI's internal result to frontend actions actions = [] # 1. Thought Bubble actions.append({ "action": "ai_thought", "data": {"thought": step_data["thought"]} }) res = step_data["result"] res_type = step_data["action"] # use_tool, chat, accuse if res_type == "use_tool": # We need to construct the same evidence payload as handle_input tool_name = step_data["result"].get("location") # Wait, result of use_tool is the raw dict # I need the tool name from decision # Re-running logic here is messy. # Better: run_ai_step should return enough info. # It returns "result" which is the output of use_tool. # But we don't know which tool it was easily unless we parse "action" or store it. # In run_ai_step I did: # tool_name = decision.get("tool_name") # kwargs = decision.get("args", {}) # Let's update run_ai_step to include tool_name in result wrapper? # No, let's just infer or update run_ai_step. # Actually, the loop in JS will call this. # I'll handle formatting here. pass return { "action": "ai_step_result", "data": step_data } if action == "select_suspect": return None if action == "next_round": if self.game.advance_round(): return { "action": "update_status", "data": { "round": self.game.round, "points": self.game.points } } else: return { "action": "game_over", "data": { "message": "COLD CASE. You ran out of time.", "verdict": False } } if action == "chat_message": suspect_id = payload.get("suspect_id") message = payload.get("message") response = self.game.question_suspect(suspect_id, message) suspect = next((s for s in self.game.scenario["suspects"] if s["id"] == suspect_id), None) suspect_name = next((s["name"] for s in self.game.scenario["suspects"] if s["id"] == suspect_id), "Suspect") # Clean text for ElevenLabs cleaned_response = response # Remove text within leading parentheses (e.g., "(A bit defensively)") if cleaned_response.strip().startswith('(') and ')' in cleaned_response: end_paren_idx = cleaned_response.find(')') if end_paren_idx != -1: cleaned_response = cleaned_response[end_paren_idx + 1:].strip() # Remove all asterisks cleaned_response = cleaned_response.replace('*', '') # Trim leading/trailing whitespace cleaned_response = cleaned_response.strip() # Generate Audio audio_b64 = None # Check Voice Enabled if self.voice_enabled and suspect and "voice_id" in suspect and cleaned_response: audio_bytes = self.game.voice_manager.generate_audio(cleaned_response, suspect["voice_id"]) if audio_bytes: audio_b64 = "data:audio/mpeg;base64," + base64.b64encode(audio_bytes).decode('utf-8') return { "action": "update_chat", "data": { "role": "suspect", "name": suspect_name, "content": response, "audio": audio_b64 } } if action == "use_tool": tool_name = payload.get("tool") arg = payload.get("input") # Default for single-input tools if tool_name == "accuse": result = self.game.make_accusation(payload.get("suspect_id")) if result["result"] == "win": return { "action": "game_over", "data": { "message": result["message"], "verdict": True } } elif result["result"] == "loss": return { "action": "game_over", "data": { "message": result["message"], "verdict": False } } else: return { "action": "round_failure", "data": { "message": result["message"], "eliminated_id": result["eliminated_id"], "round": result["new_round"], "points": result["new_points"] } } kwargs = {} if tool_name == "get_location": kwargs = {"phone_number": arg} elif tool_name == "call_alibi": # Support both simple string (old) and structured (new) if "alibi_id" in payload: arg = payload.get("alibi_id") # Update arg for formatter kwargs = { "alibi_id": arg, "question": payload.get("question") } else: kwargs = {"phone_number": arg} # Fallback elif tool_name == "get_dna_test": kwargs = {"evidence_id": arg} elif tool_name == "get_footage": kwargs = {"location": arg} result = self.game.use_tool(tool_name, **kwargs) if "error" in result: return { "action": "tool_error", "data": {"message": result["error"]} } # Format the result nicely evidence_data = format_tool_response(tool_name, arg, result, self.game.scenario) # Include updated points and unlocks in response evidence_data["updated_points"] = self.game.points evidence_data["unlocked_evidence"] = self.game.unlocked_evidence if "newly_unlocked" in result and result["newly_unlocked"]: evidence_data["newly_unlocked"] = result["newly_unlocked"] return { "action": "add_evidence", "data": evidence_data } return None def format_tool_response(tool_name, arg, result, scenario): """Formats tool output into HTML and finds associated suspect.""" suspect_id = None suspect_name = None html = "" title = f"Tool: {tool_name}" # Helpers to find suspect def find_by_phone(phone): clean_input = "".join(filter(str.isdigit, str(phone))) for s in scenario["suspects"]: s_phone = "".join(filter(str.isdigit, str(s.get("phone_number", "")))) if clean_input and s_phone.endswith(clean_input): return s return None def find_by_name(name): for s in scenario["suspects"]: if s["name"].lower() == name.lower(): return s return None def find_by_alibi_id(aid): for s in scenario["suspects"]: if s.get("alibi_id") == aid: return s return None # Logic per tool if tool_name == "get_location": suspect = find_by_phone(arg) if suspect: suspect_id = suspect["id"] suspect_name = suspect["name"] title = f"📍 Location Data" if "history" in result: html += "" elif "description" in result: html += f"
Time: {result.get('timestamp')}
" html += f"
Loc: {result.get('description')}
" elif "error" in result: html += f"
{result['error']}
" else: html += str(result) elif tool_name == "call_alibi": suspect = find_by_phone(arg) # Try phone first if not suspect: suspect = find_by_alibi_id(arg) # Try ID if suspect: suspect_id = suspect["id"] suspect_name = suspect["name"] title = f"📞 Alibi Check" if "error" in result: html += f"
{result['error']}
" else: html += f"
Contact: {result.get('contact_name')}
" html += f"
\"{result.get('response')}\"
" html += f"
Confidence: {result.get('confidence')}
" elif tool_name == "get_dna_test": # Get the label for the evidence item evidence_label = scenario["evidence"]["dna_evidence"].get(arg, {}).get("label", arg) title = f"🧬 DNA Result for {evidence_label}" if "matches" in result: # Multiple matches html += f"
Mixed Sample:
" html += f"
Notes: {result.get('notes')}
" # Note: We don't auto-assign a suspect_id for mixed results to avoid cluttering one card else: # Single match if "primary_match" in result and result["primary_match"] != "Unknown": suspect = find_by_name(result["primary_match"]) if suspect: suspect_id = suspect["id"] suspect_name = suspect["name"] if "error" in result: html += f"
{result['error']}
" else: html += f"
Match: {result.get('primary_match')}
" html += f"
Confidence: {result.get('confidence')}
" html += f"
Notes: {result.get('notes')}
" elif tool_name == "get_footage": title = "📹 Security Footage" if "error" in result: html += f"
{result['error']}
" else: html += f"
Cam: {result.get('location')}
" html += f"
Time: {result.get('time_range')}
" html += f"
Visible: {', '.join(result.get('visible_people', []))}
" html += f"
Detail: {result.get('key_details')}
" else: html = str(result) return { "title": title, "html_content": html, "suspect_id": suspect_id, "suspect_name": suspect_name } session = GameSession() # --- Gradio App --- def get_game_iframe(): with open("ui/templates/game_interface.html", "r") as f: html_content = f.read() html_content = html_content.replace('../static/', '/static/') html_content_escaped = html_content.replace('"', '"') # Iframe is hidden initially iframe = f""" """ return iframe def start_game_from_ui(case_name, mode, voice): difficulty = "medium" if "Coffee" in case_name: difficulty = "easy" if "Gallery" in case_name: difficulty = "hard" mode_slug = "spectator" if "Spectator" in mode else "interactive" init_data = session.start(difficulty, mode_slug, voice) # Extract data for tools phones = [s["phone_number"] for s in init_data["data"]["scenario"]["suspects"]] cameras = init_data["data"]["available_cameras"] suspects = [s["name"] for s in init_data["data"]["scenario"]["suspects"]] # Return visible updates return ( gr.update(visible=False), # Hide selector row gr.update(visible=True), # Show game frame json.dumps(init_data), # Send init data to bridge gr.update(choices=phones, value=phones[0] if phones else None), gr.update(choices=cameras, value=cameras[0] if cameras else None), gr.update(choices=suspects, value=suspects[0] if suspects else None) ) css = """ #bridge-input, #bridge-output, #log-input { display: none !important; } /* Remove viewport-based height constraints */ .gradio-container { padding: 0 !important; max-width: 100% !important; display: flex; flex-direction: column; } /* Fixed height for game container */ #game-frame-container { height: 70vh !important; min-height: 500px; max-height: 800px; border: none; overflow: hidden; padding: 0; margin-bottom: 2rem; } #game-frame-container > .html-container { height: 100% !important; display: flex; flex-direction: column; } #game-frame-container .prose { height: 100% !important; max-width: 100% !important; } /* Setup screen styling */ #setup-container { max-width: 800px; margin: 2rem auto; padding: 2rem; } footer { display: none !important; } """ with gr.Blocks(title="Murder.Ai") as demo: gr.HTML(f"") # --- Initial Setup Screen --- with gr.Row(elem_id="setup-container", visible=True) as setup_col: with gr.Column(): gr.Markdown("# 🕵️ MURDER.AI") gr.Markdown("### 1. Select Case File") case_dropdown = gr.Dropdown( choices=["The Silicon Valley Incident (Medium)", "The Coffee Shop Murder (Easy)", "The Gallery Heist (Hard)"], value="The Silicon Valley Incident (Medium)", show_label=False ) gr.Markdown("### 2. Game Configuration") with gr.Row(): game_mode = gr.Radio(["Interactive", "AI Spectator (Beta)"], value="Interactive", label="Game Mode") voice_toggle = gr.Checkbox(value=True, label="Enable Voice (ElevenLabs)") gr.Markdown("### 3. Investigation") start_btn = gr.Button("📂 OPEN CASE FILE", variant="primary", size="lg") # Game Frame (Hidden Initially) with gr.Group(visible=False, elem_id="game-frame-container") as game_group: game_html = gr.HTML(value=get_game_iframe()) bridge_input = gr.Textbox(elem_id="bridge-input", visible=True) bridge_output = gr.Textbox(elem_id="bridge-output", visible=True) log_input = gr.Textbox(elem_id="log-input", visible=True) # Input from JS for logs # Log Box with gr.Accordion("System Logs (MCP Traffic)", open=False): with gr.Row(): refresh_logs_btn = gr.Button("🔄 Refresh Logs", scale=0) auto_refresh = gr.Checkbox(label="Auto-refresh (1s)", value=False, scale=0) log_box = gr.Textbox(label="Traffic", lines=10, max_lines=10, interactive=False, autoscroll=True, elem_id="visible-log-box") # Log Polling log_timer = gr.Timer(1, active=False) def poll_logs(): return "".join(LOG_BUFFER) log_timer.tick(fn=poll_logs, outputs=log_box) refresh_logs_btn.click(fn=poll_logs, outputs=log_box) def toggle_timer(active): return gr.Timer(active=active) auto_refresh.change(fn=toggle_timer, inputs=auto_refresh, outputs=log_timer) # --- Footer / Showcase --- gr.Markdown("---") gr.Markdown("### 🎥 How to Play (Demo)") gr.HTML('') gr.Markdown("# 🛠️ Try MCP Tools Directly (Click Open Case Files first)") gr.Markdown("*Note: Start a game first to populate these tools.*") with gr.Tabs(): with gr.Tab("📍 Location"): loc_phone = gr.Radio(label="Select Phone Number", choices=[], interactive=True) loc_btn = gr.Button("Get Location") loc_out = gr.JSON(label="Result") with gr.Tab("📹 Footage"): foot_cam = gr.Radio(label="Select Camera", choices=[], interactive=True) foot_btn = gr.Button("Get Footage") foot_out = gr.JSON(label="Result") with gr.Tab("🧬 DNA"): dna_id = gr.Textbox(label="Evidence ID") dna_btn = gr.Button("Test DNA") dna_out = gr.JSON(label="Result") with gr.Tab("📞 Alibi"): alibi_id = gr.Textbox(label="Alibi ID") alibi_q = gr.Textbox(label="Question", value="Where were they?") alibi_btn = gr.Button("Call Alibi") alibi_out = gr.JSON(label="Result") with gr.Tab("💬 Interrogate"): int_suspect = gr.Radio(label="Select Suspect", choices=[], interactive=True) int_q = gr.Textbox(label="Question", value="Where were you?") int_btn = gr.Button("Interrogate") int_out_text = gr.Markdown(label="Response") int_out_audio = gr.Audio(label="Voice Response", autoplay=True) # --- Event Handlers for Tools --- def wrap_tool(tool_name, *args): if not session.game: return {"error": "Start game first"} kwargs = {} if tool_name == "get_location": kwargs = {"phone_number": args[0]} elif tool_name == "get_footage": kwargs = {"location": args[0]} elif tool_name == "get_dna_test": kwargs = {"evidence_id": args[0]} elif tool_name == "call_alibi": kwargs = {"alibi_id": args[0], "question": args[1]} return session.game.use_tool(tool_name, **kwargs) loc_btn.click(lambda p: wrap_tool("get_location", p), inputs=loc_phone, outputs=loc_out) foot_btn.click(lambda c: wrap_tool("get_footage", c), inputs=foot_cam, outputs=foot_out) dna_btn.click(lambda e: wrap_tool("get_dna_test", e), inputs=dna_id, outputs=dna_out) alibi_btn.click(lambda i, q: wrap_tool("call_alibi", i, q), inputs=[alibi_id, alibi_q], outputs=alibi_out) def wrap_chat(suspect_name, question): if not session.game: return "Start game first", None # Find ID from name s_id = next((s["id"] for s in session.game.scenario["suspects"] if s["name"] == suspect_name), None) if not s_id: return "Suspect not found", None resp = session.game.question_suspect(s_id, question) # Audio audio_path = None suspect = next((s for s in session.game.scenario["suspects"] if s["id"] == s_id), None) # Clean text cleaned = resp if cleaned.strip().startswith('(') and ')' in cleaned: cleaned = cleaned[cleaned.find(')')+1:].strip() cleaned = cleaned.replace('*', '').strip() if suspect and "voice_id" in suspect and cleaned: audio_bytes = session.game.voice_manager.generate_audio(cleaned, suspect["voice_id"]) if audio_bytes: import tempfile with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp: fp.write(audio_bytes) audio_path = fp.name return resp, audio_path int_btn.click(wrap_chat, inputs=[int_suspect, int_q], outputs=[int_out_text, int_out_audio]) # Start Game Event start_btn.click( fn=start_game_from_ui, inputs=[case_dropdown, game_mode, voice_toggle], outputs=[setup_col, game_group, bridge_output, loc_phone, foot_cam, int_suspect] ) # Bridge Logic with Logging (Legacy/Fallback) def bridge_logic_with_log(input_data, current_log): # ... existing logic ... return None, None # Disabled # Bridge Logic (Python -> JS) bridge_output.change( None, inputs=[bridge_output], js=""" (data) => { if (!data) return; const iframe = document.querySelector('#game-frame-container iframe'); if (iframe && iframe.contentWindow) { iframe.contentWindow.postMessage(JSON.parse(data), '*'); } } """ ) app = gr.mount_gradio_app(app, demo, path="/") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)