Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import json | |
| import uvicorn | |
| import time | |
| import base64 | |
| from fastapi import FastAPI, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from game import game_engine | |
| from pydantic import BaseModel | |
| # --- Setup FastAPI for Static Files --- | |
| app = FastAPI() | |
| # Ensure directories exist | |
| os.makedirs("ui/static", exist_ok=True) | |
| app.mount("/static", StaticFiles(directory="ui/static"), name="static") | |
| # --- Global Logging --- | |
| LOG_BUFFER = [] | |
| def add_log(message): | |
| timestamp = time.strftime("%H:%M:%S") | |
| entry = f"[{timestamp}] {message}\n" + "-"*40 + "\n" | |
| LOG_BUFFER.append(entry) | |
| # Keep last 50 logs | |
| if len(LOG_BUFFER) > 50: | |
| LOG_BUFFER.pop(0) | |
| # --- API Bridge --- | |
| class BridgeRequest(BaseModel): | |
| action: str | |
| data: dict = {} | |
| class TTSRequest(BaseModel): | |
| text: str | |
| voice_id: str | |
| async def api_bridge(request: BridgeRequest): | |
| """Direct API endpoint for game logic communication.""" | |
| input_data = json.dumps({"action": request.action, "data": request.data}) | |
| # Log Request | |
| add_log(f"IN: {input_data}") | |
| print(f"API Bridge Received: {input_data}") | |
| response = session.handle_input(input_data) | |
| # Log Response | |
| if response: | |
| add_log(f"OUT: {json.dumps(response)}") | |
| return response or {} | |
| # --- Game Logic Wrapper --- | |
| class GameSession: | |
| def __init__(self): | |
| self.session_id = None | |
| self.game = None | |
| self.voice_enabled = False | |
| self.game_mode = "interactive" | |
| def start(self, difficulty="medium", mode="interactive", voice=True): | |
| self.session_id, self.game = game_engine.start_game(difficulty) | |
| self.voice_enabled = voice | |
| self.game_mode = mode | |
| return self._get_init_data() | |
| def _get_init_data(self): | |
| if not self.game: | |
| return None | |
| # Prepare static data for tools | |
| cameras = list(self.game.scenario["evidence"]["footage_data"].keys()) | |
| dna_map = {} | |
| for k, v in self.game.scenario["evidence"]["dna_evidence"].items(): | |
| dna_map[k] = v.get("label", k) # Fallback to ID if no label | |
| return { | |
| "action": "init_game", | |
| "data": { | |
| "scenario": self.game.scenario, | |
| "round": self.game.round, | |
| "points": self.game.points, | |
| "available_cameras": cameras, | |
| "dna_map": dna_map, | |
| "unlocked_evidence": self.game.unlocked_evidence, | |
| "mode": self.game_mode | |
| } | |
| } | |
| def handle_input(self, input_json): | |
| if not input_json: | |
| return None | |
| try: | |
| data = json.loads(input_json) | |
| except: | |
| return None | |
| action = data.get("action") | |
| payload = data.get("data", {}) | |
| if action == "ready": | |
| # Wait for explicit start from Gradio UI, or return existing state | |
| if self.game: | |
| return self._get_init_data() | |
| return None # Wait for user to pick case | |
| if not self.game: | |
| return None | |
| if action == "ai_step": | |
| step_data = self.game.run_ai_step() | |
| # Format result for frontend | |
| # We need to map the AI's internal result to frontend actions | |
| actions = [] | |
| # 1. Thought Bubble | |
| actions.append({ | |
| "action": "ai_thought", | |
| "data": {"thought": step_data["thought"]} | |
| }) | |
| res = step_data["result"] | |
| res_type = step_data["action"] # use_tool, chat, accuse | |
| if res_type == "use_tool": | |
| # We need to construct the same evidence payload as handle_input | |
| tool_name = step_data["result"].get("location") # Wait, result of use_tool is the raw dict | |
| # I need the tool name from decision | |
| # Re-running logic here is messy. | |
| # Better: run_ai_step should return enough info. | |
| # It returns "result" which is the output of use_tool. | |
| # But we don't know which tool it was easily unless we parse "action" or store it. | |
| # In run_ai_step I did: | |
| # tool_name = decision.get("tool_name") | |
| # kwargs = decision.get("args", {}) | |
| # Let's update run_ai_step to include tool_name in result wrapper? | |
| # No, let's just infer or update run_ai_step. | |
| # Actually, the loop in JS will call this. | |
| # I'll handle formatting here. | |
| pass | |
| return { | |
| "action": "ai_step_result", | |
| "data": step_data | |
| } | |
| if action == "select_suspect": | |
| return None | |
| if action == "next_round": | |
| if self.game.advance_round(): | |
| return { | |
| "action": "update_status", | |
| "data": { | |
| "round": self.game.round, | |
| "points": self.game.points | |
| } | |
| } | |
| else: | |
| return { | |
| "action": "game_over", | |
| "data": { | |
| "message": "COLD CASE. You ran out of time.", | |
| "verdict": False | |
| } | |
| } | |
| if action == "chat_message": | |
| suspect_id = payload.get("suspect_id") | |
| message = payload.get("message") | |
| response = self.game.question_suspect(suspect_id, message) | |
| suspect = next((s for s in self.game.scenario["suspects"] if s["id"] == suspect_id), None) | |
| suspect_name = next((s["name"] for s in self.game.scenario["suspects"] if s["id"] == suspect_id), "Suspect") | |
| # Clean text for ElevenLabs | |
| cleaned_response = response | |
| # Remove text within leading parentheses (e.g., "(A bit defensively)") | |
| if cleaned_response.strip().startswith('(') and ')' in cleaned_response: | |
| end_paren_idx = cleaned_response.find(')') | |
| if end_paren_idx != -1: | |
| cleaned_response = cleaned_response[end_paren_idx + 1:].strip() | |
| # Remove all asterisks | |
| cleaned_response = cleaned_response.replace('*', '') | |
| # Trim leading/trailing whitespace | |
| cleaned_response = cleaned_response.strip() | |
| # Generate Audio | |
| audio_b64 = None | |
| # Check Voice Enabled | |
| if self.voice_enabled and suspect and "voice_id" in suspect and cleaned_response: | |
| audio_bytes = self.game.voice_manager.generate_audio(cleaned_response, suspect["voice_id"]) | |
| if audio_bytes: | |
| audio_b64 = "data:audio/mpeg;base64," + base64.b64encode(audio_bytes).decode('utf-8') | |
| return { | |
| "action": "update_chat", | |
| "data": { | |
| "role": "suspect", | |
| "name": suspect_name, | |
| "content": response, | |
| "audio": audio_b64 | |
| } | |
| } | |
| if action == "use_tool": | |
| tool_name = payload.get("tool") | |
| arg = payload.get("input") # Default for single-input tools | |
| if tool_name == "accuse": | |
| result = self.game.make_accusation(payload.get("suspect_id")) | |
| if result["result"] == "win": | |
| return { | |
| "action": "game_over", | |
| "data": { | |
| "message": result["message"], | |
| "verdict": True | |
| } | |
| } | |
| elif result["result"] == "loss": | |
| return { | |
| "action": "game_over", | |
| "data": { | |
| "message": result["message"], | |
| "verdict": False | |
| } | |
| } | |
| else: | |
| return { | |
| "action": "round_failure", | |
| "data": { | |
| "message": result["message"], | |
| "eliminated_id": result["eliminated_id"], | |
| "round": result["new_round"], | |
| "points": result["new_points"] | |
| } | |
| } | |
| kwargs = {} | |
| if tool_name == "get_location": | |
| kwargs = {"phone_number": arg} | |
| elif tool_name == "call_alibi": | |
| # Support both simple string (old) and structured (new) | |
| if "alibi_id" in payload: | |
| arg = payload.get("alibi_id") # Update arg for formatter | |
| kwargs = { | |
| "alibi_id": arg, | |
| "question": payload.get("question") | |
| } | |
| else: | |
| kwargs = {"phone_number": arg} # Fallback | |
| elif tool_name == "get_dna_test": | |
| kwargs = {"evidence_id": arg} | |
| elif tool_name == "get_footage": | |
| kwargs = {"location": arg} | |
| result = self.game.use_tool(tool_name, **kwargs) | |
| if "error" in result: | |
| return { | |
| "action": "tool_error", | |
| "data": {"message": result["error"]} | |
| } | |
| # Format the result nicely | |
| evidence_data = format_tool_response(tool_name, arg, result, self.game.scenario) | |
| # Include updated points and unlocks in response | |
| evidence_data["updated_points"] = self.game.points | |
| evidence_data["unlocked_evidence"] = self.game.unlocked_evidence | |
| if "newly_unlocked" in result and result["newly_unlocked"]: | |
| evidence_data["newly_unlocked"] = result["newly_unlocked"] | |
| return { | |
| "action": "add_evidence", | |
| "data": evidence_data | |
| } | |
| return None | |
| def format_tool_response(tool_name, arg, result, scenario): | |
| """Formats tool output into HTML and finds associated suspect.""" | |
| suspect_id = None | |
| suspect_name = None | |
| html = "" | |
| title = f"Tool: {tool_name}" | |
| # Helpers to find suspect | |
| def find_by_phone(phone): | |
| clean_input = "".join(filter(str.isdigit, str(phone))) | |
| for s in scenario["suspects"]: | |
| s_phone = "".join(filter(str.isdigit, str(s.get("phone_number", "")))) | |
| if clean_input and s_phone.endswith(clean_input): | |
| return s | |
| return None | |
| def find_by_name(name): | |
| for s in scenario["suspects"]: | |
| if s["name"].lower() == name.lower(): | |
| return s | |
| return None | |
| def find_by_alibi_id(aid): | |
| for s in scenario["suspects"]: | |
| if s.get("alibi_id") == aid: | |
| return s | |
| return None | |
| # Logic per tool | |
| if tool_name == "get_location": | |
| suspect = find_by_phone(arg) | |
| if suspect: | |
| suspect_id = suspect["id"] | |
| suspect_name = suspect["name"] | |
| title = f"π Location Data" | |
| if "history" in result: | |
| html += "<ul>" | |
| for entry in result["history"]: | |
| html += f"<li>{entry}</li>" | |
| html += "</ul>" | |
| elif "description" in result: | |
| html += f"<div><strong>Time:</strong> {result.get('timestamp')}</div>" | |
| html += f"<div><strong>Loc:</strong> {result.get('description')}</div>" | |
| elif "error" in result: | |
| html += f"<div style='color:red'>{result['error']}</div>" | |
| else: | |
| html += str(result) | |
| elif tool_name == "call_alibi": | |
| suspect = find_by_phone(arg) # Try phone first | |
| if not suspect: | |
| suspect = find_by_alibi_id(arg) # Try ID | |
| if suspect: | |
| suspect_id = suspect["id"] | |
| suspect_name = suspect["name"] | |
| title = f"π Alibi Check" | |
| if "error" in result: | |
| html += f"<div style='color:red'>{result['error']}</div>" | |
| else: | |
| html += f"<div><strong>Contact:</strong> {result.get('contact_name')}</div>" | |
| html += f"<div style='margin-top:5px; font-style:italic;'>\"{result.get('response')}\"</div>" | |
| html += f"<div style='font-size:0.8em; color:#555'>Confidence: {result.get('confidence')}</div>" | |
| elif tool_name == "get_dna_test": | |
| # Get the label for the evidence item | |
| evidence_label = scenario["evidence"]["dna_evidence"].get(arg, {}).get("label", arg) | |
| title = f"𧬠DNA Result for {evidence_label}" | |
| if "matches" in result: | |
| # Multiple matches | |
| html += f"<div><strong>Mixed Sample:</strong></div><ul>" | |
| for name in result["matches"]: | |
| html += f"<li>{name}</li>" | |
| html += "</ul>" | |
| html += f"<div><strong>Notes:</strong> {result.get('notes')}</div>" | |
| # Note: We don't auto-assign a suspect_id for mixed results to avoid cluttering one card | |
| else: | |
| # Single match | |
| if "primary_match" in result and result["primary_match"] != "Unknown": | |
| suspect = find_by_name(result["primary_match"]) | |
| if suspect: | |
| suspect_id = suspect["id"] | |
| suspect_name = suspect["name"] | |
| if "error" in result: | |
| html += f"<div style='color:red'>{result['error']}</div>" | |
| else: | |
| html += f"<div><strong>Match:</strong> {result.get('primary_match')}</div>" | |
| html += f"<div><strong>Confidence:</strong> {result.get('confidence')}</div>" | |
| html += f"<div><strong>Notes:</strong> {result.get('notes')}</div>" | |
| elif tool_name == "get_footage": | |
| title = "πΉ Security Footage" | |
| if "error" in result: | |
| html += f"<div style='color:red'>{result['error']}</div>" | |
| else: | |
| html += f"<div><strong>Cam:</strong> {result.get('location')}</div>" | |
| html += f"<div><strong>Time:</strong> {result.get('time_range')}</div>" | |
| html += f"<div><strong>Visible:</strong> {', '.join(result.get('visible_people', []))}</div>" | |
| html += f"<div><strong>Detail:</strong> {result.get('key_details')}</div>" | |
| else: | |
| html = str(result) | |
| return { | |
| "title": title, | |
| "html_content": html, | |
| "suspect_id": suspect_id, | |
| "suspect_name": suspect_name | |
| } | |
| session = GameSession() | |
| # --- Gradio App --- | |
| def get_game_iframe(): | |
| with open("ui/templates/game_interface.html", "r") as f: | |
| html_content = f.read() | |
| html_content = html_content.replace('../static/', '/static/') | |
| html_content_escaped = html_content.replace('"', '"') | |
| # Iframe is hidden initially | |
| iframe = f""" | |
| <iframe | |
| id="game-iframe" | |
| srcdoc="{html_content_escaped}" | |
| style="width: 100%; height: 50vh; border: none;" | |
| allow="autoplay; fullscreen" | |
| ></iframe> | |
| """ | |
| return iframe | |
| def start_game_from_ui(case_name, mode, voice): | |
| difficulty = "medium" | |
| if "Coffee" in case_name: difficulty = "easy" | |
| if "Gallery" in case_name: difficulty = "hard" | |
| mode_slug = "spectator" if "Spectator" in mode else "interactive" | |
| init_data = session.start(difficulty, mode_slug, voice) | |
| # Extract data for tools | |
| phones = [s["phone_number"] for s in init_data["data"]["scenario"]["suspects"]] | |
| cameras = init_data["data"]["available_cameras"] | |
| suspects = [s["name"] for s in init_data["data"]["scenario"]["suspects"]] | |
| # Return visible updates | |
| return ( | |
| gr.update(visible=False), # Hide selector row | |
| gr.update(visible=True), # Show game frame | |
| json.dumps(init_data), # Send init data to bridge | |
| gr.update(choices=phones, value=phones[0] if phones else None), | |
| gr.update(choices=cameras, value=cameras[0] if cameras else None), | |
| gr.update(choices=suspects, value=suspects[0] if suspects else None) | |
| ) | |
| css = """ | |
| #bridge-input, #bridge-output, #log-input { display: none !important; } | |
| /* Remove viewport-based height constraints */ | |
| .gradio-container { | |
| padding: 0 !important; | |
| max-width: 100% !important; | |
| display: flex; | |
| flex-direction: column; | |
| } | |
| /* Fixed height for game container */ | |
| #game-frame-container { | |
| height: 70vh !important; | |
| min-height: 500px; | |
| max-height: 800px; | |
| border: none; | |
| overflow: hidden; | |
| padding: 0; | |
| margin-bottom: 2rem; | |
| } | |
| #game-frame-container > .html-container { | |
| height: 100% !important; | |
| display: flex; | |
| flex-direction: column; | |
| } | |
| #game-frame-container .prose { | |
| height: 100% !important; | |
| max-width: 100% !important; | |
| } | |
| /* Setup screen styling */ | |
| #setup-container { | |
| max-width: 800px; | |
| margin: 2rem auto; | |
| padding: 2rem; | |
| } | |
| footer { display: none !important; } | |
| """ | |
| with gr.Blocks(title="Murder.Ai") as demo: | |
| gr.HTML(f"<style>{css}</style>") | |
| # --- Initial Setup Screen --- | |
| with gr.Row(elem_id="setup-container", visible=True) as setup_col: | |
| with gr.Column(): | |
| gr.Markdown("# π΅οΈ MURDER.AI") | |
| gr.Markdown("### 1. Select Case File") | |
| case_dropdown = gr.Dropdown( | |
| choices=["The Silicon Valley Incident (Medium)", "The Coffee Shop Murder (Easy)", "The Gallery Heist (Hard)"], | |
| value="The Silicon Valley Incident (Medium)", | |
| show_label=False | |
| ) | |
| gr.Markdown("### 2. Game Configuration") | |
| with gr.Row(): | |
| game_mode = gr.Radio(["Interactive", "AI Spectator (Beta)"], value="Interactive", label="Game Mode") | |
| voice_toggle = gr.Checkbox(value=True, label="Enable Voice (ElevenLabs)") | |
| gr.Markdown("### 3. Investigation") | |
| start_btn = gr.Button("π OPEN CASE FILE", variant="primary", size="lg") | |
| # Game Frame (Hidden Initially) | |
| with gr.Group(visible=False, elem_id="game-frame-container") as game_group: | |
| game_html = gr.HTML(value=get_game_iframe()) | |
| bridge_input = gr.Textbox(elem_id="bridge-input", visible=True) | |
| bridge_output = gr.Textbox(elem_id="bridge-output", visible=True) | |
| log_input = gr.Textbox(elem_id="log-input", visible=True) # Input from JS for logs | |
| # Log Box | |
| with gr.Accordion("System Logs (MCP Traffic)", open=False): | |
| with gr.Row(): | |
| refresh_logs_btn = gr.Button("π Refresh Logs", scale=0) | |
| auto_refresh = gr.Checkbox(label="Auto-refresh (1s)", value=False, scale=0) | |
| log_box = gr.Textbox(label="Traffic", lines=10, max_lines=10, interactive=False, autoscroll=True, elem_id="visible-log-box") | |
| # Log Polling | |
| log_timer = gr.Timer(1, active=False) | |
| def poll_logs(): | |
| return "".join(LOG_BUFFER) | |
| log_timer.tick(fn=poll_logs, outputs=log_box) | |
| refresh_logs_btn.click(fn=poll_logs, outputs=log_box) | |
| def toggle_timer(active): | |
| return gr.Timer(active=active) | |
| auto_refresh.change(fn=toggle_timer, inputs=auto_refresh, outputs=log_timer) | |
| # --- Footer / Showcase --- | |
| gr.Markdown("---") | |
| gr.Markdown("### π₯ How to Play (Demo)") | |
| gr.HTML('<iframe width="100%" height="400" src="https://www.youtube.com/embed/uPPwrhlSzdA" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>') | |
| gr.Markdown("# π οΈ Try MCP Tools Directly (Click Open Case Files first)") | |
| gr.Markdown("*Note: Start a game first to populate these tools.*") | |
| with gr.Tabs(): | |
| with gr.Tab("π Location"): | |
| loc_phone = gr.Radio(label="Select Phone Number", choices=[], interactive=True) | |
| loc_btn = gr.Button("Get Location") | |
| loc_out = gr.JSON(label="Result") | |
| with gr.Tab("πΉ Footage"): | |
| foot_cam = gr.Radio(label="Select Camera", choices=[], interactive=True) | |
| foot_btn = gr.Button("Get Footage") | |
| foot_out = gr.JSON(label="Result") | |
| with gr.Tab("𧬠DNA"): | |
| dna_id = gr.Textbox(label="Evidence ID") | |
| dna_btn = gr.Button("Test DNA") | |
| dna_out = gr.JSON(label="Result") | |
| with gr.Tab("π Alibi"): | |
| alibi_id = gr.Textbox(label="Alibi ID") | |
| alibi_q = gr.Textbox(label="Question", value="Where were they?") | |
| alibi_btn = gr.Button("Call Alibi") | |
| alibi_out = gr.JSON(label="Result") | |
| with gr.Tab("π¬ Interrogate"): | |
| int_suspect = gr.Radio(label="Select Suspect", choices=[], interactive=True) | |
| int_q = gr.Textbox(label="Question", value="Where were you?") | |
| int_btn = gr.Button("Interrogate") | |
| int_out_text = gr.Markdown(label="Response") | |
| int_out_audio = gr.Audio(label="Voice Response", autoplay=True) | |
| # --- Event Handlers for Tools --- | |
| def wrap_tool(tool_name, *args): | |
| if not session.game: return {"error": "Start game first"} | |
| kwargs = {} | |
| if tool_name == "get_location": kwargs = {"phone_number": args[0]} | |
| elif tool_name == "get_footage": kwargs = {"location": args[0]} | |
| elif tool_name == "get_dna_test": kwargs = {"evidence_id": args[0]} | |
| elif tool_name == "call_alibi": kwargs = {"alibi_id": args[0], "question": args[1]} | |
| return session.game.use_tool(tool_name, **kwargs) | |
| loc_btn.click(lambda p: wrap_tool("get_location", p), inputs=loc_phone, outputs=loc_out) | |
| foot_btn.click(lambda c: wrap_tool("get_footage", c), inputs=foot_cam, outputs=foot_out) | |
| dna_btn.click(lambda e: wrap_tool("get_dna_test", e), inputs=dna_id, outputs=dna_out) | |
| alibi_btn.click(lambda i, q: wrap_tool("call_alibi", i, q), inputs=[alibi_id, alibi_q], outputs=alibi_out) | |
| def wrap_chat(suspect_name, question): | |
| if not session.game: return "Start game first", None | |
| # Find ID from name | |
| s_id = next((s["id"] for s in session.game.scenario["suspects"] if s["name"] == suspect_name), None) | |
| if not s_id: return "Suspect not found", None | |
| resp = session.game.question_suspect(s_id, question) | |
| # Audio | |
| audio_path = None | |
| suspect = next((s for s in session.game.scenario["suspects"] if s["id"] == s_id), None) | |
| # Clean text | |
| cleaned = resp | |
| if cleaned.strip().startswith('(') and ')' in cleaned: | |
| cleaned = cleaned[cleaned.find(')')+1:].strip() | |
| cleaned = cleaned.replace('*', '').strip() | |
| if suspect and "voice_id" in suspect and cleaned: | |
| audio_bytes = session.game.voice_manager.generate_audio(cleaned, suspect["voice_id"]) | |
| if audio_bytes: | |
| import tempfile | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp: | |
| fp.write(audio_bytes) | |
| audio_path = fp.name | |
| return resp, audio_path | |
| int_btn.click(wrap_chat, inputs=[int_suspect, int_q], outputs=[int_out_text, int_out_audio]) | |
| # Start Game Event | |
| start_btn.click( | |
| fn=start_game_from_ui, | |
| inputs=[case_dropdown, game_mode, voice_toggle], | |
| outputs=[setup_col, game_group, bridge_output, loc_phone, foot_cam, int_suspect] | |
| ) | |
| # Bridge Logic with Logging (Legacy/Fallback) | |
| def bridge_logic_with_log(input_data, current_log): | |
| # ... existing logic ... | |
| return None, None # Disabled | |
| # Bridge Logic (Python -> JS) | |
| bridge_output.change( | |
| None, | |
| inputs=[bridge_output], | |
| js=""" | |
| (data) => { | |
| if (!data) return; | |
| const iframe = document.querySelector('#game-frame-container iframe'); | |
| if (iframe && iframe.contentWindow) { | |
| iframe.contentWindow.postMessage(JSON.parse(data), '*'); | |
| } | |
| } | |
| """ | |
| ) | |
| app = gr.mount_gradio_app(app, demo, path="/") | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |